Skip to content

Commit

Permalink
Simplify callbacks (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
ders authored May 30, 2024
1 parent 759842d commit 5b4825c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 44 deletions.
12 changes: 5 additions & 7 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *Collection) Add(label string, s Service) {
// Just in case someone adds a service to a running collection, make sure we get its events. The alternative would
// be to disallow adding the service in the first place, but we don't want to do that.
if c.running {
s.Register(c)
s.RegisterCallback(c.stateChanged)
}
}

Expand All @@ -83,11 +83,11 @@ func (c *Collection) Add(label string, s Service) {
//
// Calling Run on an already running collection has no effect.
func (c *Collection) Run() {
defer c.OnNotify(Event{})
defer c.stateChanged(Event{})
c.Lock()
defer c.Unlock()
for _, s := range c.services {
s.Register(c)
s.RegisterCallback(c.stateChanged)
}
}

Expand Down Expand Up @@ -124,11 +124,9 @@ func (c *Collection) Stop() {
}
}

// OnNotify updates the state of the collection according to the states of all of the monitored services. No update is
// stateChanged updates the state of the collection according to the states of all of the monitored services. No update is
// done if any of the services are still initializing.
//
// OnNotify is used internally as a callback when any monitored service changes state. It is not normally called directly.
func (c *Collection) OnNotify(_ Event) {
func (c *Collection) stateChanged(_ Event) {

c.Lock()
defer c.Unlock()
Expand Down
40 changes: 25 additions & 15 deletions monitor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nested

import (
"math/rand"
"sync"
)

Expand All @@ -11,7 +12,7 @@ type Monitor struct {
sync.Mutex
state State // current state
err error // current error state, if the state is not ready
observers map[Observer]struct{}
callbacks map[Token]func(Event)
}

// Verifies that a Monitor implements the Service interface. Note that the Service interface does NOT include the
Expand All @@ -38,22 +39,31 @@ func (m *Monitor) Stop() {
m.setState(Stopped, nil)
}

// Register registers an observer, whose OnNotify method will be called any time there is a state change. Does nothing
// if the observer is already registered.
func (m *Monitor) Register(o Observer) {
// RegisterCallback registers a function which will be called any time there is a state change. Returns a token that
// can be used to deregister it later.
func (m *Monitor) RegisterCallback(f func(Event)) Token {
m.Lock()
defer m.Unlock()
if m.observers == nil {
m.observers = make(map[Observer]struct{})
if m.callbacks == nil {
m.callbacks = make(map[Token]func(Event))
}
m.observers[o] = struct{}{}

// Choose a random token that we haven't used.
var token Token
for ok := true; ok; {
token = Token(rand.Uint32())
_, ok = m.callbacks[token]
}

m.callbacks[token] = f
return token
}

// Deregister removes a registered observer. Does nothing if the observer is not registered.
func (m *Monitor) Deregister(o Observer) {
// Deregister removes a registered callback. Does nothing if there is no callback registered with the provided token.
func (m *Monitor) DeregisterCallback(token Token) {
m.Lock()
defer m.Unlock()
delete(m.observers, o)
delete(m.callbacks, token)
}

// SetReady sets the monitor state to Ready. If there are registered observers, all observers are called before returning.
Expand Down Expand Up @@ -98,12 +108,12 @@ func (m *Monitor) setState(newState State, newErr error) {
}

// Notify all observers.
wg.Add(len(m.observers))
for o := range m.observers {
wg.Add(len(m.callbacks))
for _, cb := range m.callbacks {
// Run these in the background so as not to block while holding the lock.
go func(o Observer) {
o.OnNotify(ev)
go func(f func(Event)) {
f(ev)
wg.Done()
}(o)
}(cb)
}
}
20 changes: 8 additions & 12 deletions monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestMonitor(t *testing.T) {
func TestMonitorNotifications(t *testing.T) {

mon := Monitor{}
ch := make(testObserver, 1)
mon.Register(ch)
ch := make(chan Event, 1)
mon.RegisterCallback(func(ev Event) { ch <- ev })

// Set to Ready.
mon.SetReady()
Expand Down Expand Up @@ -118,12 +118,9 @@ func TestMonitorNotifications(t *testing.T) {
func TestDeregister(t *testing.T) {

mon := Monitor{}
ch := make(testObserver, 1)
ch := make(chan Event, 1)

// Deregistering something that doesn't exist is not an error.
mon.Deregister(ch)

mon.Register(ch)
foo := mon.RegisterCallback(func(ev Event) { ch <- ev })

// Set to ready.
mon.SetReady()
Expand All @@ -132,17 +129,16 @@ func TestDeregister(t *testing.T) {
assertEqual(t, n.NewState, Ready)
assertEqual(t, n.Error, nil)

mon.Deregister(ch)
mon.DeregisterCallback(foo)

// No more notifications.
mon.Stop()
if len(ch) > 0 {
t.Error("unexpected notification")
}

// Deregistering again is not an error
mon.DeregisterCallback(foo)

close(ch)
}

type testObserver chan Event

func (ch testObserver) OnNotify(ev Event) { ch <- ev }
18 changes: 8 additions & 10 deletions nested.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ type Event struct {
Error error // error condition if the new state is Error, nil otherwise
}

// An observer receives notifications of state changes.
type Observer interface {
OnNotify(Event)
}

// The Service interface defines the behavior of a nested service.
type Service interface {
// GetState returns the current state of the service.
Expand All @@ -40,9 +35,12 @@ type Service interface {
Err() error
// Stop stops the service and releases all resources. Stop should not return until the service shutdown is complete.
Stop()
// Register registers an observer, whose OnNotify method will be called any time there is a state change. Does
// nothing if the observer is already registered.
Register(Observer)
// Deregister removes a registered observer. Does nothing if the observer is not registered.
Deregister(Observer)
// RegisterCallback registers a function which will be called any time there is a state change. Returns a token
// that can be used to deregister it later.
RegisterCallback(f func(Event)) Token
// Deregister removes a registered callback. Does nothing if there is no callback registered with the provided token.
DeregisterCallback(Token)
}

// A Token identifies a registered callback so that it can later be deregistered.
type Token uint32

0 comments on commit 5b4825c

Please sign in to comment.