Skip to content

Commit

Permalink
Merge branch 'inmem_mem_usage_fix' into release-3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Oct 24, 2019
2 parents 988668b + 6ea71d7 commit c35c20c
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 22 deletions.
3 changes: 2 additions & 1 deletion internal/logdb/kv/pebble/kv_pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"bytes"
"fmt"

"github.com/cockroachdb/pebble"

"github.com/lni/dragonboat/v3/internal/logdb/kv"
"github.com/lni/dragonboat/v3/raftio"
"github.com/petermattis/pebble"
)

type pebbleWriteBatch struct {
Expand Down
28 changes: 24 additions & 4 deletions internal/raft/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
// inMemory is a two stage in memory log storage struct to keep log entries
// that will be used by the raft protocol in immediate future.
type inMemory struct {
shrunk bool
snapshot *pb.Snapshot
entries []pb.Entry
markerIndex uint64
Expand All @@ -36,6 +37,9 @@ type inMemory struct {
}

func newInMemory(lastIndex uint64, rl *server.RateLimiter) inMemory {
if minEntrySliceSize >= entrySliceSize {
panic("minEntrySliceSize >= entrySliceSize")
}
return inMemory{
markerIndex: lastIndex + 1,
savedTo: lastIndex,
Expand Down Expand Up @@ -135,6 +139,7 @@ func (im *inMemory) appliedLogTo(index uint64) {
}
newMarkerIndex := index
applied := im.entries[:newMarkerIndex-im.markerIndex]
im.shrunk = true
im.entries = im.entries[newMarkerIndex-im.markerIndex:]
im.markerIndex = newMarkerIndex
im.resizeEntrySlice()
Expand All @@ -152,11 +157,23 @@ func (im *inMemory) savedSnapshotTo(index uint64) {
}
}

func (im *inMemory) resize() {
old := im.entries
im.shrunk = false
sz := max(entrySliceSize, uint64(len(old)*2))
im.entries = make([]pb.Entry, 0, sz)
im.entries = append(im.entries, old...)
}

func (im *inMemory) tryResize() {
if im.shrunk {
im.resize()
}
}

func (im *inMemory) resizeEntrySlice() {
if cap(im.entries)-len(im.entries) < int(minEntrySliceSize) {
old := im.entries
im.entries = make([]pb.Entry, 0, entrySliceSize)
im.entries = append(im.entries, old...)
if im.shrunk && cap(im.entries)-len(im.entries) < int(minEntrySliceSize) {
im.resize()
}
}

Expand All @@ -172,6 +189,7 @@ func (im *inMemory) merge(ents []pb.Entry) {
} else if firstNewIndex <= im.markerIndex {
im.markerIndex = firstNewIndex
// ents might come from entryQueue, copy it to its own storage
im.shrunk = false
im.entries = newEntrySlice(ents)
im.savedTo = firstNewIndex - 1
if im.rateLimited() {
Expand All @@ -180,6 +198,7 @@ func (im *inMemory) merge(ents []pb.Entry) {
} else {
existing := im.getEntries(im.markerIndex, firstNewIndex)
checkEntriesToAppend(existing, ents)
im.shrunk = false
im.entries = make([]pb.Entry, 0, len(existing)+len(ents))
im.entries = append(im.entries, existing...)
im.entries = append(im.entries, ents...)
Expand All @@ -195,6 +214,7 @@ func (im *inMemory) merge(ents []pb.Entry) {
func (im *inMemory) restore(ss pb.Snapshot) {
im.snapshot = &ss
im.markerIndex = ss.Index + 1
im.shrunk = false
im.entries = nil
im.savedTo = ss.Index
if im.rateLimited() {
Expand Down
105 changes: 89 additions & 16 deletions internal/raft/inmemory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,11 @@ func TestInMemRestore(t *testing.T) {
},
}
ss := pb.Snapshot{Index: 100}
im.shrunk = true
im.restore(ss)
if im.shrunk {
t.Errorf("shrunk flag not cleared")
}
if len(im.entries) != 0 || im.markerIndex != 101 || im.snapshot == nil {
t.Errorf("unexpected im state")
}
Expand All @@ -254,20 +258,25 @@ func TestInMemSaveSnapshotTo(t *testing.T) {
}
}

func TestInMemMergeFullAppend(t *testing.T) {
func testInMemMergeFullAppend(t *testing.T, shrunk bool) {
im := inMemory{
markerIndex: 5,
entries: []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
},
}
im.resize()
im.entries = append(im.entries, []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
}...)
im.shrunk = shrunk
ents := []pb.Entry{
{Index: 8, Term: 8},
{Index: 9, Term: 9},
}
im.merge(ents)
if im.shrunk != shrunk {
t.Errorf("shrunk flag unexpectedly changed, %t:%t", im.shrunk, shrunk)
}
if len(im.entries) != 5 || im.markerIndex != 5 {
t.Errorf("not fully appended")
}
Expand All @@ -276,20 +285,30 @@ func TestInMemMergeFullAppend(t *testing.T) {
}
}

func TestInMemMergeFullAppend(t *testing.T) {
testInMemMergeFullAppend(t, false)
testInMemMergeFullAppend(t, true)
}

func TestInMemMergeReplace(t *testing.T) {
im := inMemory{
markerIndex: 5,
entries: []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
},
}
im.resize()
im.entries = append(im.entries, []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
}...)
im.shrunk = true
ents := []pb.Entry{
{Index: 2, Term: 2},
{Index: 3, Term: 3},
}
im.merge(ents)
if im.shrunk {
t.Errorf("shrunk flag unexpectedly not cleared")
}
if len(im.entries) != 2 || im.markerIndex != 2 {
t.Errorf("not fully appended")
}
Expand Down Expand Up @@ -323,17 +342,22 @@ func TestInMemMergeWithHoleCausePanic(t *testing.T) {
func TestInMemMerge(t *testing.T) {
im := inMemory{
markerIndex: 5,
entries: []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
},
}
im.resize()
im.entries = append(im.entries, []pb.Entry{
{Index: 5, Term: 5},
{Index: 6, Term: 6},
{Index: 7, Term: 7},
}...)
im.shrunk = true
ents := []pb.Entry{
{Index: 6, Term: 7},
{Index: 7, Term: 10},
}
im.merge(ents)
if im.shrunk {
t.Errorf("shrunk flag unexpectedly not cleared")
}
if len(im.entries) != 3 || im.markerIndex != 5 {
t.Errorf("not fully appended")
}
Expand Down Expand Up @@ -511,7 +535,14 @@ func TestAppliedLogTo(t *testing.T) {
{10, 1, 10},
}
for idx, tt := range tests {
len1 := len(im.entries)
im.appliedLogTo(tt.appliedTo)
len2 := len(im.entries)
if len2 < len1 {
if !im.shrunk {
t.Errorf("shrunk flag not set")
}
}
if len(im.entries) != tt.length {
t.Errorf("%d, unexpected entry slice len %d, want %d",
idx, len(im.entries), tt.length)
Expand Down Expand Up @@ -624,3 +655,45 @@ func TestRateLimitCanBeUpdatedAfterCutAndMergingEntries(t *testing.T) {
t.Errorf("log size %d, want %d", im.rl.Get(), expSz)
}
}

func TestResize(t *testing.T) {
im := inMemory{
markerIndex: 10,
entries: []pb.Entry{
{Index: 10, Term: 1},
{Index: 11, Term: 1},
},
shrunk: true,
}
im.resize()
if uint64(cap(im.entries)) != entrySliceSize {
t.Errorf("not resized")
}
if len(im.entries) != 2 {
t.Errorf("unexpected len %d", len(im.entries))
}
if im.shrunk {
t.Errorf("shrunk flag not clearaed")
}
}

func TestTryResize(t *testing.T) {
im := inMemory{
markerIndex: 10,
entries: []pb.Entry{
{Index: 10, Term: 1},
{Index: 11, Term: 1},
},
}
initcap := cap(im.entries)
initlen := len(im.entries)
im.tryResize()
if cap(im.entries) != initcap || len(im.entries) != initlen {
t.Errorf("cap/len unexpectedly changed")
}
im.shrunk = true
im.tryResize()
if cap(im.entries) == initcap {
t.Errorf("cap/len unexpectedly not changed")
}
}
19 changes: 18 additions & 1 deletion internal/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ const (
)

var (
emptyState = pb.State{}
emptyState = pb.State{}
maxEntrySize = settings.Soft.MaxEntrySize
inMemGcTimeout = settings.Soft.InMemGCTimeout
)

// State is the state of a raft node defined in the raft paper, possible states
Expand Down Expand Up @@ -207,6 +209,7 @@ type raft struct {
readyToRead []pb.ReadyToRead
droppedEntries []pb.Entry
droppedReadIndexes []pb.SystemCtx
quiesce bool
checkQuorum bool
tickCount uint64
electionTick uint64
Expand Down Expand Up @@ -462,8 +465,18 @@ func (r *raft) timeForRateLimitCheck() bool {
return r.tickCount%r.electionTimeout == 0
}

func (r *raft) timeForInMemGC() bool {
return r.tickCount%inMemGcTimeout == 0
}

func (r *raft) tick() {
r.quiesce = false
r.tickCount++
// this is to work around the language limitation described in
// https://github.com/golang/go/issues/9618
if r.timeForInMemGC() {
r.log.inmem.tryResize()
}
if r.state == leader {
r.leaderTick()
} else {
Expand Down Expand Up @@ -531,6 +544,10 @@ func (r *raft) leaderTick() {
}

func (r *raft) quiescedTick() {
if !r.quiesce {
r.quiesce = true
r.log.inmem.resize()
}
r.electionTick++
}

Expand Down
32 changes: 32 additions & 0 deletions internal/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,3 +2279,35 @@ func TestRateLimitMessageIsSentByNonLeader(t *testing.T) {
testRateLimitMessageIsSentByNonLeader(2, true, t)
testRateLimitMessageIsSentByNonLeader(NoLeader, false, t)
}

func TestInMemoryEntriesSliceCanBeResized(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 5, 1, NewTestLogDB())
oldcap := cap(r.log.inmem.entries)
if oldcap != 0 {
t.Errorf("unexpected cap val: %d", oldcap)
}
r.log.inmem.shrunk = true
for i := uint64(0); i < inMemGcTimeout; i++ {
r.tick()
}
if uint64(cap(r.log.inmem.entries)) != entrySliceSize {
t.Errorf("not resized")
}
}

func TestFirstQuiescedTickResizesInMemoryEntriesSlice(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 5, 1, NewTestLogDB())
oldcap := cap(r.log.inmem.entries)
if oldcap != 0 {
t.Errorf("unexpected cap val: %d", oldcap)
}
r.quiescedTick()
if uint64(cap(r.log.inmem.entries)) != entrySliceSize {
t.Errorf("not resized, cap: %d", oldcap)
}
r.log.inmem.entries = make([]pb.Entry, 0, 0)
r.quiescedTick()
if cap(r.log.inmem.entries) != 0 {
t.Errorf("unexpectedly resized again")
}
}
4 changes: 4 additions & 0 deletions internal/settings/soft.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type soft struct {
// ExpectedMaxInMemLogSize is the minimum MaxInMemLogSize value expected
// in a raft config.
ExpectedMaxInMemLogSize uint64
// InMemGCTimeout defines how often dragonboat collects partial object.
// It is defined in terms of number of ticks.
InMemGCTimeout uint64

//
// Multiraft
Expand Down Expand Up @@ -229,6 +232,7 @@ func getDefaultSoftSettings() soft {
LocalRaftRequestTimeoutMs: 10000,
GetConnectedTimeoutSecond: 5,
MaxEntrySize: 2 * MaxProposalPayloadSize,
InMemGCTimeout: 100,
InMemEntrySliceSize: 512,
MinEntrySliceFreeSize: 96,
ExpectedMaxInMemLogSize: 2 * (MaxProposalPayloadSize + EntryNonCmdFieldsSize),
Expand Down
17 changes: 17 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,17 @@ func TestOnDiskStateMachineCanTakeDummySnapshot(t *testing.T) {
t.Fatalf("dummy snapshot is not recorded as dummy")
}
break
} else if i%100 == 0 {
// this is an ugly hack to workaround RocksDB's incorrect fsync
// implementation on macos.
// fcntl(fd, F_FULLFSYNC) is required for a proper fsync on macos,
// sadly rocksdb is not doing that. this means we can make proposals
// very fast as they are not actually fsynced on macos but making
// snapshots are going to be much much slower as dragonboat properly
// fsyncs its snapshot data. we can end up completing all required
// proposals even before completing the first ongoing snapshotting
// operation.
time.Sleep(200 * time.Millisecond)
}
}
if !snapshotted {
Expand Down Expand Up @@ -1490,6 +1501,9 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) {
if len(snapshots) >= 3 {
snapshotted = true
break
} else if i%50 == 0 {
// see comments in testOnDiskStateMachineCanTakeDummySnapshot
time.Sleep(100 * time.Millisecond)
}
}
if !snapshotted {
Expand Down Expand Up @@ -1549,6 +1563,9 @@ func TestOnDiskSMCanStreamSnapshot(t *testing.T) {
}
}
break
} else if i%50 == 0 {
// see comments in testOnDiskStateMachineCanTakeDummySnapshot
time.Sleep(100 * time.Millisecond)
}
}
if !snapshotted {
Expand Down

0 comments on commit c35c20c

Please sign in to comment.