Skip to content

Commit

Permalink
Merge pull request #133 from DrmagicE/dev
Browse files Browse the repository at this point in the history
fix(federation): Fix client hanging issue
  • Loading branch information
DrmagicE authored Jul 20, 2021
2 parents 0a2b151 + 7ef79be commit 8d33b56
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 38 deletions.
Binary file added plugin/.DS_Store
Binary file not shown.
79 changes: 57 additions & 22 deletions plugin/federation/federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,21 @@ func (s *sessionMgr) add(nodeName string, id string) (cleanStart bool, nextID ui
// TODO config
seenEvents: newLRUCache(100),
nextEventID: 0,
close: make(chan struct{}),
}
}
return
}

func (s *sessionMgr) del(nodeName string) {
s.Lock()
defer s.Unlock()
if sess := s.sessions[nodeName]; sess != nil {
close(sess.close)
}
delete(s.sessions, nodeName)
}

func (s *sessionMgr) get(nodeName string) *session {
s.RLock()
defer s.RUnlock()
Expand Down Expand Up @@ -334,6 +344,7 @@ type session struct {
nextEventID uint64
// seenEvents cache recently seen events to avoid duplicate events.
seenEvents *lruCache
close chan struct{}
}

// lruCache is the cache for recently seen events.
Expand Down Expand Up @@ -387,6 +398,11 @@ func (f *Federation) Hello(ctx context.Context, req *ClientHello) (resp *ServerH
if err != nil {
return nil, err
}
f.memberMu.Lock()
if f.peers[nodeName] == nil {
return nil, status.Errorf(codes.Internal, "Hello: the node [%s] has not yet joined", nodeName)
}
f.memberMu.Unlock()
cleanStart, nextID := f.sessionMgr.add(nodeName, req.SessionId)
if cleanStart {
_ = f.fedSubStore.UnsubscribeAll(nodeName)
Expand Down Expand Up @@ -449,31 +465,50 @@ func (f *Federation) EventStream(stream Federation_EventStreamServer) (err error
}
sess := f.sessionMgr.get(nodeName)
if sess == nil {
return status.Errorf(codes.Internal, "EventStream: node not exist")
return status.Errorf(codes.Internal, "EventStream: node [%s] does not exist", nodeName)
}
for {
var in *Event
in, err = stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if ce := log.Check(zapcore.DebugLevel, "event received"); ce != nil {
ce.Write(zap.String("event", in.String()))
}
ack := f.eventStreamHandler(sess, in)

err = stream.Send(ack)
if err != nil {
return err
}
if ce := log.Check(zapcore.DebugLevel, "event ack sent"); ce != nil {
ce.Write(zap.Uint64("id", ack.EventId))
errCh := make(chan error, 1)
done := make(chan struct{})
// close the session if the client node has been mark as failed.
go func() {
<-sess.close
errCh <- fmt.Errorf("EventStream: the session of node [%s] has been closed", nodeName)
close(done)
}()
go func() {
for {
var in *Event
select {
case <-done:
default:
in, err = stream.Recv()
if err != nil {
errCh <- err
return
}
if ce := log.Check(zapcore.DebugLevel, "event received"); ce != nil {
ce.Write(zap.String("event", in.String()))
}

ack := f.eventStreamHandler(sess, in)

err = stream.Send(ack)
if err != nil {
errCh <- err
return
}
if ce := log.Check(zapcore.DebugLevel, "event ack sent"); ce != nil {
ce.Write(zap.Uint64("id", ack.EventId))
}
sess.nextEventID = ack.EventId + 1
}
}
sess.nextEventID = ack.EventId + 1
}()
err = <-errCh
if err == io.EOF {
return nil
}
return err
}

func (f *Federation) mustEmbedUnimplementedFederationServer() {
Expand Down
1 change: 1 addition & 0 deletions plugin/federation/federation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestFederation_Hello(t *testing.T) {
p, _ := New(testConfig)
f := p.(*Federation)
clientNodeName := "node1"
f.peers[clientNodeName] = &peer{}
clientSid := "session_id"
f.fedSubStore.Subscribe(clientNodeName, &gmqtt.Subscription{
TopicFilter: "topicA",
Expand Down
1 change: 1 addition & 0 deletions plugin/federation/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func (f *Federation) nodeFail(member serf.MemberEvent) {
p.stop()
delete(f.peers, v.Name)
_ = f.fedSubStore.UnsubscribeAll(v.Name)
f.sessionMgr.del(v.Name)
}
}
}
28 changes: 18 additions & 10 deletions plugin/federation/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"github.com/DrmagicE/gmqtt/persistence/subscription"
)

type peerState byte

const (
peerStateStopped = iota + 1
peerStateStopped peerState = iota + 1
peerStateStreaming
)

Expand All @@ -33,14 +35,16 @@ type peer struct {
// local session id
sessionID string
queue queue
// client-side stream
// stateMu guards the following fields
stateMu sync.Mutex
state int
stream *stream
state peerState
// client-side stream
stream *stream
}

type stream struct {
queue queue
conn *grpc.ClientConn
client Federation_EventStreamClient
close chan struct{}
errOnce sync.Once
Expand Down Expand Up @@ -177,12 +181,15 @@ func (p *peer) stop() {
close(p.exit)
}
p.stateMu.Lock()
if p.state == peerStateStreaming {
_ = p.stream.client.CloseSend()
state := p.state
if state == peerStateStreaming {
_ = p.stream.conn.Close()
}
p.state = peerStateStopped
p.stateMu.Unlock()
p.stream.wg.Wait()
if state == peerStateStreaming {
p.stream.wg.Wait()
}
}

func (p *peer) serveEventStream() {
Expand Down Expand Up @@ -210,7 +217,7 @@ func (p *peer) serveEventStream() {
}
}

func (p *peer) initStream(client FederationClient) (s *stream, err error) {
func (p *peer) initStream(client FederationClient, conn *grpc.ClientConn) (s *stream, err error) {
p.stateMu.Lock()
defer func() {
if err == nil {
Expand Down Expand Up @@ -265,6 +272,7 @@ func (p *peer) initStream(client FederationClient) (s *stream, err error) {
p.queue.open()
s = &stream{
queue: p.queue,
conn: conn,
client: c,
close: make(chan struct{}),
}
Expand All @@ -291,7 +299,7 @@ func (p *peer) serveStream(reconnectCount int, backoff *time.Timer) (err error)
return err
}
client := NewFederationClient(conn)
s, err := p.initStream(client)
s, err := p.initStream(client, conn)
if err != nil {
return err
}
Expand All @@ -309,7 +317,7 @@ func (s *stream) serve() error {
func (s *stream) setError(err error) {
s.errOnce.Do(func() {
s.queue.close()
s.client.CloseSend()
s.conn.Close()
close(s.close)
if err != nil && err != io.EOF {
log.Error("stream error", zap.Error(err))
Expand Down
8 changes: 2 additions & 6 deletions plugin/federation/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPeer_initStream_CleanStart(t *testing.T) {
}).Times(4)

client.EXPECT().EventStream(gomock.Any())
_, err := p.initStream(client)
_, err := p.initStream(client, nil)

a.NoError(err)
for k, v := range msgEvents {
Expand Down Expand Up @@ -166,11 +166,7 @@ func TestPeer_initStream_CleanStartFalse(t *testing.T) {

client.EXPECT().EventStream(gomock.Any())

_, err := p.initStream(client)
_, err := p.initStream(client, nil)
a.NoError(err)

}

func TestEventQueue(t *testing.T) {

}

0 comments on commit 8d33b56

Please sign in to comment.