Commit

dragonboat: fixed a data race when requesting leadership transfer

lni committed Aug 4, 2019
1 parent b275e9f commit b26b55c
Showing 6 changed files with 177 additions and 74 deletions.
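As the diff suggests, the race came from NodeHost.RequestLeaderTransfer calling into the raft Peer from the caller's goroutine while the execution engine's event loop was driving the same Peer. The fix hands the request off through a single-slot buffered channel (pendingLeaderTransfer) that only the event loop drains. Below is a minimal Go sketch of that handoff pattern in isolation; mailbox and fakePeer are hypothetical stand-ins, not dragonboat's internals.

package main

// Minimal sketch (not dragonboat's actual code) of the pattern this commit
// adopts: the caller's goroutine never touches the raft Peer directly; it only
// posts the target node ID into a single-slot buffered channel, and the node's
// event loop is the sole goroutine that calls into the Peer. mailbox and
// fakePeer are simplified stand-ins for pendingLeaderTransfer and raft.Peer.

import (
	"errors"
	"fmt"
	"runtime"
)

var errTransferPending = errors.New("pending leader transfer exist")

// mailbox mirrors pendingLeaderTransfer from the diff: a capacity-1 channel
// with a non-blocking post and a non-blocking drain.
type mailbox struct{ c chan uint64 }

func newMailbox() *mailbox { return &mailbox{c: make(chan uint64, 1)} }

func (m *mailbox) post(target uint64) error {
	select {
	case m.c <- target:
		return nil
	default:
		return errTransferPending
	}
}

func (m *mailbox) drain() (uint64, bool) {
	select {
	case v := <-m.c:
		return v, true
	default:
		return 0, false
	}
}

// fakePeer stands in for raft.Peer; its state must only ever be mutated from
// the event loop goroutine, which is what removes the data race.
type fakePeer struct{ transferTarget uint64 }

func (p *fakePeer) RequestLeaderTransfer(target uint64) { p.transferTarget = target }

func main() {
	mb := newMailbox()
	peer := &fakePeer{}
	done := make(chan struct{})

	// Event loop goroutine: the only place that touches peer.
	go func() {
		defer close(done)
		for {
			if target, ok := mb.drain(); ok {
				peer.RequestLeaderTransfer(target)
				return
			}
			runtime.Gosched() // yield so the sketch terminates on a single P
		}
	}()

	// API goroutine (main here): hand the request off instead of calling peer.
	if err := mb.post(3); err != nil {
		fmt.Println("request rejected:", err)
	}
	<-done
	fmt.Println("transfer target handled by event loop:", peer.transferTarget)
}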
1 change: 0 additions & 1 deletion internal/raft/peer.go
@@ -104,7 +104,6 @@ func (rc *Peer) QuiescedTick() {
// RequestLeaderTransfer makes a request to transfer the leadership to the
// specified target node.
func (rc *Peer) RequestLeaderTransfer(target uint64) {
plog.Infof("RequestLeaderTransfer called, target %d", target)
rc.raft.Handle(pb.Message{
Type: pb.LeaderTransfer,
To: rc.raft.nodeID,
156 changes: 85 additions & 71 deletions node.go
@@ -52,49 +52,50 @@ type engine interface {
}

type node struct {
readReqCount uint64
leaderID uint64
instanceID uint64
raftAddress string
config config.Config
confChangeC <-chan configChangeRequest
snapshotC <-chan rsm.SnapshotRequest
taskQ *rsm.TaskQueue
mq *server.MessageQueue
smAppliedIndex uint64
confirmedIndex uint64
pushedIndex uint64
engine engine
getStreamConnection func(uint64, uint64) pb.IChunkSink
handleSnapshotStatus func(uint64, uint64, bool)
sendRaftMessage func(pb.Message)
sm *rsm.StateMachine
smType pb.StateMachineType
incomingProposals *entryQueue
incomingReadIndexes *readIndexQueue
pendingProposals *pendingProposal
pendingReadIndexes *pendingReadIndex
pendingConfigChange *pendingConfigChange
pendingSnapshot *pendingSnapshot
raftMu sync.Mutex
p *raft.Peer
logreader *logdb.LogReader
logdb raftio.ILogDB
snapshotter *snapshotter
nodeRegistry transport.INodeRegistry
stopc chan struct{}
clusterInfo atomic.Value
tickCount uint64
expireNotified uint64
tickMillisecond uint64
syncTask *task
rateLimited bool
new bool
closeOnce sync.Once
ss *snapshotState
snapshotLock *syncutil.Lock
raftEvents *raftEventListener
initializedMu struct {
readReqCount uint64
leaderID uint64
instanceID uint64
raftAddress string
config config.Config
confChangeC <-chan configChangeRequest
snapshotC <-chan rsm.SnapshotRequest
taskQ *rsm.TaskQueue
mq *server.MessageQueue
smAppliedIndex uint64
confirmedIndex uint64
pushedIndex uint64
engine engine
getStreamConnection func(uint64, uint64) pb.IChunkSink
handleSnapshotStatus func(uint64, uint64, bool)
sendRaftMessage func(pb.Message)
sm *rsm.StateMachine
smType pb.StateMachineType
incomingProposals *entryQueue
incomingReadIndexes *readIndexQueue
pendingProposals *pendingProposal
pendingReadIndexes *pendingReadIndex
pendingConfigChange *pendingConfigChange
pendingSnapshot *pendingSnapshot
pendingLeaderTransfer *pendingLeaderTransfer
raftMu sync.Mutex
p *raft.Peer
logreader *logdb.LogReader
logdb raftio.ILogDB
snapshotter *snapshotter
nodeRegistry transport.INodeRegistry
stopc chan struct{}
clusterInfo atomic.Value
tickCount uint64
expireNotified uint64
tickMillisecond uint64
syncTask *task
rateLimited bool
new bool
closeOnce sync.Once
ss *snapshotState
snapshotLock *syncutil.Lock
raftEvents *raftEventListener
initializedMu struct {
sync.Mutex
initialized bool
}
@@ -126,39 +127,41 @@ func newNode(raftAddress string,
readIndexes := newReadIndexQueue(incomingReadIndexMaxLen)
confChangeC := make(chan configChangeRequest, 1)
snapshotC := make(chan rsm.SnapshotRequest, 1)
leaderTransfer := newPendingLeaderTransfer()
pp := newPendingProposal(requestStatePool,
proposals, config.ClusterID, config.NodeID, raftAddress, tickMillisecond)
pscr := newPendingReadIndex(requestStatePool, readIndexes, tickMillisecond)
pcc := newPendingConfigChange(confChangeC, tickMillisecond)
ps := newPendingSnapshot(snapshotC, tickMillisecond)
lr := logdb.NewLogReader(config.ClusterID, config.NodeID, ldb)
rn := &node{
instanceID: atomic.AddUint64(&instanceID, 1),
tickMillisecond: tickMillisecond,
config: config,
raftAddress: raftAddress,
incomingProposals: proposals,
incomingReadIndexes: readIndexes,
confChangeC: confChangeC,
snapshotC: snapshotC,
engine: engine,
getStreamConnection: getStreamConnection,
handleSnapshotStatus: handleSnapshotStatus,
stopc: stopc,
pendingProposals: pp,
pendingReadIndexes: pscr,
pendingConfigChange: pcc,
pendingSnapshot: ps,
nodeRegistry: nodeRegistry,
snapshotter: snapshotter,
logreader: lr,
sendRaftMessage: sendMessage,
mq: mq,
logdb: ldb,
snapshotLock: syncutil.NewLock(),
ss: &snapshotState{},
syncTask: newTask(syncTaskInterval),
smType: smType,
instanceID: atomic.AddUint64(&instanceID, 1),
tickMillisecond: tickMillisecond,
config: config,
raftAddress: raftAddress,
incomingProposals: proposals,
incomingReadIndexes: readIndexes,
confChangeC: confChangeC,
snapshotC: snapshotC,
engine: engine,
getStreamConnection: getStreamConnection,
handleSnapshotStatus: handleSnapshotStatus,
stopc: stopc,
pendingProposals: pp,
pendingReadIndexes: pscr,
pendingConfigChange: pcc,
pendingSnapshot: ps,
pendingLeaderTransfer: leaderTransfer,
nodeRegistry: nodeRegistry,
snapshotter: snapshotter,
logreader: lr,
sendRaftMessage: sendMessage,
mq: mq,
logdb: ldb,
snapshotLock: syncutil.NewLock(),
ss: &snapshotState{},
syncTask: newTask(syncTaskInterval),
smType: smType,
quiesceManager: quiesceManager{
electionTick: config.ElectionRTT * 2,
enabled: config.Quiesce,
@@ -356,8 +359,8 @@ func (n *node) read(handler ICompleteHandler,
return rs, err
}

func (n *node) requestLeaderTransfer(nodeID uint64) {
n.p.RequestLeaderTransfer(nodeID)
func (n *node) requestLeaderTransfer(nodeID uint64) error {
return n.pendingLeaderTransfer.request(nodeID)
}

func (n *node) requestSnapshot(opt SnapshotOption,
@@ -987,6 +990,9 @@ func (n *node) handleEvents() bool {
if n.handleProposals() {
hasEvent = true
}
if n.handleLeaderTransferRequest() {
hasEvent = true
}
if n.handleSnapshotRequest(lastApplied) {
hasEvent = true
}
@@ -1002,6 +1008,14 @@ func (n *node) handleEvents() bool {
return hasEvent
}

func (n *node) handleLeaderTransferRequest() bool {
target, ok := n.pendingLeaderTransfer.get()
if ok {
n.p.RequestLeaderTransfer(target)
}
return ok
}

func (n *node) handleSnapshotRequest(lastApplied uint64) bool {
var req rsm.SnapshotRequest
select {
7 changes: 5 additions & 2 deletions nodehost.go
@@ -1087,8 +1087,11 @@ func (nh *NodeHost) RequestLeaderTransfer(clusterID uint64,
}
plog.Infof("RequestLeaderTransfer called on cluster %d target nodeid %d",
clusterID, targetNodeID)
v.requestLeaderTransfer(targetNodeID)
return nil
err := v.requestLeaderTransfer(targetNodeID)
if err == nil {
nh.execEngine.setNodeReady(clusterID)
}
return err
}

// SyncRemoveData is the synchronous variant of the RemoveData. It waits for
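With this change RequestLeaderTransfer can be rejected, so it now propagates an error to the caller. A hedged usage sketch follows; the import path and the transferLeadership helper are assumptions, while the RequestLeaderTransfer signature and the two new error values come from this commit.

package example

// Hedged usage sketch: nh is assumed to be a started *dragonboat.NodeHost;
// clusterID and targetNodeID are placeholder values. The import path below is
// an assumption and may not match the module path used at this commit.

import (
	"log"

	"github.com/lni/dragonboat/v3"
)

func transferLeadership(nh *dragonboat.NodeHost, clusterID, targetNodeID uint64) {
	switch err := nh.RequestLeaderTransfer(clusterID, targetNodeID); err {
	case nil:
		// The request has been queued for the node's event loop; the actual
		// leadership transfer still completes asynchronously.
	case dragonboat.ErrInvalidTarget:
		// targetNodeID was 0 (pb.NoNode); a real node ID is required.
	case dragonboat.ErrPendingLeaderTransferExist:
		// An earlier transfer request has not been picked up yet; retry later.
	default:
		log.Printf("leader transfer request failed: %v", err)
	}
}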
5 changes: 5 additions & 0 deletions raftpb/raft.go
@@ -32,6 +32,11 @@ var (
emptyState = State{}
)

const (
// NoNode is the flag used to indicate that the node id field is not set.
NoNode uint64 = 0
)

// TODO
// structs below are not pb generated. move them to a more suitable place?

35 changes: 35 additions & 0 deletions requests.go
@@ -80,6 +80,10 @@ var (
// ErrClusterNotReady indicates that the request has been dropped as the
// raft cluster is not ready.
ErrClusterNotReady = errors.New("request dropped as the cluster is not ready")
// ErrInvalidTarget indicates that the specified node id is invalid.
ErrInvalidTarget = errors.New("invalid target node ID")
// ErrPendingLeaderTransferExist indicates that a pending leader transfer request already exists.
ErrPendingLeaderTransferExist = errors.New("pending leader transfer exist")
)

// IsTempError returns a boolean value indicating whether the specified error
@@ -383,6 +387,37 @@ type pendingSnapshot struct {
logicalClock
}

type pendingLeaderTransfer struct {
leaderTransferC chan uint64
}

func newPendingLeaderTransfer() *pendingLeaderTransfer {
return &pendingLeaderTransfer{
leaderTransferC: make(chan uint64, 1),
}
}

func (l *pendingLeaderTransfer) request(target uint64) error {
if target == pb.NoNode {
return ErrInvalidTarget
}
select {
case l.leaderTransferC <- target:
default:
return ErrPendingLeaderTransferExist
}
return nil
}

func (l *pendingLeaderTransfer) get() (uint64, bool) {
select {
case v := <-l.leaderTransferC:
return v, true
default:
}
return 0, false
}

func newPendingSnapshot(snapshotC chan<- rsm.SnapshotRequest,
tickInMillisecond uint64) *pendingSnapshot {
gcTick := defaultGCTick
47 changes: 47 additions & 0 deletions requests_test.go
@@ -37,6 +37,53 @@ const (
testTickInMillisecond uint64 = 50
)

func TestPendingLeaderTransferCanBeCreated(t *testing.T) {
p := newPendingLeaderTransfer()
if len(p.leaderTransferC) != 0 || p.leaderTransferC == nil {
t.Errorf("leaderTransferC not ready")
}
}

func TestLeaderTransferCanBeRequested(t *testing.T) {
p := newPendingLeaderTransfer()
if err := p.request(1); err != nil {
t.Errorf("failed to request leadership transfer %v", err)
}
if len(p.leaderTransferC) != 1 {
t.Errorf("leader transfer not requested")
}
}

func TestInvalidLeaderTransferIsNotAllowed(t *testing.T) {
p := newPendingLeaderTransfer()
if err := p.request(0); err != ErrInvalidTarget {
t.Errorf("failed to reject invalid target node id")
}
if err := p.request(1); err != nil {
t.Errorf("failed to request %v", err)
}
if err := p.request(2); err != ErrPendingLeaderTransferExist {
t.Errorf("failed to reject")
}
}

func TestCanGetExitingLeaderTransferRequest(t *testing.T) {
p := newPendingLeaderTransfer()
_, ok := p.get()
if ok {
t.Errorf("unexpectedly returned request")
}
p.request(1)
v, ok := p.get()
if !ok || v != 1 {
t.Errorf("failed to get request")
}
v, ok = p.get()
if ok || v != 0 {
t.Errorf("unexpectedly returned request")
}
}

func TestRequestStatePanicWhenNotReadyForRead(t *testing.T) {
fn := func(rs *RequestState) {
defer func() {
