Skip to content

Commit

Permalink
dragonboat: fixed #260
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Dec 6, 2022
1 parent 2da3403 commit 014329c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 20 deletions.
40 changes: 21 additions & 19 deletions nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,14 +478,14 @@ func (nh *NodeHost) Stop() {
// cluster.
//
// As a summary, when -
// - starting a brand new Raft cluster, set join to false and specify all initial
// member node details in the initialMembers map.
// - joining a new node to an existing Raft cluster, set join to true and leave
// the initialMembers map empty. This requires the joining node to have already
// been added as a member node of the Raft cluster.
// - restarting an crashed or stopped node, set join to false and leave the
// initialMembers map to be empty. This applies to both initial member nodes
// and those joined later.
// - starting a brand new Raft cluster, set join to false and specify all initial
// member node details in the initialMembers map.
// - joining a new node to an existing Raft cluster, set join to true and leave
// the initialMembers map empty. This requires the joining node to have already
// been added as a member node of the Raft cluster.
// - restarting a crashed or stopped node, set join to false and leave the
// initialMembers map empty. This applies to both initial member nodes
// and those joined later.
func (nh *NodeHost) StartCluster(initialMembers map[uint64]Target,
join bool, create sm.CreateStateMachineFunc, cfg config.Config) error {
cf := func(clusterID uint64, nodeID uint64,
Expand Down Expand Up @@ -1520,17 +1520,17 @@ func (nh *NodeHost) getClusterSetIndex() uint64 {

// there are three major reasons to bootstrap the cluster
//
// 1. when possible, we check whether user incorrectly specified parameters
// for the startCluster method, e.g. call startCluster with join=true first,
// then restart the NodeHost instance and call startCluster again with
// join=false and len(nodes) > 0
// 2. when restarting a node which is a part of the initial cluster members,
// for user convenience, we allow the caller not to provide the details of
// initial members. when the initial cluster member info is required, however
// we still need to get the initial member info from somewhere. bootstrap is
// the procedure that records such info.
// 3. the bootstrap record is used as a marker record in our default LogDB
// implementation to indicate that a certain node exists here
// 1. when possible, we check whether user incorrectly specified parameters
// for the startCluster method, e.g. call startCluster with join=true first,
// then restart the NodeHost instance and call startCluster again with
// join=false and len(nodes) > 0
// 2. when restarting a node which is a part of the initial cluster members,
// for user convenience, we allow the caller not to provide the details of
// initial members. however, when the initial cluster member info is
// required, we still need to get it from somewhere. bootstrap is the
// procedure that records such info.
// 3. the bootstrap record is used as a marker record in our default LogDB
// implementation to indicate that a certain node exists here
func (nh *NodeHost) bootstrapCluster(initialMembers map[uint64]Target,
join bool, cfg config.Config,
smType pb.StateMachineType) (map[uint64]string, bool, error) {
Expand Down Expand Up @@ -1999,6 +1999,8 @@ func getRequestState(ctx context.Context, rs *RequestState) (sm.Result, error) {
return sm.Result{}, ErrClusterClosed
} else if r.Dropped() {
return sm.Result{}, ErrClusterNotReady
} else if r.Aborted() {
return sm.Result{}, ErrAborted
}
plog.Panicf("unknown v code %v", r)
case <-ctx.Done():
Expand Down
50 changes: 50 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,56 @@ func TestRecoverFromSnapshotCanBeStopped(t *testing.T) {
runNodeHostTest(t, to, fs)
}

// TestGetRequestState feeds each request result code through a RequestState
// and checks that getRequestState reports the matching error (nil for a
// completed request).
func TestGetRequestState(t *testing.T) {
	type expectation struct {
		code RequestResultCode
		err  error
	}
	cases := []expectation{
		{code: requestCompleted, err: nil},
		{code: requestRejected, err: ErrRejected},
		{code: requestTimeout, err: ErrTimeout},
		{code: requestTerminated, err: ErrClusterClosed},
		{code: requestDropped, err: ErrClusterNotReady},
		{code: requestAborted, err: ErrAborted},
	}

	for i := range cases {
		want := cases[i].err
		// A fresh state per case, with a one-slot buffered completion channel.
		state := &RequestState{CompletedC: make(chan RequestResult, 1)}
		state.notify(RequestResult{code: cases[i].code})
		_, got := getRequestState(context.TODO(), state)
		if errors.Is(got, want) {
			continue
		}
		t.Errorf("expect error %v, got %v", want, got)
	}
}

// TestGetRequestStateTimeoutAndCancel checks how getRequestState reports a
// context that is already finished when no request result is pending:
// an expired deadline must yield ErrTimeout and an explicit cancellation must
// yield ErrCanceled.
//
// Each scenario runs as a named subtest so that a failure identifies which of
// the two otherwise identical "got %v" messages fired.
func TestGetRequestStateTimeoutAndCancel(t *testing.T) {
	t.Run("timeout", func(t *testing.T) {
		ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
		defer cancel()
		// Wait for the deadline to actually fire rather than sleeping for a
		// guessed duration; this keeps the test deterministic on slow or
		// heavily loaded machines.
		<-ctx.Done()
		rs := &RequestState{
			CompletedC: make(chan RequestResult, 1),
		}
		if _, err := getRequestState(ctx, rs); !errors.Is(err, ErrTimeout) {
			t.Errorf("got %v", err)
		}
	})

	t.Run("canceled", func(t *testing.T) {
		// The long timeout never fires; the context ends only because it is
		// cancelled immediately.
		ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
		cancel()
		rs := &RequestState{
			CompletedC: make(chan RequestResult, 1),
		}
		if _, err := getRequestState(ctx, rs); !errors.Is(err, ErrCanceled) {
			t.Errorf("got %v", err)
		}
	})
}

func TestNodeHostIDIsStatic(t *testing.T) {
fs := vfs.GetTestFS()
id := ""
Expand Down
6 changes: 5 additions & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ var (
ErrCanceled = errors.New("request canceled")
// ErrRejected indicates that the request has been rejected.
ErrRejected = errors.New("request rejected")
// ErrAborted indicates that the request has been aborted, usually by user
// defined behaviours.
ErrAborted = errors.New("request aborted")
// ErrClusterNotReady indicates that the request has been dropped as the
// specified raft cluster is not ready to handle the request. Unknown leader
// is the most common cause of this error, trying to use a cluster not fully
Expand Down Expand Up @@ -115,7 +118,8 @@ func IsTempError(err error) bool {
err == ErrClusterNotInitialized ||
err == ErrClusterNotReady ||
err == ErrTimeout ||
err == ErrClosed
err == ErrClosed ||
err == ErrAborted
}

// RequestResultCode is the result code returned to the client to indicate the
Expand Down
1 change: 1 addition & 0 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestIsTempError(t *testing.T) {
{ErrTimeout, true},
{ErrCanceled, false},
{ErrRejected, false},
{ErrAborted, true},
{ErrClusterNotReady, true},
{ErrInvalidTarget, false},
{ErrInvalidNodeHostID, false},
Expand Down

0 comments on commit 014329c

Please sign in to comment.