diff --git a/common/ledger/util/leveldbhelper/leveldb_helper.go b/common/ledger/util/leveldbhelper/leveldb_helper.go index 67a667278f0..9820f366af0 100644 --- a/common/ledger/util/leveldbhelper/leveldb_helper.go +++ b/common/ledger/util/leveldbhelper/leveldb_helper.go @@ -107,14 +107,16 @@ func (dbInst *DB) Get(key []byte) ([]byte, error) { dbInst.mutex.RLock() defer dbInst.mutex.RUnlock() value, err := dbInst.db.Get(key, dbInst.readOpts) - if err == leveldb.ErrNotFound { - value = nil - err = nil + if errors.Is(err, leveldb.ErrNotFound) { + return nil, nil } if err != nil { logger.Errorf("Error retrieving leveldb key [%#v]: %s", key, err) return nil, errors.Wrapf(err, "error retrieving leveldb key [%#v]", key) } + if value == nil { + value = []byte{} + } return value, nil } diff --git a/go.mod b/go.mod index 3db5200ef7a..20bf2af7242 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.20.0-alpha.6 github.com/stretchr/testify v1.9.0 // includes ErrorContains - github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 + github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d github.com/tedsuo/ifrit v0.0.0-20230516164442-7862c310ad26 go.etcd.io/etcd/client/pkg/v3 v3.5.14 go.etcd.io/etcd/raft/v3 v3.5.14 diff --git a/go.sum b/go.sum index 6ae890a95c7..f761bda93ea 100644 --- a/go.sum +++ b/go.sum @@ -726,6 +726,7 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fsouza/go-dockerclient v1.12.0 h1:S2f2crEUbBNCFiF06kR/GvioEB8EMsb3Td/bpawD+aU= @@ -798,7 +799,6 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -967,7 +967,6 @@ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= @@ -1075,6 +1074,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -1084,8 +1084,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/sykesm/zap-logfmt v0.0.4 h1:U2WzRvmIWG1wDLCFY3sz8UeEmsdHQjHFNlIdmroVFaI= github.com/sykesm/zap-logfmt v0.0.4/go.mod h1:AuBd9xQjAe3URrWT1BBDk2v2onAZHkZkWRMiYZXiZWA= -github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= -github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= +github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= +github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= github.com/tedsuo/ifrit v0.0.0-20230330192023-5cba443a66c4/go.mod h1:eyZnKCc955uh98WQvzOm0dgAeLnf2O0Rz0LPoC5ze+0= github.com/tedsuo/ifrit v0.0.0-20230516164442-7862c310ad26 h1:mWCRvpoEMVlslxEvvptKgIUb35va9yj9Oq5wGw/er5I= github.com/tedsuo/ifrit v0.0.0-20230516164442-7862c310ad26/go.mod h1:0uD3VMXkZ7Bw0ojGCwDzebBBzPBXtzEZeXai+56BLX4= @@ -1226,7 +1226,6 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= -golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1340,10 +1339,8 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201101102859-da207088b7d1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go index 823be93f93c..d5ecf721bd9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go @@ -32,8 +32,7 @@ func newErrBatchCorrupted(reason string) error { const ( batchHeaderLen = 8 + 4 - batchGrowRec = 3000 - batchBufioSize = 16 + batchGrowLimit = 3000 ) // BatchReplay wraps basic batch operations. @@ -59,10 +58,6 @@ func (index batchIndex) v(data []byte) []byte { return nil } -func (index batchIndex) kv(data []byte) (key, value []byte) { - return index.k(data), index.v(data) -} - // Batch is a write batch. type Batch struct { data []byte @@ -70,14 +65,24 @@ type Batch struct { // internalLen is sums of key/value pair length plus 8-bytes internal key. internalLen int + + // growLimit is the threshold in order to slow down the memory allocation + // for batch when the number of accumulated entries exceeds value. + // + // batchGrowLimit is used as the default threshold if it's not configured. + growLimit int } func (b *Batch) grow(n int) { o := len(b.data) if cap(b.data)-o < n { + limit := batchGrowLimit + if b.growLimit > 0 { + limit = b.growLimit + } div := 1 - if len(b.index) > batchGrowRec { - div = len(b.index) / batchGrowRec + if len(b.index) > limit { + div = len(b.index) / limit } ndata := make([]byte, o, o+n+o/div) copy(ndata, b.data) @@ -223,17 +228,6 @@ func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error { return nil } -func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error { - var ik []byte - for i, index := range b.index { - ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType) - if err := mdb.Delete(ik); err != nil { - return err - } - } - return nil -} - func newBatch() interface{} { return &Batch{} } @@ -243,6 +237,42 @@ func MakeBatch(n int) *Batch { return &Batch{data: make([]byte, 0, n)} } +// BatchConfig contains the config options for batch. +type BatchConfig struct { + // InitialCapacity is the batch initial capacity to preallocate. + // + // The default value is 0. + InitialCapacity int + + // GrowLimit is the limit (in terms of entry) of how much buffer + // can grow each cycle. + // + // Initially the buffer will grow twice its current size until + // GrowLimit threshold is reached, after that the buffer will grow + // up to GrowLimit each cycle. This buffer grow size in bytes is + // loosely calculated from average entry size multiplied by GrowLimit. + // + // Generally, the memory allocation step is larger if this value + // is configured large, vice versa. + // + // The default value is 3000. + GrowLimit int +} + +// MakeBatchWithConfig initializes a batch object with the given configs. +func MakeBatchWithConfig(config *BatchConfig) *Batch { + var batch = new(Batch) + if config != nil { + if config.InitialCapacity > 0 { + batch.data = make([]byte, 0, config.InitialCapacity) + } + if config.GrowLimit > 0 { + batch.growLimit = config.GrowLimit + } + } + return batch +} + func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { var index batchIndex for i, o := 0, 0; o < len(data); i++ { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go index c36ad323597..8e4f397ce3b 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -8,6 +8,7 @@ package cache import ( + "sort" "sync" "sync/atomic" "unsafe" @@ -32,18 +33,9 @@ type Cacher interface { // Evict evicts the 'cache node'. Evict(n *Node) - - // EvictNS evicts 'cache node' with the given namespace. - EvictNS(ns uint64) - - // EvictAll evicts all 'cache node'. - EvictAll() - - // Close closes the 'cache tree' - Close() error } -// Value is a 'cacheable object'. It may implements util.Releaser, if +// Value is a 'cache-able object'. It may implements util.Releaser, if // so the the Release method will be called once object is released. type Value interface{} @@ -69,32 +61,76 @@ const ( mOverflowGrowThreshold = 1 << 7 ) +const ( + bucketUninitialized = iota + bucketInitialized + bucketFrozen +) + +type mNodes []*Node + +func (x mNodes) Len() int { return len(x) } +func (x mNodes) Less(i, j int) bool { + a, b := x[i].ns, x[j].ns + if a == b { + return x[i].key < x[j].key + } + return a < b +} +func (x mNodes) Swap(i, j int) { x[i], x[j] = x[j], x[i] } + +func (x mNodes) sort() { sort.Sort(x) } + +func (x mNodes) search(ns, key uint64) int { + return sort.Search(len(x), func(i int) bool { + a := x[i].ns + if a == ns { + return x[i].key >= key + } + return a > ns + }) +} + type mBucket struct { - mu sync.Mutex - node []*Node - frozen bool + mu sync.Mutex + nodes mNodes + state int8 } -func (b *mBucket) freeze() []*Node { +func (b *mBucket) freeze() mNodes { b.mu.Lock() defer b.mu.Unlock() - if !b.frozen { - b.frozen = true + if b.state == bucketInitialized { + b.state = bucketFrozen + } else if b.state == bucketUninitialized { + panic("BUG: freeze uninitialized bucket") } - return b.node + return b.nodes } -func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) { +func (b *mBucket) frozen() bool { + if b.state == bucketFrozen { + return true + } + if b.state == bucketUninitialized { + panic("BUG: accessing uninitialized bucket") + } + return false +} + +func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly bool) (done, created bool, n *Node) { b.mu.Lock() - if b.frozen { + if b.frozen() { b.mu.Unlock() return } - // Scan the node. - for _, n := range b.node { - if n.hash == hash && n.ns == ns && n.key == key { + // Find the node. + i := b.nodes.search(ns, key) + if i < len(b.nodes) { + n = b.nodes[i] + if n.ns == ns && n.key == key { atomic.AddInt32(&n.ref, 1) b.mu.Unlock() return true, false, n @@ -102,7 +138,7 @@ func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset boo } // Get only. - if noset { + if getOnly { b.mu.Unlock() return true, false, nil } @@ -116,99 +152,106 @@ func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset boo ref: 1, } // Add node to bucket. - b.node = append(b.node, n) - bLen := len(b.node) + if i == len(b.nodes) { + b.nodes = append(b.nodes, n) + } else { + b.nodes = append(b.nodes[:i+1], b.nodes[i:]...) + b.nodes[i] = n + } + bLen := len(b.nodes) b.mu.Unlock() // Update counter. - grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold + grow := atomic.AddInt64(&r.statNodes, 1) >= h.growThreshold if bLen > mOverflowThreshold { grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold } // Grow. - if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { + if grow && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) { nhLen := len(h.buckets) << 1 - nh := &mNode{ - buckets: make([]unsafe.Pointer, nhLen), + nh := &mHead{ + buckets: make([]mBucket, nhLen), mask: uint32(nhLen) - 1, - pred: unsafe.Pointer(h), - growThreshold: int32(nhLen * mOverflowThreshold), - shrinkThreshold: int32(nhLen >> 1), + predecessor: unsafe.Pointer(h), + growThreshold: int64(nhLen * mOverflowThreshold), + shrinkThreshold: int64(nhLen >> 1), } ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) if !ok { panic("BUG: failed swapping head") } + atomic.AddInt32(&r.statGrow, 1) go nh.initBuckets() } return true, true, n } -func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) { +func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, deleted bool) { b.mu.Lock() - if b.frozen { + if b.frozen() { b.mu.Unlock() return } - // Scan the node. - var ( - n *Node - bLen int - ) - for i := range b.node { - n = b.node[i] - if n.ns == ns && n.key == key { - if atomic.LoadInt32(&n.ref) == 0 { - deleted = true + // Find the node. + i := b.nodes.search(ns, key) + if i == len(b.nodes) { + b.mu.Unlock() + return true, false + } + n := b.nodes[i] + var bLen int + if n.ns == ns && n.key == key { + if atomic.LoadInt32(&n.ref) == 0 { + deleted = true + // Save and clear value. + if n.value != nil { // Call releaser. - if n.value != nil { - if r, ok := n.value.(util.Releaser); ok { - r.Release() - } - n.value = nil + if r, ok := n.value.(util.Releaser); ok { + r.Release() } - - // Remove node from bucket. - b.node = append(b.node[:i], b.node[i+1:]...) - bLen = len(b.node) + n.value = nil } - break + + // Remove node from bucket. + b.nodes = append(b.nodes[:i], b.nodes[i+1:]...) + bLen = len(b.nodes) } } b.mu.Unlock() if deleted { - // Call OnDel. - for _, f := range n.onDel { + // Call delete funcs. + for _, f := range n.delFuncs { f() } // Update counter. - atomic.AddInt32(&r.size, int32(n.size)*-1) - shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold + atomic.AddInt64(&r.statSize, int64(n.size)*-1) + shrink := atomic.AddInt64(&r.statNodes, -1) < h.shrinkThreshold if bLen >= mOverflowThreshold { atomic.AddInt32(&h.overflow, -1) } // Shrink. - if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) { + if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgress, 0, 1) { nhLen := len(h.buckets) >> 1 - nh := &mNode{ - buckets: make([]unsafe.Pointer, nhLen), + nh := &mHead{ + buckets: make([]mBucket, nhLen), mask: uint32(nhLen) - 1, - pred: unsafe.Pointer(h), - growThreshold: int32(nhLen * mOverflowThreshold), - shrinkThreshold: int32(nhLen >> 1), + predecessor: unsafe.Pointer(h), + growThreshold: int64(nhLen * mOverflowThreshold), + shrinkThreshold: int64(nhLen >> 1), } ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh)) if !ok { panic("BUG: failed swapping head") } + atomic.AddInt32(&r.statShrink, 1) go nh.initBuckets() } } @@ -216,95 +259,134 @@ func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, return true, deleted } -type mNode struct { - buckets []unsafe.Pointer // []*mBucket - mask uint32 - pred unsafe.Pointer // *mNode - resizeInProgess int32 +type mHead struct { + buckets []mBucket + mask uint32 + predecessor unsafe.Pointer // *mNode + resizeInProgress int32 overflow int32 - growThreshold int32 - shrinkThreshold int32 + growThreshold int64 + shrinkThreshold int64 } -func (n *mNode) initBucket(i uint32) *mBucket { - if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil { +func (h *mHead) initBucket(i uint32) *mBucket { + b := &h.buckets[i] + b.mu.Lock() + if b.state >= bucketInitialized { + b.mu.Unlock() return b } - p := (*mNode)(atomic.LoadPointer(&n.pred)) - if p != nil { - var node []*Node - if n.mask > p.mask { - // Grow. - pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask])) - if pb == nil { - pb = p.initBucket(i & p.mask) - } - m := pb.freeze() - // Split nodes. - for _, x := range m { - if x.hash&n.mask == i { - node = append(node, x) - } - } - } else { - // Shrink. - pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i])) - if pb0 == nil { - pb0 = p.initBucket(i) - } - pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))])) - if pb1 == nil { - pb1 = p.initBucket(i + uint32(len(n.buckets))) - } - m0 := pb0.freeze() - m1 := pb1.freeze() - // Merge nodes. - node = make([]*Node, 0, len(m0)+len(m1)) - node = append(node, m0...) - node = append(node, m1...) - } - b := &mBucket{node: node} - if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) { - if len(node) > mOverflowThreshold { - atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold)) + p := (*mHead)(atomic.LoadPointer(&h.predecessor)) + if p == nil { + panic("BUG: uninitialized bucket doesn't have predecessor") + } + + var nodes mNodes + if h.mask > p.mask { + // Grow. + m := p.initBucket(i & p.mask).freeze() + // Split nodes. + for _, x := range m { + if x.hash&h.mask == i { + nodes = append(nodes, x) } - return b } + } else { + // Shrink. + m0 := p.initBucket(i).freeze() + m1 := p.initBucket(i + uint32(len(h.buckets))).freeze() + // Merge nodes. + nodes = make(mNodes, 0, len(m0)+len(m1)) + nodes = append(nodes, m0...) + nodes = append(nodes, m1...) + nodes.sort() + } + b.nodes = nodes + b.state = bucketInitialized + b.mu.Unlock() + return b +} + +func (h *mHead) initBuckets() { + for i := range h.buckets { + h.initBucket(uint32(i)) } + atomic.StorePointer(&h.predecessor, nil) +} - return (*mBucket)(atomic.LoadPointer(&n.buckets[i])) +func (h *mHead) enumerateNodesWithCB(f func([]*Node)) { + var nodes []*Node + for x := range h.buckets { + b := h.initBucket(uint32(x)) + + b.mu.Lock() + nodes = append(nodes, b.nodes...) + b.mu.Unlock() + f(nodes) + } } -func (n *mNode) initBuckets() { - for i := range n.buckets { - n.initBucket(uint32(i)) +func (h *mHead) enumerateNodesByNS(ns uint64) []*Node { + var nodes []*Node + for x := range h.buckets { + b := h.initBucket(uint32(x)) + + b.mu.Lock() + i := b.nodes.search(ns, 0) + for ; i < len(b.nodes); i++ { + n := b.nodes[i] + if n.ns != ns { + break + } + nodes = append(nodes, n) + } + b.mu.Unlock() } - atomic.StorePointer(&n.pred, nil) + return nodes +} + +type Stats struct { + Buckets int + Nodes int64 + Size int64 + GrowCount int32 + ShrinkCount int32 + HitCount int64 + MissCount int64 + SetCount int64 + DelCount int64 } // Cache is a 'cache map'. type Cache struct { mu sync.RWMutex mHead unsafe.Pointer // *mNode - nodes int32 - size int32 cacher Cacher closed bool + + statNodes int64 + statSize int64 + statGrow int32 + statShrink int32 + statHit int64 + statMiss int64 + statSet int64 + statDel int64 } // NewCache creates a new 'cache map'. The cacher is optional and // may be nil. func NewCache(cacher Cacher) *Cache { - h := &mNode{ - buckets: make([]unsafe.Pointer, mInitialSize), + h := &mHead{ + buckets: make([]mBucket, mInitialSize), mask: mInitialSize - 1, - growThreshold: int32(mInitialSize * mOverflowThreshold), + growThreshold: int64(mInitialSize * mOverflowThreshold), shrinkThreshold: 0, } for i := range h.buckets { - h.buckets[i] = unsafe.Pointer(&mBucket{}) + h.buckets[i].state = bucketInitialized } r := &Cache{ mHead: unsafe.Pointer(h), @@ -313,14 +395,20 @@ func NewCache(cacher Cacher) *Cache { return r } -func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) { - h := (*mNode)(atomic.LoadPointer(&r.mHead)) +func (r *Cache) getBucket(hash uint32) (*mHead, *mBucket) { + h := (*mHead)(atomic.LoadPointer(&r.mHead)) i := hash & h.mask - b := (*mBucket)(atomic.LoadPointer(&h.buckets[i])) - if b == nil { - b = h.initBucket(i) - } - return h, b + return h, h.initBucket(i) +} + +func (r *Cache) enumerateNodesWithCB(f func([]*Node)) { + h := (*mHead)(atomic.LoadPointer(&r.mHead)) + h.enumerateNodesWithCB(f) +} + +func (r *Cache) enumerateNodesByNS(ns uint64) []*Node { + h := (*mHead)(atomic.LoadPointer(&r.mHead)) + return h.enumerateNodesByNS(ns) } func (r *Cache) delete(n *Node) bool { @@ -333,14 +421,29 @@ func (r *Cache) delete(n *Node) bool { } } +// GetStats returns cache statistics. +func (r *Cache) GetStats() Stats { + return Stats{ + Buckets: len((*mHead)(atomic.LoadPointer(&r.mHead)).buckets), + Nodes: atomic.LoadInt64(&r.statNodes), + Size: atomic.LoadInt64(&r.statSize), + GrowCount: atomic.LoadInt32(&r.statGrow), + ShrinkCount: atomic.LoadInt32(&r.statShrink), + HitCount: atomic.LoadInt64(&r.statHit), + MissCount: atomic.LoadInt64(&r.statMiss), + SetCount: atomic.LoadInt64(&r.statSet), + DelCount: atomic.LoadInt64(&r.statDel), + } +} + // Nodes returns number of 'cache node' in the map. func (r *Cache) Nodes() int { - return int(atomic.LoadInt32(&r.nodes)) + return int(atomic.LoadInt64(&r.statNodes)) } // Size returns sums of 'cache node' size in the map. func (r *Cache) Size() int { - return int(atomic.LoadInt32(&r.size)) + return int(atomic.LoadInt64(&r.statSize)) } // Capacity returns cache capacity. @@ -374,14 +477,20 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han hash := murmur32(ns, key, 0xf00) for { h, b := r.getBucket(hash) - done, _, n := b.get(r, h, hash, ns, key, setFunc == nil) + done, created, n := b.get(r, h, hash, ns, key, setFunc == nil) if done { + if created || n == nil { + atomic.AddInt64(&r.statMiss, 1) + } else { + atomic.AddInt64(&r.statHit, 1) + } + if n != nil { n.mu.Lock() if n.value == nil { if setFunc == nil { n.mu.Unlock() - n.unref() + n.unRefInternal(false) return nil } @@ -389,10 +498,11 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han if n.value == nil { n.size = 0 n.mu.Unlock() - n.unref() + n.unRefInternal(false) return nil } - atomic.AddInt32(&r.size, int32(n.size)) + atomic.AddInt64(&r.statSet, 1) + atomic.AddInt64(&r.statSize, int64(n.size)) } n.mu.Unlock() if r.cacher != nil { @@ -412,11 +522,11 @@ func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Han // only attributed to the particular 'cache node', so when a 'cache node' // is recreated it will not be banned. // -// If onDel is not nil, then it will be executed if such 'cache node' +// If delFunc is not nil, then it will be executed if such 'cache node' // doesn't exist or once the 'cache node' is released. // // Delete return true is such 'cache node' exist. -func (r *Cache) Delete(ns, key uint64, onDel func()) bool { +func (r *Cache) Delete(ns, key uint64, delFunc func()) bool { r.mu.RLock() defer r.mu.RUnlock() if r.closed { @@ -429,15 +539,15 @@ func (r *Cache) Delete(ns, key uint64, onDel func()) bool { done, _, n := b.get(r, h, hash, ns, key, true) if done { if n != nil { - if onDel != nil { + if delFunc != nil { n.mu.Lock() - n.onDel = append(n.onDel, onDel) + n.delFuncs = append(n.delFuncs, delFunc) n.mu.Unlock() } if r.cacher != nil { r.cacher.Ban(n) } - n.unref() + n.unRefInternal(true) return true } @@ -445,8 +555,8 @@ func (r *Cache) Delete(ns, key uint64, onDel func()) bool { } } - if onDel != nil { - onDel() + if delFunc != nil { + delFunc() } return false @@ -472,7 +582,7 @@ func (r *Cache) Evict(ns, key uint64) bool { if r.cacher != nil { r.cacher.Evict(n) } - n.unref() + n.unRefInternal(true) return true } @@ -484,7 +594,7 @@ func (r *Cache) Evict(ns, key uint64) bool { } // EvictNS evicts 'cache node' with the given namespace. This will -// simply call Cacher.EvictNS. +// simply call Cacher.Evict on all nodes with the given namespace. func (r *Cache) EvictNS(ns uint64) { r.mu.RLock() defer r.mu.RUnlock() @@ -493,10 +603,21 @@ func (r *Cache) EvictNS(ns uint64) { } if r.cacher != nil { - r.cacher.EvictNS(ns) + nodes := r.enumerateNodesByNS(ns) + for _, n := range nodes { + r.cacher.Evict(n) + } } } +func (r *Cache) evictAll() { + r.enumerateNodesWithCB(func(nodes []*Node) { + for _, n := range nodes { + r.cacher.Evict(n) + } + }) +} + // EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll. func (r *Cache) EvictAll() { r.mu.RLock() @@ -506,66 +627,46 @@ func (r *Cache) EvictAll() { } if r.cacher != nil { - r.cacher.EvictAll() + r.evictAll() } } -// Close closes the 'cache map' and forcefully releases all 'cache node'. -func (r *Cache) Close() error { +// Close closes the 'cache map'. +// All 'Cache' method is no-op after 'cache map' is closed. +// All 'cache node' will be evicted from 'cacher'. +// +// If 'force' is true then all 'cache node' will be forcefully released +// even if the 'node ref' is not zero. +func (r *Cache) Close(force bool) { + var head *mHead + // Hold RW-lock to make sure no more in-flight operations. r.mu.Lock() if !r.closed { r.closed = true + head = (*mHead)(atomic.LoadPointer(&r.mHead)) + atomic.StorePointer(&r.mHead, nil) + } + r.mu.Unlock() - h := (*mNode)(r.mHead) - h.initBuckets() + if head != nil { + head.enumerateNodesWithCB(func(nodes []*Node) { + for _, n := range nodes { + // Zeroing ref. Prevent unRefExternal to call finalizer. + if force { + atomic.StoreInt32(&n.ref, 0) + } - for i := range h.buckets { - b := (*mBucket)(h.buckets[i]) - for _, n := range b.node { - // Call releaser. - if n.value != nil { - if r, ok := n.value.(util.Releaser); ok { - r.Release() - } - n.value = nil + // Evict from cacher. + if r.cacher != nil { + r.cacher.Evict(n) } - // Call OnDel. - for _, f := range n.onDel { - f() + if force { + n.callFinalizer() } - n.onDel = nil } - } + }) } - r.mu.Unlock() - - // Avoid deadlock. - if r.cacher != nil { - if err := r.cacher.Close(); err != nil { - return err - } - } - return nil -} - -// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but -// unlike Close it doesn't forcefully releases 'cache node'. -func (r *Cache) CloseWeak() error { - r.mu.Lock() - if !r.closed { - r.closed = true - } - r.mu.Unlock() - - // Avoid deadlock. - if r.cacher != nil { - r.cacher.EvictAll() - if err := r.cacher.Close(); err != nil { - return err - } - } - return nil } // Node is a 'cache node'. @@ -579,8 +680,8 @@ type Node struct { size int value Value - ref int32 - onDel []func() + ref int32 + delFuncs []func() CacheData unsafe.Pointer } @@ -618,17 +719,39 @@ func (n *Node) GetHandle() *Handle { return &Handle{unsafe.Pointer(n)} } -func (n *Node) unref() { +func (n *Node) callFinalizer() { + // Call releaser. + if n.value != nil { + if r, ok := n.value.(util.Releaser); ok { + r.Release() + } + n.value = nil + } + + // Call delete funcs. + for _, f := range n.delFuncs { + f() + } + n.delFuncs = nil +} + +func (n *Node) unRefInternal(updateStat bool) { if atomic.AddInt32(&n.ref, -1) == 0 { n.r.delete(n) + if updateStat { + atomic.AddInt64(&n.r.statDel, 1) + } } } -func (n *Node) unrefLocked() { +func (n *Node) unRefExternal() { if atomic.AddInt32(&n.ref, -1) == 0 { n.r.mu.RLock() - if !n.r.closed { + if n.r.closed { + n.callFinalizer() + } else { n.r.delete(n) + atomic.AddInt64(&n.r.statDel, 1) } n.r.mu.RUnlock() } @@ -654,7 +777,7 @@ func (h *Handle) Release() { nPtr := atomic.LoadPointer(&h.n) if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) { n := (*Node)(nPtr) - n.unrefLocked() + n.unRefExternal() } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/lru.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/lru.go index d9a84cde15e..383ad5a5666 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/lru.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/lru.go @@ -142,51 +142,14 @@ func (r *lru) Evict(n *Node) { r.mu.Unlock() return } + rn.remove() + r.used -= n.Size() n.CacheData = nil r.mu.Unlock() rn.h.Release() } -func (r *lru) EvictNS(ns uint64) { - var evicted []*lruNode - - r.mu.Lock() - for e := r.recent.prev; e != &r.recent; { - rn := e - e = e.prev - if rn.n.NS() == ns { - rn.remove() - rn.n.CacheData = nil - r.used -= rn.n.Size() - evicted = append(evicted, rn) - } - } - r.mu.Unlock() - - for _, rn := range evicted { - rn.h.Release() - } -} - -func (r *lru) EvictAll() { - r.mu.Lock() - back := r.recent.prev - for rn := back; rn != &r.recent; rn = rn.prev { - rn.n.CacheData = nil - } - r.reset() - r.mu.Unlock() - - for rn := back; rn != &r.recent; rn = rn.prev { - rn.h.Release() - } -} - -func (r *lru) Close() error { - return nil -} - // NewLRU create a new LRU-cache. func NewLRU(capacity int) Cacher { r := &lru{capacity: capacity} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index 74e9826956d..b2724cd9e3c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/syndtr/goleveldb/leveldb/cache" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/journal" @@ -141,7 +142,6 @@ func openDB(s *session) (*DB, error) { } return nil, err } - } // Doesn't need to be included in the wait group. @@ -149,7 +149,9 @@ func openDB(s *session) (*DB, error) { go db.mpoolDrain() if readOnly { - db.SetReadOnly() + if err := db.SetReadOnly(); err != nil { + return nil, err + } } else { db.closeW.Add(2) go db.tCompaction() @@ -311,15 +313,23 @@ func recoverTable(s *session, o *opt.Options) error { return } defer func() { - writer.Close() + if cerr := writer.Close(); cerr != nil { + if err == nil { + err = cerr + } else { + err = fmt.Errorf("error recovering table (%v); error closing (%v)", err, cerr) + } + } if err != nil { - s.stor.Remove(tmpFd) + if rerr := s.stor.Remove(tmpFd); rerr != nil { + err = fmt.Errorf("error recovering table (%v); error removing (%v)", err, rerr) + } tmpFd = storage.FileDesc{} } }() // Copy entries. - tw := table.NewWriter(writer, o) + tw := table.NewWriter(writer, o, nil, 0) for iter.Next() { key := iter.Key() if validInternalKey(key) { @@ -397,7 +407,7 @@ func recoverTable(s *session, o *opt.Options) error { tSeq = seq } if imin == nil { - imin = append([]byte{}, key...) + imin = append([]byte(nil), key...) } imax = append(imax[:0], key...) } @@ -530,7 +540,8 @@ func (db *DB) recoverJournal() error { if jr == nil { jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) } else { - jr.Reset(fr, dropper{db.s, fd}, strict, checksum) + // Ignore the error here + _ = jr.Reset(fr, dropper{db.s, fd}, strict, checksum) } // Flush memdb and remove obsolete journal file. @@ -550,7 +561,10 @@ func (db *DB) recoverJournal() error { } rec.resetAddedTables() - db.s.stor.Remove(ofd) + if err := db.s.stor.Remove(ofd); err != nil { + fr.Close() + return err + } ofd = storage.FileDesc{} } @@ -634,7 +648,9 @@ func (db *DB) recoverJournal() error { // Remove the last obsolete journal file. if !ofd.Zero() { - db.s.stor.Remove(ofd) + if err := db.s.stor.Remove(ofd); err != nil { + return err + } } return nil @@ -688,7 +704,9 @@ func (db *DB) recoverJournalRO() error { if jr == nil { jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum) } else { - jr.Reset(fr, dropper{db.s, fd}, strict, checksum) + if err := jr.Reset(fr, dropper{db.s, fd}, strict, checksum); err != nil { + return err + } } // Replay journal to memdb. @@ -765,7 +783,7 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R if auxm != nil { if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok { - return append([]byte{}, mv...), me + return append([]byte(nil), mv...), me } } @@ -777,7 +795,7 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R defer m.decref() if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok { - return append([]byte{}, mv...), me + return append([]byte(nil), mv...), me } } @@ -1002,15 +1020,15 @@ func (db *DB) GetProperty(name string) (value string, err error) { } } case p == "blockpool": - value = fmt.Sprintf("%v", db.s.tops.bpool) + value = fmt.Sprintf("%v", db.s.tops.blockBuffer) case p == "cachedblock": - if db.s.tops.bcache != nil { - value = fmt.Sprintf("%d", db.s.tops.bcache.Size()) + if db.s.tops.blockCache != nil { + value = fmt.Sprintf("%d", db.s.tops.blockCache.Size()) } else { value = "" } case p == "openedtables": - value = fmt.Sprintf("%d", db.s.tops.cache.Size()) + value = fmt.Sprintf("%d", db.s.tops.fileCache.Size()) case p == "alivesnaps": value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps)) case p == "aliveiters": @@ -1037,6 +1055,9 @@ type DBStats struct { BlockCacheSize int OpenedTablesCount int + FileCache cache.Stats + BlockCache cache.Stats + LevelSizes Sizes LevelTablesCounts []int LevelRead Sizes @@ -1062,13 +1083,20 @@ func (db *DB) Stats(s *DBStats) error { s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay)) s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1 - s.OpenedTablesCount = db.s.tops.cache.Size() - if db.s.tops.bcache != nil { - s.BlockCacheSize = db.s.tops.bcache.Size() + s.OpenedTablesCount = db.s.tops.fileCache.Size() + if db.s.tops.blockCache != nil { + s.BlockCacheSize = db.s.tops.blockCache.Size() } else { s.BlockCacheSize = 0 } + s.FileCache = db.s.tops.fileCache.GetStats() + if db.s.tops.blockCache != nil { + s.BlockCache = db.s.tops.blockCache.GetStats() + } else { + s.BlockCache = cache.Stats{} + } + s.AliveIterators = atomic.LoadInt32(&db.aliveIters) s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 6b70eb2c9d3..cc275ace1dc 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -7,6 +7,7 @@ package leveldb import ( + "fmt" "sync" "sync/atomic" "time" @@ -272,7 +273,7 @@ func (db *DB) memCompaction() { } defer mdb.decref() - db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size())) + db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(int64(mdb.Size()))) // Don't compact empty memdb. if mdb.Len() == 0 { @@ -350,11 +351,11 @@ func (db *DB) memCompaction() { } type tableCompactionBuilder struct { - db *DB - s *session - c *compaction - rec *sessionRecord - stat0, stat1 *cStatStaging + db *DB + s *session + c *compaction + rec *sessionRecord + stat1 *cStatStaging snapHasLastUkey bool snapLastUkey []byte @@ -389,7 +390,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error { // Create new table. var err error - b.tw, err = b.s.tops.create() + b.tw, err = b.s.tops.create(b.tableSize) if err != nil { return err } @@ -410,29 +411,40 @@ func (b *tableCompactionBuilder) flush() error { } b.rec.addTableFile(b.c.sourceLevel+1, t) b.stat1.write += t.size - b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) + b.s.logf("table@build created L%d@%d N·%d S·%s %q:%q", b.c.sourceLevel+1, t.fd.Num, b.tw.tw.EntriesLen(), shortenb(t.size), t.imin, t.imax) b.tw = nil return nil } -func (b *tableCompactionBuilder) cleanup() { +func (b *tableCompactionBuilder) cleanup() error { if b.tw != nil { - b.tw.drop() + if err := b.tw.drop(); err != nil { + return err + } b.tw = nil } + return nil } -func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) error { +func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) { snapResumed := b.snapIter > 0 hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary. - lastUkey := append([]byte{}, b.snapLastUkey...) + lastUkey := append([]byte(nil), b.snapLastUkey...) lastSeq := b.snapLastSeq b.kerrCnt = b.snapKerrCnt b.dropCnt = b.snapDropCnt // Restore compaction state. b.c.restore() - defer b.cleanup() + defer func() { + if cerr := b.cleanup(); cerr != nil { + if err == nil { + err = cerr + } else { + err = fmt.Errorf("tableCompactionBuilder error: %v, cleanup error (%v)", err, cerr) + } + } + }() b.stat1.startTimer() defer b.stat1.stopTimer() @@ -563,7 +575,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { rec.delTable(c.sourceLevel+i, t.fd.Num) } } - sourceSize := int(stats[0].read + stats[1].read) + sourceSize := stats[0].read + stats[1].read minSeq := db.minSeq() db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.sourceLevel, len(c.levels[0]), c.sourceLevel+1, len(c.levels[1]), shortenb(sourceSize), minSeq) @@ -584,7 +596,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { db.compactionCommit("table", rec) stats[1].stopTimer() - resultSize := int(stats[1].write) + resultSize := stats[1].write db.logf("table@compaction committed F%s S%s Ke·%d D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), b.kerrCnt, b.dropCnt, stats[1].duration) // Save compaction stats @@ -655,10 +667,7 @@ func (db *DB) tableNeedCompaction() bool { func (db *DB) resumeWrite() bool { v := db.s.version() defer v.release() - if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() { - return true - } - return false + return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() } func (db *DB) pauseCompaction(ch chan<- struct{}) { @@ -681,7 +690,7 @@ type cAuto struct { func (r cAuto) ack(err error) { if r.ackC != nil { defer func() { - recover() + _ = recover() }() r.ackC <- err } @@ -696,7 +705,7 @@ type cRange struct { func (r cRange) ack(err error) { if r.ackC != nil { defer func() { - recover() + _ = recover() }() r.ackC <- err } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go index e6e8ca59d08..ded13d3eb26 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -7,7 +7,6 @@ package leveldb import ( - "errors" "math/rand" "runtime" "sync" @@ -18,10 +17,6 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -var ( - errInvalidInternalKey = errors.New("leveldb: Iterator: invalid internal key") -) - type memdbReleaser struct { once sync.Once m *memDB diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go index 65e1c54bb41..29430fee9c9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -137,8 +137,12 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { if db.journal == nil { db.journal = journal.NewWriter(w) } else { - db.journal.Reset(w) - db.journalWriter.Close() + if err := db.journal.Reset(w); err != nil { + return nil, err + } + if err := db.journalWriter.Close(); err != nil { + return nil, err + } db.frozenJournalFd = db.journalFd } db.journalWriter = w @@ -181,13 +185,6 @@ func (db *DB) getEffectiveMem() *memDB { return db.mem } -// Check whether we has frozen memdb. -func (db *DB) hasFrozenMem() bool { - db.memMu.RLock() - defer db.memMu.RUnlock() - return db.frozenMem != nil -} - // Get frozen memdb. func (db *DB) getFrozenMem() *memDB { db.memMu.RLock() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go index 21d1e512f34..b7b82fd8431 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -110,7 +110,7 @@ func (tr *Transaction) flush() error { tr.tables = append(tr.tables, t) tr.rec.addTableFile(0, t) tr.stats.write += t.size - tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax) + tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(t.size), t.imin, t.imax) } return nil } @@ -244,7 +244,7 @@ func (tr *Transaction) Commit() error { // Additionally, wait compaction when certain threshold reached. // Ignore error, returns error only if transaction can't be committed. - tr.db.waitCompaction() + _ = tr.db.waitCompaction() } // Only mark as done if transaction committed successfully. tr.setDone() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go index db0c1bece1d..18eddbe1e0b 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -246,7 +246,10 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { // Rotate memdb if it's reach the threshold. if batch.internalLen >= mdbFree { - db.rotateMem(0, false) + if _, err := db.rotateMem(0, false); err != nil { + db.unlockWrite(overflow, merged, err) + return err + } } db.unlockWrite(overflow, merged, nil) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go b/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go index 8d6146b6f5c..0c7f64b284a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/errors/errors.go @@ -73,6 +73,7 @@ func SetFd(err error, fd storage.FileDesc) error { case *ErrCorrupted: x.Fd = fd return x + default: + return err } - return err } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go index a23ab05f70f..1e4fe4edbd6 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/array_iter.go @@ -88,10 +88,7 @@ func (i *basicArrayIterator) Seek(key []byte) bool { return false } i.pos = i.array.Search(key) - if i.pos >= n { - return false - } - return true + return i.pos < n } func (i *basicArrayIterator) Next() bool { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go index 939adbb9332..fd0b55adbd3 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/indexed_iter.go @@ -26,10 +26,9 @@ type indexedIterator struct { index IteratorIndexer strict bool - data Iterator - err error - errf func(err error) - closed bool + data Iterator + err error + errf func(err error) } func (i *indexedIterator) setData() { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go index 1a7e29df8fb..374e82b66e2 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/merged_iter.go @@ -7,6 +7,8 @@ package iterator import ( + "container/heap" + "github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/util" @@ -33,6 +35,9 @@ type mergedIterator struct { err error errf func(err error) releaser util.Releaser + + indexes []int // the heap of iterator indexes + reverse bool //nolint: structcheck // if true, indexes is a max-heap } func assertKey(key []byte) []byte { @@ -67,16 +72,20 @@ func (i *mergedIterator) First() bool { return false } + h := i.indexHeap() + h.Reset(false) for x, iter := range i.iters { switch { case iter.First(): i.keys[x] = assertKey(iter.Key()) + h.Push(x) case i.iterErr(iter): return false default: i.keys[x] = nil } } + heap.Init(h) i.dir = dirSOI return i.next() } @@ -89,16 +98,20 @@ func (i *mergedIterator) Last() bool { return false } + h := i.indexHeap() + h.Reset(true) for x, iter := range i.iters { switch { case iter.Last(): i.keys[x] = assertKey(iter.Key()) + h.Push(x) case i.iterErr(iter): return false default: i.keys[x] = nil } } + heap.Init(h) i.dir = dirEOI return i.prev() } @@ -111,35 +124,31 @@ func (i *mergedIterator) Seek(key []byte) bool { return false } + h := i.indexHeap() + h.Reset(false) for x, iter := range i.iters { switch { case iter.Seek(key): i.keys[x] = assertKey(iter.Key()) + h.Push(x) case i.iterErr(iter): return false default: i.keys[x] = nil } } + heap.Init(h) i.dir = dirSOI return i.next() } func (i *mergedIterator) next() bool { - var key []byte - if i.dir == dirForward { - key = i.keys[i.index] - } - for x, tkey := range i.keys { - if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) < 0) { - key = tkey - i.index = x - } - } - if key == nil { + h := i.indexHeap() + if h.Len() == 0 { i.dir = dirEOI return false } + i.index = heap.Pop(h).(int) i.dir = dirForward return true } @@ -156,7 +165,7 @@ func (i *mergedIterator) Next() bool { case dirSOI: return i.First() case dirBackward: - key := append([]byte{}, i.keys[i.index]...) + key := append([]byte(nil), i.keys[i.index]...) if !i.Seek(key) { return false } @@ -168,6 +177,7 @@ func (i *mergedIterator) Next() bool { switch { case iter.Next(): i.keys[x] = assertKey(iter.Key()) + heap.Push(i.indexHeap(), x) case i.iterErr(iter): return false default: @@ -177,20 +187,12 @@ func (i *mergedIterator) Next() bool { } func (i *mergedIterator) prev() bool { - var key []byte - if i.dir == dirBackward { - key = i.keys[i.index] - } - for x, tkey := range i.keys { - if tkey != nil && (key == nil || i.cmp.Compare(tkey, key) > 0) { - key = tkey - i.index = x - } - } - if key == nil { + h := i.indexHeap() + if h.Len() == 0 { i.dir = dirSOI return false } + i.index = heap.Pop(h).(int) i.dir = dirBackward return true } @@ -207,7 +209,9 @@ func (i *mergedIterator) Prev() bool { case dirEOI: return i.Last() case dirForward: - key := append([]byte{}, i.keys[i.index]...) + key := append([]byte(nil), i.keys[i.index]...) + h := i.indexHeap() + h.Reset(true) for x, iter := range i.iters { if x == i.index { continue @@ -216,12 +220,14 @@ func (i *mergedIterator) Prev() bool { switch { case seek && iter.Prev(), !seek && iter.Last(): i.keys[x] = assertKey(iter.Key()) + h.Push(x) case i.iterErr(iter): return false default: i.keys[x] = nil } } + heap.Init(h) } x := i.index @@ -229,6 +235,7 @@ func (i *mergedIterator) Prev() bool { switch { case iter.Prev(): i.keys[x] = assertKey(iter.Key()) + heap.Push(i.indexHeap(), x) case i.iterErr(iter): return false default: @@ -259,6 +266,7 @@ func (i *mergedIterator) Release() { } i.iters = nil i.keys = nil + i.indexes = nil if i.releaser != nil { i.releaser.Release() i.releaser = nil @@ -284,6 +292,10 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) { i.errf = f } +func (i *mergedIterator) indexHeap() *indexHeap { + return (*indexHeap)(i) +} + // NewMergedIterator returns an iterator that merges its input. Walking the // resultant iterator will return all key/value pairs of all input iterators // in strictly increasing key order, as defined by cmp. @@ -296,9 +308,43 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) { // continue to the next 'input iterator'. func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator { return &mergedIterator{ - iters: iters, - cmp: cmp, - strict: strict, - keys: make([][]byte, len(iters)), + iters: iters, + cmp: cmp, + strict: strict, + keys: make([][]byte, len(iters)), + indexes: make([]int, 0, len(iters)), } } + +// indexHeap implements heap.Interface. +type indexHeap mergedIterator + +func (h *indexHeap) Len() int { return len(h.indexes) } +func (h *indexHeap) Less(i, j int) bool { + i, j = h.indexes[i], h.indexes[j] + r := h.cmp.Compare(h.keys[i], h.keys[j]) + if h.reverse { + return r > 0 + } + return r < 0 +} + +func (h *indexHeap) Swap(i, j int) { + h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i] +} + +func (h *indexHeap) Push(value interface{}) { + h.indexes = append(h.indexes, value.(int)) +} + +func (h *indexHeap) Pop() interface{} { + e := len(h.indexes) - 1 + popped := h.indexes[e] + h.indexes = h.indexes[:e] + return popped +} + +func (h *indexHeap) Reset(reverse bool) { + h.reverse = reverse + h.indexes = h.indexes[:0] +} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go b/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go index d094c3d0f8a..f7f8b540ed4 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/journal/journal.go @@ -354,6 +354,8 @@ type Writer struct { // buf[:written] has already been written to w. // written is zero unless Flush has been called. written int + // blockNumber is the zero based block number currently held in buf. + blockNumber int64 // first is whether the current chunk is the first chunk of the journal. first bool // pending is whether a chunk is buffered but not yet written. @@ -402,6 +404,7 @@ func (w *Writer) writeBlock() { w.i = 0 w.j = headerSize w.written = 0 + w.blockNumber++ } // writePending finishes the current journal and writes the buffer to the @@ -457,6 +460,7 @@ func (w *Writer) Reset(writer io.Writer) (err error) { w.i = 0 w.j = 0 w.written = 0 + w.blockNumber = 0 w.first = false w.pending = false w.err = nil @@ -474,7 +478,7 @@ func (w *Writer) Next() (io.Writer, error) { w.fillHeader(true) } w.i = w.j - w.j = w.j + headerSize + w.j += headerSize // Check if there is room in the block for the header. if w.j > blockSize { // Fill in the rest of the block with zeroes. @@ -491,6 +495,14 @@ func (w *Writer) Next() (io.Writer, error) { return singleWriter{w, w.seq}, nil } +// Size returns the current size of the file. +func (w *Writer) Size() int64 { + if w == nil { + return 0 + } + return w.blockNumber*blockSize + int64(w.j) +} + type singleWriter struct { w *Writer seq int diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/key.go b/vendor/github.com/syndtr/goleveldb/leveldb/key.go index ad8f51ec85d..dc7be1fad96 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/key.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/key.go @@ -25,7 +25,7 @@ func (e *ErrInternalKeyCorrupted) Error() string { } func newErrInternalKeyCorrupted(ikey []byte, reason string) error { - return errors.NewErrCorrupted(storage.FileDesc{}, &ErrInternalKeyCorrupted{append([]byte{}, ikey...), reason}) + return errors.NewErrCorrupted(storage.FileDesc{}, &ErrInternalKeyCorrupted{append([]byte(nil), ikey...), reason}) } type keyType uint @@ -90,7 +90,7 @@ func parseInternalKey(ik []byte) (ukey []byte, seq uint64, kt keyType, err error return nil, 0, 0, newErrInternalKeyCorrupted(ik, "invalid length") } num := binary.LittleEndian.Uint64(ik[len(ik)-8:]) - seq, kt = uint64(num>>8), keyType(num&0xff) + seq, kt = num>>8, keyType(num&0xff) if kt > keyTypeVal { return nil, 0, 0, newErrInternalKeyCorrupted(ik, "invalid type") } @@ -124,7 +124,7 @@ func (ik internalKey) num() uint64 { func (ik internalKey) parseNum() (seq uint64, kt keyType) { num := ik.num() - seq, kt = uint64(num>>8), keyType(num&0xff) + seq, kt = num>>8, keyType(num&0xff) if kt > keyTypeVal { panic(fmt.Sprintf("leveldb: internal key %q, len=%d: invalid type %#x", []byte(ik), len(ik), kt)) } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go index dead5fdfbe0..48fb0416dc9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -41,6 +41,7 @@ var ( DefaultWriteL0PauseTrigger = 12 DefaultWriteL0SlowdownTrigger = 8 DefaultFilterBaseLg = 11 + DefaultMaxManifestFileSize = int64(64 * MiB) ) // Cacher is a caching algorithm. @@ -48,23 +49,60 @@ type Cacher interface { New(capacity int) cache.Cacher } -type CacherFunc struct { +type cacherFunc struct { NewFunc func(capacity int) cache.Cacher } -func (f *CacherFunc) New(capacity int) cache.Cacher { +func (f *cacherFunc) New(capacity int) cache.Cacher { if f != nil && f.NewFunc != nil { return f.NewFunc(capacity) } return nil } +func CacherFunc(f func(capacity int) cache.Cacher) Cacher { + return &cacherFunc{f} +} + +type passthroughCacher struct { + Cacher cache.Cacher +} + +func (p *passthroughCacher) New(capacity int) cache.Cacher { + return p.Cacher +} + +// PassthroughCacher can be used to passthrough pre-initialized +// 'cacher instance'. This is useful for sharing cache over multiple +// DB instances. +// +// Shared cache example: +// +// fileCache := opt.NewLRU(500) +// blockCache := opt.NewLRU(8 * opt.MiB) +// options := &opt.Options{ +// OpenFilesCacher: fileCache, +// BlockCacher: blockCache, +// } +// db1, err1 := leveldb.OpenFile("path/to/db1", options) +// ... +// db2, err2 := leveldb.OpenFile("path/to/db2", options) +// ... +func PassthroughCacher(x cache.Cacher) Cacher { + return &passthroughCacher{x} +} + +// NewLRU creates LRU 'passthrough cacher'. +func NewLRU(capacity int) Cacher { + return PassthroughCacher(cache.NewLRU(capacity)) +} + var ( // LRUCacher is the LRU-cache algorithm. - LRUCacher = &CacherFunc{cache.NewLRU} + LRUCacher = CacherFunc(cache.NewLRU) // NoCacher is the value to disable caching algorithm. - NoCacher = &CacherFunc{} + NoCacher = CacherFunc(nil) ) // Compression is the 'sorted table' block compression algorithm to use. @@ -376,6 +414,13 @@ type Options struct { // // The default value is 11(as well as 2KB) FilterBaseLg int + + // MaxManifestFileSize is the maximum size limit of the MANIFEST-****** file. + // When the MANIFEST-****** file grows beyond this size, LevelDB will create + // a new MANIFEST file. + // + // The default value is 64 MiB. + MaxManifestFileSize int64 } func (o *Options) GetAltFilters() []filter.Filter { @@ -715,7 +760,13 @@ func (wo *WriteOptions) GetSync() bool { func GetStrict(o *Options, ro *ReadOptions, strict Strict) bool { if ro.GetStrict(StrictOverride) { return ro.GetStrict(strict) - } else { - return o.GetStrict(strict) || ro.GetStrict(strict) } + return o.GetStrict(strict) || ro.GetStrict(strict) +} + +func (o *Options) GetMaxManifestFileSize() int64 { + if o == nil || o.MaxManifestFileSize <= 0 { + return DefaultMaxManifestFileSize + } + return o.MaxManifestFileSize } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go index 67b820427fc..e7490816e70 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go @@ -1,3 +1,4 @@ +//go:build darwin // +build darwin package opt diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go index 97a14a892ac..4c9f4b05baa 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go @@ -1,3 +1,4 @@ +//go:build !darwin // +build !darwin package opt diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index e143352176e..036570e0f1f 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -54,7 +54,7 @@ type session struct { stCompPtrs []internalKey // compaction pointers; need external synchronization stVersion *version // current version - ntVersionId int64 // next version id to assign + ntVersionID int64 // next version id to assign refCh chan *vTask relCh chan *vTask deltaCh chan *vDelta @@ -107,7 +107,7 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) + s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionID}) // Close all background goroutines close(s.closeC) @@ -171,7 +171,7 @@ func (s *session) recover() (err error) { if err == nil { // save compact pointers for _, r := range rec.compPtrs { - s.setCompPtr(r.level, internalKey(r.ikey)) + s.setCompPtr(r.level, r.ikey) } // commit record to version staging staging.commit(rec) @@ -226,6 +226,9 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) { if s.manifest == nil { // manifest journal writer not yet created, create one err = s.newManifest(r, nv) + } else if s.manifest.Size() >= s.o.GetMaxManifestFileSize() { + // pass nil sessionRecord to avoid over-reference table file + err = s.newManifest(nil, nv) } else { err = s.flushManifest(r) } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go index b46a3e45366..2fd5f32e662 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go @@ -48,7 +48,7 @@ func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (i flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel) rec.addTableFile(flushLevel, t) - s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax) + s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(t.size), t.imin, t.imax) return flushLevel, nil } @@ -226,8 +226,8 @@ func (c *compaction) expand() { exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) if len(exp1) == len(t1) { c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", - c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), - len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) + c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(t0.size()), len(t1), shortenb(t1.size()), + len(exp0), shortenb(exp0.size()), len(exp1), shortenb(exp1.size())) imin, imax = xmin, xmax t0, t1 = exp0, exp1 amin, amax = append(t0, t1...).getRange(c.s.icmp) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_record.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_record.go index 854e1aa6f9b..b1a352f6718 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_record.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_record.go @@ -201,7 +201,7 @@ func (p *sessionRecord) readUvarintMayEOF(field string, r io.ByteReader, mayEOF } x, err := binary.ReadUvarint(r) if err != nil { - if err == io.ErrUnexpectedEOF || (mayEOF == false && err == io.EOF) { + if err == io.ErrUnexpectedEOF || (!mayEOF && err == io.EOF) { p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, "short read"}) } else if strings.HasPrefix(err.Error(), "binary:") { p.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrManifestCorrupted{field, err.Error()}) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go index 730bd2cd347..f467f2d4bc9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -24,7 +24,7 @@ type dropper struct { func (d dropper) Drop(err error) { if e, ok := err.(*journal.ErrCorrupted); ok { - d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(e.Size), e.Reason) + d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(int64(e.Size)), e.Reason) } else { d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err) } @@ -130,7 +130,7 @@ func (s *session) refLoop() { for { // Skip any abandoned version number to prevent blocking processing. if skipAbandoned() { - next += 1 + next++ continue } // Don't bother the version that has been released. @@ -162,13 +162,13 @@ func (s *session) refLoop() { referenced[next] = struct{}{} delete(ref, next) delete(deltas, next) - next += 1 + next++ } // Use delta information to process all released versions. for { if skipAbandoned() { - next += 1 + next++ continue } if d, exist := released[next]; exist { @@ -176,7 +176,7 @@ func (s *session) refLoop() { applyDelta(d) } delete(released, next) - next += 1 + next++ continue } return @@ -396,7 +396,7 @@ func (s *session) recordCommited(rec *sessionRecord) { } for _, r := range rec.compPtrs { - s.setCompPtr(r.level, internalKey(r.ikey)) + s.setCompPtr(r.level, r.ikey) } } @@ -429,14 +429,16 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { s.manifestWriter.Close() } if !s.manifestFd.Zero() { - s.stor.Remove(s.manifestFd) + err = s.stor.Remove(s.manifestFd) } s.manifestFd = fd s.manifestWriter = writer s.manifest = jw } else { writer.Close() - s.stor.Remove(fd) + if rerr := s.stor.Remove(fd); err != nil { + err = fmt.Errorf("newManifest error: %v, cleanup error (%v)", err, rerr) + } s.reuseFileNum(fd.Num) } }() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index 9ba71fd6d10..3c5e70a0e31 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -111,7 +111,9 @@ func OpenFile(path string, readOnly bool) (Storage, error) { defer func() { if err != nil { - flock.release() + if ferr := flock.release(); ferr != nil { + err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, ferr) + } } }() @@ -175,12 +177,13 @@ func itoa(buf []byte, i int, wid int) []byte { return append(buf, b[bp:]...) } -func (fs *fileStorage) printDay(t time.Time) { +func (fs *fileStorage) printDay(t time.Time) error { if fs.day == t.Day() { - return + return nil } fs.day = t.Day() - fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n")) + _, err := fs.logw.Write([]byte("=============== " + t.Format("Jan 2, 2006 (MST)") + " ===============\n")) + return err } func (fs *fileStorage) doLog(t time.Time, str string) { @@ -189,7 +192,9 @@ func (fs *fileStorage) doLog(t time.Time, str string) { fs.logw.Close() fs.logw = nil fs.logSize = 0 - rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")) + if err := rename(filepath.Join(fs.path, "LOG"), filepath.Join(fs.path, "LOG.old")); err != nil { + return + } } if fs.logw == nil { var err error @@ -200,7 +205,9 @@ func (fs *fileStorage) doLog(t time.Time, str string) { // Force printDay on new log file. fs.day = 0 } - fs.printDay(t) + if err := fs.printDay(t); err != nil { + return + } hour, min, sec := t.Clock() msec := t.Nanosecond() / 1e3 // time @@ -634,8 +641,9 @@ func fsGenOldName(fd FileDesc) string { switch fd.Type { case TypeTable: return fmt.Sprintf("%06d.sst", fd.Num) + default: + return fsGenName(fd) } - return fsGenName(fd) } func fsParseName(name string) (fd FileDesc, ok bool) { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go index 5545aeef2a8..b23d4652b33 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_nacl.go @@ -4,6 +4,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +//go:build nacl // +build nacl package storage diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_solaris.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_solaris.go index 79901ee4a7a..cd84ce2e953 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_solaris.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_solaris.go @@ -4,6 +4,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +//go:build solaris // +build solaris package storage diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go index d75f66a9efc..601ffe39975 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go @@ -4,6 +4,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd // +build darwin dragonfly freebsd linux netbsd openbsd package storage diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go index 838f1bee1ba..a32972ad66a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/mem_storage.go @@ -29,7 +29,6 @@ func (lock *memStorageLock) Unlock() { if ms.slock == lock { ms.slock = nil } - return } // memStorage is a memory-backed storage. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go index 4e4a724258d..b385fc6faf7 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/storage.go @@ -59,8 +59,9 @@ func isCorrupted(err error) bool { switch err.(type) { case *ErrCorrupted: return true + default: + return false } - return false } func (e *ErrCorrupted) Error() string { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go index 884be5d3133..d0fab40c40c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go @@ -88,18 +88,6 @@ type tFiles []*tFile func (tf tFiles) Len() int { return len(tf) } func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] } -func (tf tFiles) nums() string { - x := "[ " - for i, f := range tf { - if i != 0 { - x += ", " - } - x += fmt.Sprint(f.fd.Num) - } - x += " ]" - return x -} - // Returns true if i smallest key is less than j. // This used for sort by key in ascending order. func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool { @@ -360,13 +348,13 @@ type tOps struct { s *session noSync bool evictRemoved bool - cache *cache.Cache - bcache *cache.Cache - bpool *util.BufferPool + fileCache *cache.Cache + blockCache *cache.Cache + blockBuffer *util.BufferPool } // Creates an empty table and returns table writer. -func (t *tOps) create() (*tWriter, error) { +func (t *tOps) create(tSize int) (*tWriter, error) { fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()} fw, err := t.s.stor.Create(fd) if err != nil { @@ -376,20 +364,22 @@ func (t *tOps) create() (*tWriter, error) { t: t, fd: fd, w: fw, - tw: table.NewWriter(fw, t.s.o.Options), + tw: table.NewWriter(fw, t.s.o.Options, t.blockBuffer, tSize), }, nil } // Builds table from src iterator. func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { - w, err := t.create() + w, err := t.create(0) if err != nil { return } defer func() { if err != nil { - w.drop() + if derr := w.drop(); derr != nil { + err = fmt.Errorf("error createFrom (%v); error dropping (%v)", err, derr) + } } }() @@ -412,22 +402,22 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { // Opens table. It returns a cache handle, which should // be released after use. func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { - ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) { + ch = t.fileCache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) { var r storage.Reader r, err = t.s.stor.Open(f.fd) if err != nil { return 0, nil } - var bcache *cache.NamespaceGetter - if t.bcache != nil { - bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)} + var blockCache *cache.NamespaceGetter + if t.blockCache != nil { + blockCache = &cache.NamespaceGetter{Cache: t.blockCache, NS: uint64(f.fd.Num)} } var tr *table.Reader - tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options) + tr, err = table.NewReader(r, f.size, f.fd, blockCache, t.blockBuffer, t.s.o.Options) if err != nil { - r.Close() + _ = r.Close() return 0, nil } return 1, tr @@ -484,14 +474,14 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. func (t *tOps) remove(fd storage.FileDesc) { - t.cache.Delete(0, uint64(fd.Num), func() { + t.fileCache.Delete(0, uint64(fd.Num), func() { if err := t.s.stor.Remove(fd); err != nil { t.s.logf("table@remove removing @%d %q", fd.Num, err) } else { t.s.logf("table@remove removed @%d", fd.Num) } - if t.evictRemoved && t.bcache != nil { - t.bcache.EvictNS(uint64(fd.Num)) + if t.evictRemoved && t.blockCache != nil { + t.blockCache.EvictNS(uint64(fd.Num)) } // Try to reuse file num, useful for discarded transaction. t.s.reuseFileNum(fd.Num) @@ -501,40 +491,39 @@ func (t *tOps) remove(fd storage.FileDesc) { // Closes the table ops instance. It will close all tables, // regadless still used or not. func (t *tOps) close() { - t.bpool.Close() - t.cache.Close() - if t.bcache != nil { - t.bcache.CloseWeak() + t.fileCache.Close(true) + if t.blockCache != nil { + t.blockCache.Close(false) } } // Creates new initialized table ops instance. func newTableOps(s *session) *tOps { var ( - cacher cache.Cacher - bcache *cache.Cache - bpool *util.BufferPool + fileCacher cache.Cacher + blockCache *cache.Cache + blockBuffer *util.BufferPool ) if s.o.GetOpenFilesCacheCapacity() > 0 { - cacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity()) + fileCacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity()) } if !s.o.GetDisableBlockCache() { - var bcacher cache.Cacher + var blockCacher cache.Cacher if s.o.GetBlockCacheCapacity() > 0 { - bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity()) + blockCacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity()) } - bcache = cache.NewCache(bcacher) + blockCache = cache.NewCache(blockCacher) } if !s.o.GetDisableBufferPool() { - bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) + blockBuffer = util.NewBufferPool(s.o.GetBlockSize() + 5) } return &tOps{ s: s, noSync: s.o.GetNoSync(), evictRemoved: s.o.GetBlockCacheEvictRemoved(), - cache: cache.NewCache(cacher), - bcache: bcache, - bpool: bpool, + fileCache: cache.NewCache(fileCacher), + blockCache: blockCache, + blockBuffer: blockBuffer, } } @@ -553,7 +542,7 @@ type tWriter struct { // Append key/value pair to the table. func (w *tWriter) append(key, value []byte) error { if w.first == nil { - w.first = append([]byte{}, key...) + w.first = append([]byte(nil), key...) } w.last = append(w.last[:0], key...) return w.tw.Append(key, value) @@ -565,16 +554,27 @@ func (w *tWriter) empty() bool { } // Closes the storage.Writer. -func (w *tWriter) close() { +func (w *tWriter) close() error { if w.w != nil { - w.w.Close() + if err := w.w.Close(); err != nil { + return err + } w.w = nil } + return nil } // Finalizes the table and returns table file. func (w *tWriter) finish() (f *tFile, err error) { - defer w.close() + defer func() { + if cerr := w.close(); cerr != nil { + if err == nil { + err = cerr + } else { + err = fmt.Errorf("error opening file (%v); error unlocking file (%v)", err, cerr) + } + } + }() err = w.tw.Close() if err != nil { return @@ -590,11 +590,16 @@ func (w *tWriter) finish() (f *tFile, err error) { } // Drops the table. -func (w *tWriter) drop() { - w.close() - w.t.s.stor.Remove(w.fd) - w.t.s.reuseFileNum(w.fd.Num) +func (w *tWriter) drop() error { + if err := w.close(); err != nil { + return err + } w.tw = nil w.first = nil w.last = nil + if err := w.t.s.stor.Remove(w.fd); err != nil { + return err + } + w.t.s.reuseFileNum(w.fd.Num) + return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go index 496feb6fb45..8128794c227 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -901,7 +901,7 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo } else { // Value does use block buffer, and since the buffer will be // recycled, it need to be copied. - value = append([]byte{}, data.Value()...) + value = append([]byte(nil), data.Value()...) } } data.Release() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go index fda697bdbc6..ea89d600e71 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go @@ -40,7 +40,7 @@ type blockWriter struct { scratch []byte } -func (w *blockWriter) append(key, value []byte) { +func (w *blockWriter) append(key, value []byte) (err error) { nShared := 0 if w.nEntries%w.restartInterval == 0 { w.restarts = append(w.restarts, uint32(w.buf.Len())) @@ -50,14 +50,21 @@ func (w *blockWriter) append(key, value []byte) { n := binary.PutUvarint(w.scratch[0:], uint64(nShared)) n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared)) n += binary.PutUvarint(w.scratch[n:], uint64(len(value))) - w.buf.Write(w.scratch[:n]) - w.buf.Write(key[nShared:]) - w.buf.Write(value) + if _, err = w.buf.Write(w.scratch[:n]); err != nil { + return err + } + if _, err = w.buf.Write(key[nShared:]); err != nil { + return err + } + if _, err = w.buf.Write(value); err != nil { + return err + } w.prevKey = append(w.prevKey[:0], key...) w.nEntries++ + return nil } -func (w *blockWriter) finish() { +func (w *blockWriter) finish() error { // Write restarts entry. if w.nEntries == 0 { // Must have at least one restart entry. @@ -68,6 +75,7 @@ func (w *blockWriter) finish() { buf4 := w.buf.Alloc(4) binary.LittleEndian.PutUint32(buf4, x) } + return nil } func (w *blockWriter) reset() { @@ -109,9 +117,9 @@ func (w *filterWriter) flush(offset uint64) { } } -func (w *filterWriter) finish() { +func (w *filterWriter) finish() error { if w.generator == nil { - return + return nil } // Generate last keys. @@ -123,7 +131,7 @@ func (w *filterWriter) finish() { buf4 := w.buf.Alloc(4) binary.LittleEndian.PutUint32(buf4, x) } - w.buf.WriteByte(byte(w.baseLg)) + return w.buf.WriteByte(byte(w.baseLg)) } func (w *filterWriter) generate() { @@ -146,6 +154,7 @@ type Writer struct { compression opt.Compression blockSize int + bpool *util.BufferPool dataBlock blockWriter indexBlock blockWriter filterBlock filterWriter @@ -193,9 +202,9 @@ func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh b return } -func (w *Writer) flushPendingBH(key []byte) { +func (w *Writer) flushPendingBH(key []byte) error { if w.pendingBH.length == 0 { - return + return nil } var separator []byte if len(key) == 0 { @@ -210,15 +219,20 @@ func (w *Writer) flushPendingBH(key []byte) { } n := encodeBlockHandle(w.scratch[:20], w.pendingBH) // Append the block handle to the index block. - w.indexBlock.append(separator, w.scratch[:n]) + if err := w.indexBlock.append(separator, w.scratch[:n]); err != nil { + return err + } // Reset prev key of the data block. w.dataBlock.prevKey = w.dataBlock.prevKey[:0] // Clear pending block handle. w.pendingBH = blockHandle{} + return nil } func (w *Writer) finishBlock() error { - w.dataBlock.finish() + if err := w.dataBlock.finish(); err != nil { + return err + } bh, err := w.writeBlock(&w.dataBlock.buf, w.compression) if err != nil { return err @@ -244,9 +258,13 @@ func (w *Writer) Append(key, value []byte) error { return w.err } - w.flushPendingBH(key) + if err := w.flushPendingBH(key); err != nil { + return err + } // Append key/value pair to the data block. - w.dataBlock.append(key, value) + if err := w.dataBlock.append(key, value); err != nil { + return err + } // Add key to the filter block. w.filterBlock.add(key) @@ -285,6 +303,16 @@ func (w *Writer) BytesLen() int { // after Close, but calling BlocksLen, EntriesLen and BytesLen // is still possible. func (w *Writer) Close() error { + defer func() { + if w.bpool != nil { + // Buffer.Bytes() returns [offset:] of the buffer. + // We need to Reset() so that the offset = 0, resulting + // in buf.Bytes() returning the whole allocated bytes. + w.dataBlock.buf.Reset() + w.bpool.Put(w.dataBlock.buf.Bytes()) + } + }() + if w.err != nil { return w.err } @@ -297,11 +325,15 @@ func (w *Writer) Close() error { return w.err } } - w.flushPendingBH(nil) + if err := w.flushPendingBH(nil); err != nil { + return err + } // Write the filter block. var filterBH blockHandle - w.filterBlock.finish() + if err := w.filterBlock.finish(); err != nil { + return err + } if buf := &w.filterBlock.buf; buf.Len() > 0 { filterBH, w.err = w.writeBlock(buf, opt.NoCompression) if w.err != nil { @@ -313,9 +345,13 @@ func (w *Writer) Close() error { if filterBH.length > 0 { key := []byte("filter." + w.filter.Name()) n := encodeBlockHandle(w.scratch[:20], filterBH) - w.dataBlock.append(key, w.scratch[:n]) + if err := w.dataBlock.append(key, w.scratch[:n]); err != nil { + return err + } + } + if err := w.dataBlock.finish(); err != nil { + return err } - w.dataBlock.finish() metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression) if err != nil { w.err = err @@ -323,7 +359,9 @@ func (w *Writer) Close() error { } // Write the index block. - w.indexBlock.finish() + if err := w.indexBlock.finish(); err != nil { + return err + } indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression) if err != nil { w.err = err @@ -351,7 +389,15 @@ func (w *Writer) Close() error { // NewWriter creates a new initialized table writer for the file. // // Table writer is not safe for concurrent use. -func NewWriter(f io.Writer, o *opt.Options) *Writer { +func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer { + var bufBytes []byte + if pool == nil { + bufBytes = make([]byte, size) + } else { + bufBytes = pool.Get(size) + } + bufBytes = bufBytes[:0] + w := &Writer{ writer: f, cmp: o.GetComparer(), @@ -359,6 +405,8 @@ func NewWriter(f io.Writer, o *opt.Options) *Writer { compression: o.GetCompression(), blockSize: o.GetBlockSize(), comparerScratch: make([]byte, 0), + bpool: pool, + dataBlock: blockWriter{buf: *util.NewBuffer(bufBytes)}, } // data block w.dataBlock.restartInterval = o.GetBlockRestartInterval() diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util.go b/vendor/github.com/syndtr/goleveldb/leveldb/util.go index 0e2b519e5c7..1ef859d4db1 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/util.go @@ -22,7 +22,7 @@ func shorten(str string) string { var bunits = [...]string{"", "Ki", "Mi", "Gi", "Ti"} -func shortenb(bytes int) string { +func shortenb(bytes int64) string { i := 0 for ; bytes > 1024 && i < 4; i++ { bytes /= 1024 @@ -30,7 +30,7 @@ func shortenb(bytes int) string { return fmt.Sprintf("%d%sB", bytes, bunits[i]) } -func sshortenb(bytes int) string { +func sshortenb(bytes int64) string { if bytes == 0 { return "~" } @@ -58,13 +58,6 @@ func sint(x int) string { return fmt.Sprintf("%s%d", sign, x) } -func minInt(a, b int) int { - if a < b { - return a - } - return b -} - func maxInt(a, b int) int { if a > b { return a diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go index 5ab1f86825a..4f512f6d3fe 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -10,30 +10,15 @@ import ( "fmt" "sync" "sync/atomic" - "time" ) -type buffer struct { - b []byte - miss int -} - // BufferPool is a 'buffer pool'. type BufferPool struct { - pool [6]chan []byte - size [5]uint32 - sizeMiss [5]uint32 - sizeHalf [5]uint32 - baseline [4]int - baseline0 int - - mu sync.RWMutex - closed bool - closeC chan struct{} + pool [6]sync.Pool + baseline [5]int get uint32 put uint32 - half uint32 less uint32 equal uint32 greater uint32 @@ -41,15 +26,12 @@ type BufferPool struct { } func (p *BufferPool) poolNum(n int) int { - if n <= p.baseline0 && n > p.baseline0/2 { - return 0 - } for i, x := range p.baseline { if n <= x { - return i + 1 + return i } } - return len(p.baseline) + 1 + return len(p.baseline) } // Get returns buffer with length of n. @@ -57,101 +39,47 @@ func (p *BufferPool) Get(n int) []byte { if p == nil { return make([]byte, n) } + atomic.AddUint32(&p.get, 1) - p.mu.RLock() - defer p.mu.RUnlock() + poolNum := p.poolNum(n) - if p.closed { - return make([]byte, n) - } + b := p.pool[poolNum].Get().(*[]byte) - atomic.AddUint32(&p.get, 1) + if cap(*b) == 0 { + // If we grabbed nothing, increment the miss stats. + atomic.AddUint32(&p.miss, 1) - poolNum := p.poolNum(n) - pool := p.pool[poolNum] - if poolNum == 0 { - // Fast path. - select { - case b := <-pool: - switch { - case cap(b) > n: - if cap(b)-n >= n { - atomic.AddUint32(&p.half, 1) - select { - case pool <- b: - default: - } - return make([]byte, n) - } else { - atomic.AddUint32(&p.less, 1) - return b[:n] - } - case cap(b) == n: - atomic.AddUint32(&p.equal, 1) - return b[:n] - default: - atomic.AddUint32(&p.greater, 1) - } - default: - atomic.AddUint32(&p.miss, 1) + if poolNum == len(p.baseline) { + *b = make([]byte, n) + return *b } - return make([]byte, n, p.baseline0) + *b = make([]byte, p.baseline[poolNum]) + *b = (*b)[:n] + return *b } else { - sizePtr := &p.size[poolNum-1] - - select { - case b := <-pool: - switch { - case cap(b) > n: - if cap(b)-n >= n { - atomic.AddUint32(&p.half, 1) - sizeHalfPtr := &p.sizeHalf[poolNum-1] - if atomic.AddUint32(sizeHalfPtr, 1) == 20 { - atomic.StoreUint32(sizePtr, uint32(cap(b)/2)) - atomic.StoreUint32(sizeHalfPtr, 0) - } else { - select { - case pool <- b: - default: - } - } - return make([]byte, n) - } else { - atomic.AddUint32(&p.less, 1) - return b[:n] - } - case cap(b) == n: - atomic.AddUint32(&p.equal, 1) - return b[:n] - default: - atomic.AddUint32(&p.greater, 1) - if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) { - select { - case pool <- b: - default: - } - } - } - default: - atomic.AddUint32(&p.miss, 1) + // If there is enough capacity in the bytes grabbed, resize the length + // to n and return. + if n < cap(*b) { + atomic.AddUint32(&p.less, 1) + *b = (*b)[:n] + return *b + } else if n == cap(*b) { + atomic.AddUint32(&p.equal, 1) + *b = (*b)[:n] + return *b + } else if n > cap(*b) { + atomic.AddUint32(&p.greater, 1) } + } - if size := atomic.LoadUint32(sizePtr); uint32(n) > size { - if size == 0 { - atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n)) - } else { - sizeMissPtr := &p.sizeMiss[poolNum-1] - if atomic.AddUint32(sizeMissPtr, 1) == 20 { - atomic.StoreUint32(sizePtr, uint32(n)) - atomic.StoreUint32(sizeMissPtr, 0) - } - } - return make([]byte, n) - } else { - return make([]byte, n, size) - } + if poolNum == len(p.baseline) { + *b = make([]byte, n) + return *b } + *b = make([]byte, p.baseline[poolNum]) + *b = (*b)[:n] + return *b } // Put adds given buffer to the pool. @@ -160,68 +88,18 @@ func (p *BufferPool) Put(b []byte) { return } - p.mu.RLock() - defer p.mu.RUnlock() - - if p.closed { - return - } + poolNum := p.poolNum(cap(b)) atomic.AddUint32(&p.put, 1) - - pool := p.pool[p.poolNum(cap(b))] - select { - case pool <- b: - default: - } - -} - -func (p *BufferPool) Close() { - if p == nil { - return - } - - p.mu.Lock() - if !p.closed { - p.closed = true - p.closeC <- struct{}{} - } - p.mu.Unlock() + p.pool[poolNum].Put(&b) } func (p *BufferPool) String() string { if p == nil { return "" } - - p.mu.Lock() - defer p.mu.Unlock() - - return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}", - p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss) -} - -func (p *BufferPool) drain() { - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - for _, ch := range p.pool { - select { - case <-ch: - default: - } - } - case <-p.closeC: - close(p.closeC) - for _, ch := range p.pool { - close(ch) - } - return - } - } + return fmt.Sprintf("BufferPool{B·%d G·%d P·%d <·%d =·%d >·%d M·%d}", + p.baseline, p.get, p.put, p.less, p.equal, p.greater, p.miss) } // NewBufferPool creates a new initialized 'buffer pool'. @@ -229,14 +107,29 @@ func NewBufferPool(baseline int) *BufferPool { if baseline <= 0 { panic("baseline can't be <= 0") } - p := &BufferPool{ - baseline0: baseline, - baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4}, - closeC: make(chan struct{}, 1), + bufPool := &BufferPool{ + baseline: [...]int{baseline / 4, baseline / 2, baseline, baseline * 2, baseline * 4}, + pool: [6]sync.Pool{ + { + New: func() interface{} { return new([]byte) }, + }, + { + New: func() interface{} { return new([]byte) }, + }, + { + New: func() interface{} { return new([]byte) }, + }, + { + New: func() interface{} { return new([]byte) }, + }, + { + New: func() interface{} { return new([]byte) }, + }, + { + New: func() interface{} { return new([]byte) }, + }, + }, } - for i, cap := range []int{2, 2, 4, 4, 2, 1} { - p.pool[i] = make(chan []byte, cap) - } - go p.drain() - return p + + return bufPool } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go index 9535e359145..467250917b2 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go @@ -43,7 +43,7 @@ type version struct { // newVersion creates a new version with an unique monotonous increasing id. func newVersion(s *session) *version { - id := atomic.AddInt64(&s.ntVersionId, 1) + id := atomic.AddInt64(&s.ntVersionID, 1) nv := &version{s: s, id: id - 1} return nv } @@ -388,7 +388,7 @@ func (v *version) computeCompaction() { } statFiles[level] = len(tables) - statSizes[level] = shortenb(int(size)) + statSizes[level] = shortenb(size) statScore[level] = fmt.Sprintf("%.2f", score) statTotSize += size } @@ -396,7 +396,7 @@ func (v *version) computeCompaction() { v.cLevel = bestLevel v.cScore = bestScore - v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(int(statTotSize)), statSizes, statScore) + v.s.logf("version@stat F·%v S·%s%v Sc·%v", statFiles, shortenb(statTotSize), statSizes, statScore) } func (v *version) needCompaction() bool { diff --git a/vendor/modules.txt b/vendor/modules.txt index ba069a32ca1..74d778eada8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -488,7 +488,7 @@ github.com/subosito/gotenv # github.com/sykesm/zap-logfmt v0.0.4 ## explicit; go 1.13 github.com/sykesm/zap-logfmt -# github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 +# github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d ## explicit; go 1.14 github.com/syndtr/goleveldb/leveldb github.com/syndtr/goleveldb/leveldb/cache