Skip to content

Commit

Permalink
transport: fixed unreachable notification
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed May 7, 2021
1 parent 04d4cfa commit cd3d154
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 27 deletions.
28 changes: 10 additions & 18 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (t *Transport) send(req pb.Message) (bool, failedSend) {
}
t.stopper.RunWorker(func() {
affected := make(nodeMap)
if !t.connectAndProcess(clusterID, toNodeID, addr, sq, from, affected) {
if !t.connectAndProcess(addr, sq, from, affected) {
t.notifyUnreachable(addr, affected)
}
shutdownQueue()
Expand All @@ -411,14 +411,8 @@ func (t *Transport) send(req pb.Message) (bool, failedSend) {

// connectAndProcess returns a boolean value indicating whether it stopped
// gracefully because the system is being shut down
func (t *Transport) connectAndProcess(clusterID uint64,
toNodeID uint64, remoteHost string, sq sendQueue,
from uint64, affected nodeMap) bool {
n := raftio.NodeInfo{
ClusterID: clusterID,
NodeID: from,
}
affected[n] = struct{}{}
func (t *Transport) connectAndProcess(remoteHost string,
sq sendQueue, from uint64, affected nodeMap) bool {
breaker := t.GetCircuitBreaker(remoteHost)
successes := breaker.Successes()
consecFailures := breaker.ConsecFailures()
Expand All @@ -433,11 +427,10 @@ func (t *Transport) connectAndProcess(clusterID uint64,
defer conn.Close()
breaker.Success()
if successes == 0 || consecFailures > 0 {
plog.Debugf("%s, message stream to %s (%s) established",
dn(clusterID, from), dn(clusterID, toNodeID), remoteHost)
plog.Debugf("message streaming to %s established", remoteHost)
t.sysEvents.ConnectionEstablished(remoteHost, false)
}
return t.processMessages(clusterID, toNodeID, sq, conn, affected)
return t.processMessages(remoteHost, sq, conn, affected)
}(); err != nil {
plog.Warningf("breaker %s to %s failed, connect and process failed: %s",
t.sourceID, remoteHost, err.Error())
Expand All @@ -449,9 +442,8 @@ func (t *Transport) connectAndProcess(clusterID uint64,
return true
}

func (t *Transport) processMessages(clusterID uint64,
toNodeID uint64, sq sendQueue, conn raftio.IConnection,
affected nodeMap) error {
func (t *Transport) processMessages(remoteHost string,
sq sendQueue, conn raftio.IConnection, affected nodeMap) error {
idleTimer := time.NewTimer(idleTimeout)
defer idleTimer.Stop()
sz := uint64(0)
Expand All @@ -470,7 +462,7 @@ func (t *Transport) processMessages(clusterID uint64,
return nil
case req := <-sq.ch:
n := raftio.NodeInfo{
ClusterID: clusterID,
ClusterID: req.ClusterId,
NodeID: req.From,
}
affected[n] = struct{}{}
Expand Down Expand Up @@ -499,14 +491,14 @@ func (t *Transport) processMessages(clusterID uint64,
}
if err := t.sendMessageBatch(conn, batch); err != nil {
plog.Errorf("send batch failed, target %s (%v), %d",
dn(clusterID, toNodeID), err, len(batch.Requests))
remoteHost, err, len(batch.Requests))
return err
}
if twoBatch {
batch.Requests = []pb.Message{requests[len(requests)-1]}
if err := t.sendMessageBatch(conn, batch); err != nil {
plog.Errorf("send batch failed, taret node %s (%v), %d",
dn(clusterID, toNodeID), err, len(batch.Requests))
remoteHost, err, len(batch.Requests))
return err
}
}
Expand Down
9 changes: 0 additions & 9 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,6 @@ func (h *testMessageHandler) getSnapshotCount(clusterID uint64,
return h.getMessageCount(h.snapshotCount, clusterID, nodeID)
}

func (h *testMessageHandler) getUnreachableCount(clusterID uint64,
nodeID uint64) uint64 {
return h.getMessageCount(h.unreachableCount, clusterID, nodeID)
}

func (h *testMessageHandler) getMessageCount(m map[raftio.NodeInfo]uint64,
clusterID uint64, nodeID uint64) uint64 {
h.mu.Lock()
Expand Down Expand Up @@ -601,10 +596,6 @@ func TestCircuitBreakerKicksInOnConnectivityIssue(t *testing.T) {
if !breaker.Ready() {
t.Errorf("breaker is not ready after wait")
}
if handler.getUnreachableCount(100, 1) == 0 {
t.Errorf("unreachable count %d, want 1",
handler.getUnreachableCount(100, 1))
}
}

func getTestSnapshotMessage(to uint64) raftpb.Message {
Expand Down

0 comments on commit cd3d154

Please sign in to comment.