diff --git a/internal/rsm/statemachine.go b/internal/rsm/statemachine.go index a3539a749..9bb725590 100644 --- a/internal/rsm/statemachine.go +++ b/internal/rsm/statemachine.go @@ -707,6 +707,10 @@ func (s *StateMachine) setLastApplied(entries []pb.Entry) { } } +func (s *StateMachine) savingDummySnapshot(r SSRequest) bool { + return s.OnDiskStateMachine() && !r.Streaming() && !r.Exported() +} + func (s *StateMachine) checkSnapshotStatus(r SSRequest) error { if s.aborted { return sm.ErrSnapshotStopped @@ -771,7 +775,7 @@ func (s *StateMachine) prepare(r SSRequest) (SSMeta, error) { } var err error var ctx interface{} - if s.Concurrent() { + if s.Concurrent() && !s.savingDummySnapshot(r) { ctx, err = s.sm.Prepare() if err != nil { return SSMeta{}, err diff --git a/internal/rsm/statemachine_test.go b/internal/rsm/statemachine_test.go index d55bd9809..c051bda53 100644 --- a/internal/rsm/statemachine_test.go +++ b/internal/rsm/statemachine_test.go @@ -2016,11 +2016,15 @@ func TestAlreadyAppliedInOnDiskSMEntryTreatedAsNoOP(t *testing.T) { } type testManagedStateMachine struct { - first uint64 - last uint64 - synced bool - nalookup bool - corruptIndex bool + first uint64 + last uint64 + synced bool + nalookup bool + corruptIndex bool + concurrent bool + onDisk bool + smType pb.StateMachineType + prepareInvoked bool } func (t *testManagedStateMachine) Open() (uint64, error) { return 10, nil } @@ -2042,8 +2046,11 @@ func (t *testManagedStateMachine) Sync() error { t.synced = true return nil } -func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil } -func (t *testManagedStateMachine) Prepare() (interface{}, error) { return nil, nil } +func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil } +func (t *testManagedStateMachine) Prepare() (interface{}, error) { + t.prepareInvoked = true + return nil, nil +} func (t *testManagedStateMachine) Save(SSMeta, io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) { return false, nil @@ -2056,9 +2063,9 @@ func (t *testManagedStateMachine) Offloaded() bool { return func (t *testManagedStateMachine) Loaded() {} func (t *testManagedStateMachine) Close() {} func (t *testManagedStateMachine) DestroyedC() <-chan struct{} { return nil } -func (t *testManagedStateMachine) Concurrent() bool { return false } -func (t *testManagedStateMachine) OnDisk() bool { return false } -func (t *testManagedStateMachine) Type() pb.StateMachineType { return 0 } +func (t *testManagedStateMachine) Concurrent() bool { return t.concurrent } +func (t *testManagedStateMachine) OnDisk() bool { return t.onDisk } +func (t *testManagedStateMachine) Type() pb.StateMachineType { return t.smType } func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error) { if !t.corruptIndex { t.first = ents[0].Index @@ -2068,7 +2075,6 @@ func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, er ents[idx].Index = ents[idx].Index + 1 } } - return ents, nil } @@ -2411,3 +2417,98 @@ func TestSetLastApplied(t *testing.T) { }() } } + +func TestSavingDummySnapshot(t *testing.T) { + tests := []struct { + smType pb.StateMachineType + streaming bool + export bool + result bool + }{ + {pb.RegularStateMachine, true, false, false}, + {pb.RegularStateMachine, false, true, false}, + {pb.RegularStateMachine, false, false, false}, + {pb.ConcurrentStateMachine, true, false, false}, + {pb.ConcurrentStateMachine, false, true, false}, + {pb.ConcurrentStateMachine, false, false, false}, + {pb.OnDiskStateMachine, true, false, false}, + {pb.OnDiskStateMachine, false, true, false}, + {pb.OnDiskStateMachine, false, false, true}, + } + for idx, tt := range tests { + sm := StateMachine{ + onDiskSM: tt.smType == pb.OnDiskStateMachine, + } + var rt SSReqType + if tt.export && tt.streaming { + panic("bad test input") + } + if tt.export { + rt = Exported + } else if tt.streaming { + rt = Streaming + } + if r := sm.savingDummySnapshot(SSRequest{Type: rt}); r != tt.result { + t.Errorf("%d, got %t, want %t", idx, r, tt.result) + } + } +} + +func TestPrepareIsNotCalledWhenSavingDummySnapshot(t *testing.T) { + tests := []struct { + onDiskSM bool + streaming bool + export bool + prepareInvoked bool + }{ + {true, false, false, false}, + {true, true, false, true}, + {true, false, true, true}, + {false, false, false, true}, + {false, false, true, true}, + } + + for idx, tt := range tests { + msm := &testManagedStateMachine{ + concurrent: true, + onDisk: tt.onDiskSM, + smType: pb.ConcurrentStateMachine, + } + if tt.onDiskSM { + msm.smType = pb.OnDiskStateMachine + } + m := &membership{ + members: &pb.Membership{ + Addresses: map[uint64]string{1: "localhost:1234"}, + }, + } + sm := StateMachine{ + index: 100, + onDiskSM: tt.onDiskSM, + sm: msm, + members: m, + node: &testNodeProxy{}, + sessions: NewSessionManager(), + } + var rt SSReqType + if tt.export && tt.streaming { + panic("bad test input") + } + if tt.export { + rt = Exported + } else if tt.streaming { + rt = Streaming + } + meta, err := sm.prepare(SSRequest{Type: rt}) + if err != nil { + t.Errorf("prepare failed, %v", err) + } + if meta.Index != 100 { + t.Errorf("failed to get the snapshot metadata") + } + if msm.prepareInvoked != tt.prepareInvoked { + t.Errorf("%d, prepareInvoked got %t, want %t", + idx, msm.prepareInvoked, tt.prepareInvoked) + } + } +}