
Commit

Merge pull request #313 from Comrade88/release-3.3
backport "Avoid having the same state machine to be closed/destroyed twice." from master
lni committed Jun 14, 2023
2 parents 014329c + 1ac542d commit 630317e
Showing 17 changed files with 157 additions and 84 deletions.
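
Before the per-file diffs, here is the gist of the backported fix as a minimal, self-contained Go sketch (simplified names, not the actual dragonboat types): the close worker pool now remembers which cluster each busy worker is closing, refuses to schedule a second close for a cluster that is already in flight, and a node that reports itself as destroyed is skipped rather than destroyed again.

package main

import "fmt"

// node stands in for dragonboat's *node; only the fields needed here.
type node struct {
	clusterID uint64
	destroyed bool
}

// pool mirrors the shape of closeWorkerPool after this commit:
// busy maps workerID -> clusterID, processing is the set of clusters
// currently being closed.
type pool struct {
	busy       map[uint64]uint64
	processing map[uint64]struct{}
}

func newPool() *pool {
	return &pool{
		busy:       make(map[uint64]uint64),
		processing: make(map[uint64]struct{}),
	}
}

// canSchedule reports whether n's cluster is not already being closed.
func (p *pool) canSchedule(n *node) bool {
	_, inFlight := p.processing[n.clusterID]
	return !inFlight
}

func (p *pool) setBusy(workerID, clusterID uint64) {
	p.processing[clusterID] = struct{}{}
	p.busy[workerID] = clusterID
}

func (p *pool) completed(workerID uint64) {
	clusterID, ok := p.busy[workerID]
	if !ok {
		panic(fmt.Sprintf("close worker %d is not in busy state", workerID))
	}
	delete(p.processing, clusterID)
	delete(p.busy, workerID)
}

// handle skips a node that has already been destroyed, the second half of
// the "never close/destroy twice" guarantee.
func handle(n *node) error {
	if n.destroyed {
		return nil
	}
	n.destroyed = true
	return nil
}

func main() {
	p := newPool()
	n := &node{clusterID: 7}
	p.setBusy(1, n.clusterID)
	fmt.Println(p.canSchedule(n)) // false: cluster 7 is already being closed
	p.completed(1)
	fmt.Println(handle(n) == nil, n.destroyed) // true true
	fmt.Println(handle(n) == nil)              // true, but a no-op this time
}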
53 changes: 39 additions & 14 deletions engine.go
@@ -763,7 +763,9 @@ func (w *closeWorker) workerMain() {
case <-w.stopper.ShouldStop():
return
case req := <-w.requestC:
w.handle(req)
if err := w.handle(req); err != nil {
panicNow(err)
}
w.completed()
}
}
@@ -773,24 +775,29 @@ func (w *closeWorker) completed() {
w.completedC <- struct{}{}
}

func (w *closeWorker) handle(req closeReq) {
req.node.destroy()
func (w *closeWorker) handle(req closeReq) error {
if req.node.destroyed() {
return nil
}
return req.node.destroy()
}

type closeWorkerPool struct {
workers []*closeWorker
ready chan closeReq
busy map[uint64]struct{}
pending []*node
busy map[uint64]uint64
processing map[uint64]struct{}
workerStopper *syncutil.Stopper
poolStopper *syncutil.Stopper
pending []*node
}

func newCloseWorkerPool(closeWorkerCount uint64) *closeWorkerPool {
w := &closeWorkerPool{
workers: make([]*closeWorker, closeWorkerCount),
ready: make(chan closeReq, 1),
busy: make(map[uint64]struct{}, closeWorkerCount),
busy: make(map[uint64]uint64, closeWorkerCount),
processing: make(map[uint64]struct{}, closeWorkerCount),
pending: make([]*node, 0),
workerStopper: syncutil.NewStopper(),
poolStopper: syncutil.NewStopper(),
@@ -900,14 +907,20 @@ func (p *closeWorkerPool) isIdle() bool {
}

func (p *closeWorkerPool) completed(workerID uint64) {
if _, ok := p.busy[workerID]; !ok {
plog.Panicf("close worker %d is not in busy state")
clusterID, ok := p.busy[workerID]
if !ok {
plog.Panicf("close worker %d is not in busy state", workerID)
}
if _, ok := p.processing[clusterID]; !ok {
plog.Panicf("cluster %d is not being processed", clusterID)
}
delete(p.processing, clusterID)
delete(p.busy, workerID)
}

func (p *closeWorkerPool) setBusy(workerID uint64) {
p.busy[workerID] = struct{}{}
func (p *closeWorkerPool) setBusy(workerID uint64, clusterID uint64) {
p.processing[clusterID] = struct{}{}
p.busy[workerID] = clusterID
}

func (p *closeWorkerPool) getWorker() *closeWorker {
@@ -927,21 +940,33 @@ func (p *closeWorkerPool) schedule() {
}
}

func (p *closeWorkerPool) canSchedule(n *node) bool {
_, ok := p.processing[n.clusterID]
return !ok
}

func (p *closeWorkerPool) scheduleWorker() bool {
w := p.getWorker()
if w == nil {
return false
}
if len(p.pending) > 0 {
p.scheduleReq(p.pending[0], w)

for i := 0; i < len(p.pending); i++ {
node := p.pending[0]
p.removeFromPending(0)
return true
if p.canSchedule(node) {
p.scheduleReq(node, w)
return true
} else {
p.pending = append(p.pending, node)
}
}

return false
}

func (p *closeWorkerPool) scheduleReq(n *node, w *closeWorker) {
p.setBusy(w.workerID)
p.setBusy(w.workerID, n.clusterID)
select {
case w.requestC <- closeReq{node: n}:
default:
1 change: 1 addition & 0 deletions go.mod
@@ -2,6 +2,7 @@ module github.com/lni/dragonboat/v3

require (
github.com/VictoriaMetrics/metrics v1.6.2
github.com/cockroachdb/errors v1.7.5 // indirect
github.com/cockroachdb/pebble v0.0.0-20210331181633-27fc006b8bfb
github.com/golang/protobuf v1.4.3
github.com/golang/snappy v0.0.3-0.20201103224600-674baa8c7fc3
17 changes: 11 additions & 6 deletions internal/logdb/db.go
@@ -18,6 +18,8 @@ import (
"encoding/binary"
"math"

"github.com/cockroachdb/errors"

"github.com/lni/dragonboat/v3/config"
"github.com/lni/dragonboat/v3/internal/logdb/kv"
"github.com/lni/dragonboat/v3/internal/settings"
@@ -109,10 +111,8 @@ func (r *db) binaryFormat() uint32 {
return r.entries.binaryFormat()
}

func (r *db) close() {
if err := r.kvs.Close(); err != nil {
panic(err)
}
func (r *db) close() error {
return r.kvs.Close()
}

func (r *db) getWriteBatch(ctx IContext) kv.IWriteBatch {
@@ -493,8 +493,13 @@ func (r *db) iterateEntries(ents []pb.Entry,
return ents, size, nil
}
if err != nil {
panic(err)
err = errors.Wrapf(err,
"%s failed to get max index", dn(clusterID, nodeID))
return nil, 0, err
}
return r.entries.iterate(ents, maxIndex, size,
entries, sz, err := r.entries.iterate(ents, maxIndex, size,
clusterID, nodeID, low, high, maxSize)
err = errors.Wrapf(err, "%s failed to iterate entries, %d, %d, %d, %d",
dn(clusterID, nodeID), low, high, maxSize, maxIndex)
return entries, sz, err
}
2 changes: 1 addition & 1 deletion internal/logdb/kv/pebble/kv_pebble.go
@@ -258,7 +258,7 @@ func (r *KV) Close() error {
func iteratorIsValid(iter *pebble.Iterator) bool {
v := iter.Valid()
if err := iter.Error(); err != nil {
panic(err)
plog.Panicf("%+v", err)
}
return v
}
24 changes: 17 additions & 7 deletions internal/logdb/sharded.go
@@ -276,14 +276,17 @@ func (s *ShardedDB) ImportSnapshot(ss pb.Snapshot, nodeID uint64) error {
}

// Close closes the ShardedDB instance.
func (s *ShardedDB) Close() {
func (s *ShardedDB) Close() error {
s.stopper.Stop()
for _, v := range s.shards {
v.close()
if err := v.close(); err != nil {
return err
}
}
for _, v := range s.ctxs {
v.Destroy()
}
return nil
}

func (s *ShardedDB) getParititionID(updates []pb.Update) uint64 {
@@ -308,7 +311,9 @@ func (s *ShardedDB) compactionWorkerMain() {
case <-s.stopper.ShouldStop():
return
case <-s.compactionCh:
s.compact()
if err := s.compact(); err != nil {
panicNow(err)
}
}
select {
case <-s.stopper.ShouldStop():
@@ -333,25 +338,30 @@ func (s *ShardedDB) addCompaction(clusterID uint64,
return done
}

func (s *ShardedDB) compact() {
func (s *ShardedDB) compact() error {
for {
if t, hasTask := s.compactions.getTask(); hasTask {
idx := s.partitioner.GetPartitionID(t.clusterID)
shard := s.shards[idx]
if err := shard.compact(t.clusterID, t.nodeID, t.index); err != nil {
panic(err)
return err
}
atomic.AddUint64(&s.completedCompactions, 1)
close(t.done)
plog.Infof("%s completed LogDB compaction up to index %d",
dn(t.clusterID, t.nodeID), t.index)
select {
case <-s.stopper.ShouldStop():
return
return nil
default:
}
} else {
return
return nil
}
}
}

func panicNow(err error) {
plog.Panicf("%+v", err)
panic(err)
}
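
With the LogDB-level Close() methods now returning errors instead of panicking internally, the choice of whether to log, propagate, or panic moves to the caller. A hypothetical caller-side sketch, not code from this commit (the closer type below only stands in for ShardedDB and friends):

package main

import (
	"errors"
	"log"
)

// closer mimics the new Close() error signature adopted across this commit
// (ShardedDB, db, NativeSM, StateMachine). Hypothetical type for illustration.
type closer struct{ failed bool }

func (c *closer) Close() error {
	if c.failed {
		return errors.New("close failed")
	}
	return nil
}

func main() {
	dbs := []*closer{{failed: false}, {failed: true}}
	for _, db := range dbs {
		// The caller now decides the policy: log, return, or panic.
		if err := db.Close(); err != nil {
			log.Printf("failed to close LogDB shard: %+v", err)
		}
	}
}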
8 changes: 5 additions & 3 deletions internal/logdb/tee/tee.go
@@ -93,10 +93,12 @@ func (t *LogDB) Name() string {
}

// Close ...
func (t *LogDB) Close() {
func (t *LogDB) Close() error {
t.stopper.Stop()
t.odb.Close()
t.ndb.Close()
if err := t.odb.Close(); err != nil {
return err
}
return t.ndb.Close()
}

// BinaryFormat ...
7 changes: 4 additions & 3 deletions internal/rsm/managed.go
@@ -69,7 +69,7 @@ type IManagedStateMachine interface {
Stream(interface{}, io.Writer) error
Offloaded() bool
Loaded()
Close()
Close() error
DestroyedC() <-chan struct{}
Concurrent() bool
OnDisk() bool
@@ -146,11 +146,12 @@ func (ds *NativeSM) Loaded() {
}

// Close closes the underlying user state machine and set the destroyed flag.
func (ds *NativeSM) Close() {
func (ds *NativeSM) Close() error {
if err := ds.sm.Close(); err != nil {
panic(err)
return err
}
ds.SetDestroyed()
return nil
}

// DestroyedC returns a chan struct{} used to indicate whether the SM has been
9 changes: 3 additions & 6 deletions internal/rsm/rwv.go
@@ -75,8 +75,7 @@ func validateBlock(block []byte, h hash.Hash) bool {
payload := block[:uint64(len(block))-checksumSize]
crc := block[uint64(len(block))-checksumSize:]
h.Reset()
_, err := h.Write(payload)
if err != nil {
if _, err := h.Write(payload); err != nil {
panic(err)
}
return bytes.Equal(crc, h.Sum(nil))
@@ -225,8 +224,7 @@ func (br *blockReader) Read(data []byte) (int, error) {
read := len(br.block)
copy(data, br.block)
for read < want {
_, err := br.readBlock()
if err != nil {
if _, err := br.readBlock(); err != nil {
return read, err
}
toRead := want - read
@@ -247,8 +245,7 @@ func (br *blockReader) readBlock() (int, error) {
return n, err
}
br.block = br.block[:n]
h := mustGetChecksum(br.t)
if !validateBlock(br.block, h) {
if !validateBlock(br.block, mustGetChecksum(br.t)) {
panic("corrupted block")
}
br.block = br.block[:uint64(len(br.block))-checksumSize]
4 changes: 2 additions & 2 deletions internal/rsm/statemachine.go
@@ -229,8 +229,8 @@ func (s *StateMachine) TaskChanBusy() bool {
}

// Close closes the state machine.
func (s *StateMachine) Close() {
s.sm.Close()
func (s *StateMachine) Close() error {
return s.sm.Close()
}

// DestroyedC return a chan struct{} used to indicate whether the SM has been
2 changes: 1 addition & 1 deletion internal/rsm/statemachine_test.go
@@ -2061,7 +2061,7 @@ func (t *testManagedStateMachine) Recover(io.Reader, []sm.SnapshotFile) error {
func (t *testManagedStateMachine) Stream(interface{}, io.Writer) error { return nil }
func (t *testManagedStateMachine) Offloaded() bool { return false }
func (t *testManagedStateMachine) Loaded() {}
func (t *testManagedStateMachine) Close() {}
func (t *testManagedStateMachine) Close() error { return nil }
func (t *testManagedStateMachine) DestroyedC() <-chan struct{} { return nil }
func (t *testManagedStateMachine) Concurrent() bool { return t.concurrent }
func (t *testManagedStateMachine) OnDisk() bool { return t.onDisk }
13 changes: 9 additions & 4 deletions internal/transport/chunk.go
@@ -15,14 +15,14 @@
package transport

import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/lni/goutils/logutil"

"github.com/cockroachdb/errors"
"github.com/lni/dragonboat/v3/internal/fileutil"
"github.com/lni/dragonboat/v3/internal/rsm"
"github.com/lni/dragonboat/v3/internal/server"
@@ -260,7 +260,7 @@ func (c *Chunk) addLocked(chunk pb.Chunk) bool {
}
removed, err := c.nodeRemoved(chunk)
if err != nil {
panic(err)
panicNow(err)
}
if removed {
c.removeTempDir(chunk)
Expand All @@ -274,9 +274,9 @@ func (c *Chunk) addLocked(chunk pb.Chunk) bool {
}
}
if err := c.save(chunk); err != nil {
plog.Errorf("failed to save a chunk %s, %v", key, err)
err = errors.Wrapf(err, "failed to save chunk %s", key)
c.removeTempDir(chunk)
panic(err)
panicNow(err)
}
if chunk.IsLastChunk() {
plog.Debugf("last chunk %s received", key)
@@ -409,3 +409,8 @@ func (c *Chunk) toMessage(chunk pb.Chunk,
func (c *Chunk) ssid(chunk pb.Chunk) string {
return logutil.DescribeSS(chunk.ClusterId, chunk.NodeId, chunk.Index)
}

func panicNow(err error) {
plog.Panicf("%+v", err)
panic(err)
}