diff --git a/nodehost.go b/nodehost.go index 5a1f87e62..89961a670 100644 --- a/nodehost.go +++ b/nodehost.go @@ -478,14 +478,14 @@ func (nh *NodeHost) Stop() { // cluster. // // As a summary, when - -// - starting a brand new Raft cluster, set join to false and specify all initial -// member node details in the initialMembers map. -// - joining a new node to an existing Raft cluster, set join to true and leave -// the initialMembers map empty. This requires the joining node to have already -// been added as a member node of the Raft cluster. -// - restarting an crashed or stopped node, set join to false and leave the -// initialMembers map to be empty. This applies to both initial member nodes -// and those joined later. +// - starting a brand new Raft cluster, set join to false and specify all initial +// member node details in the initialMembers map. +// - joining a new node to an existing Raft cluster, set join to true and leave +// the initialMembers map empty. This requires the joining node to have already +// been added as a member node of the Raft cluster. +// - restarting a crashed or stopped node, set join to false and leave the +// initialMembers map to be empty. This applies to both initial member nodes +// and those joined later. func (nh *NodeHost) StartCluster(initialMembers map[uint64]Target, join bool, create sm.CreateStateMachineFunc, cfg config.Config) error { cf := func(clusterID uint64, nodeID uint64, @@ -1520,17 +1520,17 @@ func (nh *NodeHost) getClusterSetIndex() uint64 { // there are three major reasons to bootstrap the cluster // -// 1. when possible, we check whether user incorrectly specified parameters -// for the startCluster method, e.g. call startCluster with join=true first, -// then restart the NodeHost instance and call startCluster again with -// join=false and len(nodes) > 0 -// 2. 
when restarting a node which is a part of the initial cluster members, -// for user convenience, we allow the caller not to provide the details of -// initial members. when the initial cluster member info is required, however -// we still need to get the initial member info from somewhere. bootstrap is -// the procedure that records such info. -// 3. the bootstrap record is used as a marker record in our default LogDB -// implementation to indicate that a certain node exists here +// 1. when possible, we check whether user incorrectly specified parameters +// for the startCluster method, e.g. call startCluster with join=true first, +// then restart the NodeHost instance and call startCluster again with +// join=false and len(nodes) > 0 +// 2. when restarting a node which is a part of the initial cluster members, +// for user convenience, we allow the caller not to provide the details of +// initial members. when the initial cluster member info is required, however +// we still need to get the initial member info from somewhere. bootstrap is +// the procedure that records such info. +// 3. 
the bootstrap record is used as a marker record in our default LogDB +// implementation to indicate that a certain node exists here func (nh *NodeHost) bootstrapCluster(initialMembers map[uint64]Target, join bool, cfg config.Config, smType pb.StateMachineType) (map[uint64]string, bool, error) { @@ -1999,6 +1999,8 @@ func getRequestState(ctx context.Context, rs *RequestState) (sm.Result, error) { return sm.Result{}, ErrClusterClosed } else if r.Dropped() { return sm.Result{}, ErrClusterNotReady + } else if r.Aborted() { + return sm.Result{}, ErrAborted } plog.Panicf("unknown v code %v", r) case <-ctx.Done(): diff --git a/nodehost_test.go b/nodehost_test.go index a850422ef..fd2b266b2 100644 --- a/nodehost_test.go +++ b/nodehost_test.go @@ -1537,6 +1537,56 @@ func TestRecoverFromSnapshotCanBeStopped(t *testing.T) { runNodeHostTest(t, to, fs) } +func TestGetRequestState(t *testing.T) { + tests := []struct { + code RequestResultCode + err error + }{ + {requestCompleted, nil}, + {requestRejected, ErrRejected}, + {requestTimeout, ErrTimeout}, + {requestTerminated, ErrClusterClosed}, + {requestDropped, ErrClusterNotReady}, + {requestAborted, ErrAborted}, + } + + for _, tt := range tests { + rs := &RequestState{ + CompletedC: make(chan RequestResult, 1), + } + result := RequestResult{code: tt.code} + rs.notify(result) + if _, err := getRequestState(context.TODO(), rs); !errors.Is(err, tt.err) { + t.Errorf("expect error %v, got %v", tt.err, err) + } + } +} + +func TestGetRequestStateTimeoutAndCancel(t *testing.T) { + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + time.Sleep(2 * time.Millisecond) + rs := &RequestState{ + CompletedC: make(chan RequestResult, 1), + } + if _, err := getRequestState(ctx, rs); !errors.Is(err, ErrTimeout) { + t.Errorf("got %v", err) + } + }() + + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + cancel() + rs := &RequestState{ + CompletedC: make(chan 
RequestResult, 1), + } + if _, err := getRequestState(ctx, rs); !errors.Is(err, ErrCanceled) { + t.Errorf("got %v", err) + } + }() +} + func TestNodeHostIDIsStatic(t *testing.T) { fs := vfs.GetTestFS() id := "" diff --git a/request.go b/request.go index 0c0af1002..1c8aad6e7 100644 --- a/request.go +++ b/request.go @@ -75,6 +75,9 @@ var ( ErrCanceled = errors.New("request canceled") // ErrRejected indicates that the request has been rejected. ErrRejected = errors.New("request rejected") + // ErrAborted indicates that the request has been aborted, usually by user + // defined behaviours. + ErrAborted = errors.New("request aborted") // ErrClusterNotReady indicates that the request has been dropped as the // specified raft cluster is not ready to handle the request. Unknown leader // is the most common cause of this error, trying to use a cluster not fully @@ -115,7 +118,8 @@ func IsTempError(err error) bool { err == ErrClusterNotInitialized || err == ErrClusterNotReady || err == ErrTimeout || - err == ErrClosed + err == ErrClosed || + err == ErrAborted } // RequestResultCode is the result code returned to the client to indicate the diff --git a/request_test.go b/request_test.go index daf218ee1..81eb3d462 100644 --- a/request_test.go +++ b/request_test.go @@ -45,6 +45,7 @@ func TestIsTempError(t *testing.T) { {ErrTimeout, true}, {ErrCanceled, false}, {ErrRejected, false}, + {ErrAborted, true}, {ErrClusterNotReady, true}, {ErrInvalidTarget, false}, {ErrInvalidNodeHostID, false},