Skip to content

Commit

Permalink
reject gRPC requests from failed nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
DrmagicE committed Jul 20, 2021
1 parent 8f8990f commit 7ef79be
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 23 deletions.
Binary file added plugin/.DS_Store
Binary file not shown.
79 changes: 57 additions & 22 deletions plugin/federation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,21 @@ func (s *sessionMgr) add(nodeName string, id string) (cleanStart bool, nextID ui
// TODO config
seenEvents: newLRUCache(100),
nextEventID: 0,
close: make(chan struct{}),
}
}
return
}

func (s *sessionMgr) del(nodeName string) {
s.Lock()
defer s.Unlock()
if sess := s.sessions[nodeName]; sess != nil {
close(sess.close)
}
delete(s.sessions, nodeName)
}

func (s *sessionMgr) get(nodeName string) *session {
s.RLock()
defer s.RUnlock()
Expand Down Expand Up @@ -334,6 +344,7 @@ type session struct {
nextEventID uint64
// seenEvents cache recently seen events to avoid duplicate events.
seenEvents *lruCache
close chan struct{}
}

// lruCache is the cache for recently seen events.
Expand Down Expand Up @@ -387,6 +398,11 @@ func (f *Federation) Hello(ctx context.Context, req *ClientHello) (resp *ServerH
if err != nil {
return nil, err
}
f.memberMu.Lock()
if f.peers[nodeName] == nil {
return nil, status.Errorf(codes.Internal, "Hello: the node [%s] has not yet joined", nodeName)
}
f.memberMu.Unlock()
cleanStart, nextID := f.sessionMgr.add(nodeName, req.SessionId)
if cleanStart {
_ = f.fedSubStore.UnsubscribeAll(nodeName)
Expand Down Expand Up @@ -449,31 +465,50 @@ func (f *Federation) EventStream(stream Federation_EventStreamServer) (err error
}
sess := f.sessionMgr.get(nodeName)
if sess == nil {
return status.Errorf(codes.Internal, "EventStream: node not exist")
return status.Errorf(codes.Internal, "EventStream: node [%s] does not exist", nodeName)
}
for {
var in *Event
in, err = stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if ce := log.Check(zapcore.DebugLevel, "event received"); ce != nil {
ce.Write(zap.String("event", in.String()))
}
ack := f.eventStreamHandler(sess, in)

err = stream.Send(ack)
if err != nil {
return err
}
if ce := log.Check(zapcore.DebugLevel, "event ack sent"); ce != nil {
ce.Write(zap.Uint64("id", ack.EventId))
errCh := make(chan error, 1)
done := make(chan struct{})
// close the session if the client node has been mark as failed.
go func() {
<-sess.close
errCh <- fmt.Errorf("EventStream: the session of node [%s] has been closed", nodeName)
close(done)
}()
go func() {
for {
var in *Event
select {
case <-done:
default:
in, err = stream.Recv()
if err != nil {
errCh <- err
return
}
if ce := log.Check(zapcore.DebugLevel, "event received"); ce != nil {
ce.Write(zap.String("event", in.String()))
}

ack := f.eventStreamHandler(sess, in)

err = stream.Send(ack)
if err != nil {
errCh <- err
return
}
if ce := log.Check(zapcore.DebugLevel, "event ack sent"); ce != nil {
ce.Write(zap.Uint64("id", ack.EventId))
}
sess.nextEventID = ack.EventId + 1
}
}
sess.nextEventID = ack.EventId + 1
}()
err = <-errCh
if err == io.EOF {
return nil
}
return err
}

func (f *Federation) mustEmbedUnimplementedFederationServer() {
Expand Down
1 change: 1 addition & 0 deletions plugin/federation/federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestFederation_Hello(t *testing.T) {
p, _ := New(testConfig)
f := p.(*Federation)
clientNodeName := "node1"
f.peers[clientNodeName] = &peer{}
clientSid := "session_id"
f.fedSubStore.Subscribe(clientNodeName, &gmqtt.Subscription{
TopicFilter: "topicA",
Expand Down
1 change: 1 addition & 0 deletions plugin/federation/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (f *Federation) nodeFail(member serf.MemberEvent) {
p.stop()
delete(f.peers, v.Name)
_ = f.fedSubStore.UnsubscribeAll(v.Name)
f.sessionMgr.del(v.Name)
}
}
}
2 changes: 1 addition & 1 deletion plugin/federation/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestPeer_initStream_CleanStartFalse(t *testing.T) {

client.EXPECT().EventStream(gomock.Any())

_, err := p.initStream(client,nil)
_, err := p.initStream(client, nil)
a.NoError(err)

}

0 comments on commit 7ef79be

Please sign in to comment.