Skip to content

Commit

Permalink
fix(eventbus): Idempotent wildcardSub close (#3045)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo authored Nov 18, 2024
1 parent 66e8db2 commit 829a438
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
12 changes: 8 additions & 4 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,21 @@ type wildcardSub struct {
w *wildcardNode
metricsTracer MetricsTracer
name string
closeOnce sync.Once
}

func (w *wildcardSub) Out() <-chan interface{} {
return w.ch
}

func (w *wildcardSub) Close() error {
w.w.removeSink(w.ch)
if w.metricsTracer != nil {
w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
w.closeOnce.Do(func() {
w.w.removeSink(w.ch)
if w.metricsTracer != nil {
w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
}
})

return nil
}

Expand Down
15 changes: 11 additions & 4 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,13 @@ func TestManyWildcardSubscriptions(t *testing.T) {
require.NoError(t, em1.Emit(EventA{}))
require.NoError(t, em2.Emit(EventB(1)))

// the first five still have 2 events, while the other five have 4 events.
for _, s := range subs[:5] {
require.Len(t, s.Out(), 2)
}
// the first five 0 events because it was closed. The other five
// have 4 events.
require.EventuallyWithT(t, func(t *assert.CollectT) {
for _, s := range subs[:5] {
require.Len(t, s.Out(), 0, "expected closed subscription to have flushed events")
}
}, 2*time.Second, 100*time.Millisecond)

for _, s := range subs[5:] {
require.Len(t, s.Out(), 4)
Expand All @@ -407,6 +410,10 @@ func TestManyWildcardSubscriptions(t *testing.T) {
for _, s := range subs {
require.NoError(t, s.Close())
}

for _, s := range subs {
require.Zero(t, s.(*wildcardSub).w.nSinks.Load())
}
}

func TestWildcardValidations(t *testing.T) {
Expand Down

0 comments on commit 829a438

Please sign in to comment.