From c810dca48886d4f4ac99d598d0e591c3378fcfb4 Mon Sep 17 00:00:00 2001 From: manish Date: Mon, 8 Mar 2021 12:08:47 -0500 Subject: [PATCH] This commit upgrades goleveldb. This upgraded version includes a fix for the manifest file corruption issue [FAB-18304]. Backport of PR#2463 Signed-off-by: manish --- Gopkg.lock | 4 +- .../syndtr/goleveldb/leveldb/batch.go | 5 + .../syndtr/goleveldb/leveldb/cache/cache.go | 1 - .../leveldb/comparer/bytes_comparer.go | 4 +- .../goleveldb/leveldb/comparer/comparer.go | 2 +- .../github.com/syndtr/goleveldb/leveldb/db.go | 52 +++- .../syndtr/goleveldb/leveldb/db_compaction.go | 37 +-- .../syndtr/goleveldb/leveldb/db_iter.go | 43 +-- .../syndtr/goleveldb/leveldb/db_snapshot.go | 26 +- .../goleveldb/leveldb/db_transaction.go | 20 +- .../syndtr/goleveldb/leveldb/db_util.go | 2 +- .../syndtr/goleveldb/leveldb/filter/bloom.go | 2 +- .../syndtr/goleveldb/leveldb/iterator/iter.go | 4 +- .../syndtr/goleveldb/leveldb/memdb/memdb.go | 4 + .../syndtr/goleveldb/leveldb/opt/options.go | 59 ++++- .../goleveldb/leveldb/opt/options_darwin.go | 7 + .../goleveldb/leveldb/opt/options_default.go | 7 + .../syndtr/goleveldb/leveldb/session.go | 57 +++- .../goleveldb/leveldb/session_compaction.go | 49 +++- .../syndtr/goleveldb/leveldb/session_util.go | 250 ++++++++++++++++-- .../leveldb/storage/file_storage_windows.go | 2 +- .../syndtr/goleveldb/leveldb/table.go | 121 +++++++-- .../syndtr/goleveldb/leveldb/table/reader.go | 4 + .../syndtr/goleveldb/leveldb/table/table.go | 4 - .../syndtr/goleveldb/leveldb/table/writer.go | 6 +- .../syndtr/goleveldb/leveldb/util/buffer.go | 133 ++++++---- .../goleveldb/leveldb/util/buffer_pool.go | 3 + .../syndtr/goleveldb/leveldb/version.go | 81 ++++-- 28 files changed, 761 insertions(+), 228 deletions(-) create mode 100644 vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go create mode 100644 vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go diff --git a/Gopkg.lock b/Gopkg.lock index 3b588f5e185..3388927d075 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -680,7 +680,7 @@ [[projects]] branch = "master" - digest = "1:f2ffd421680b0a3f7887501b3c6974bcf19217ecd301d0e2c9b681940ec363d5" + digest = "1:cbeb9cb3b34b212ee915695cb9a07d4b71d2281ef1b428d427ae8bdb5a9b3fd5" name = "github.com/syndtr/goleveldb" packages = [ "leveldb", @@ -697,7 +697,7 @@ "leveldb/util", ] pruneopts = "NUT" - revision = "ae2bd5eed72d46b28834ec3f60db3a3ebedd8dbd" + revision = "64b5b1c739545ed311fb9d9924d19d188fabdc83" [[projects]] branch = "master" diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go index 225920002df..823be93f93c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/batch.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/batch.go @@ -238,6 +238,11 @@ func newBatch() interface{} { return &Batch{} } +// MakeBatch returns empty batch with preallocated buffer. 
+func MakeBatch(n int) *Batch { + return &Batch{data: make([]byte, 0, n)} +} + func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error { var index batchIndex for i, o := 0, 0; o < len(data); i++ { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go index c5940b232cd..c36ad323597 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -331,7 +331,6 @@ func (r *Cache) delete(n *Node) bool { return deleted } } - return false } // Nodes returns number of 'cache node' in the map. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/comparer/bytes_comparer.go b/vendor/github.com/syndtr/goleveldb/leveldb/comparer/bytes_comparer.go index 14dddf88dd2..abf9fb65c7a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/comparer/bytes_comparer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/comparer/bytes_comparer.go @@ -29,7 +29,7 @@ func (bytesComparer) Separator(dst, a, b []byte) []byte { // Do not shorten if one string is a prefix of the other } else if c := a[i]; c < 0xff && c+1 < b[i] { dst = append(dst, a[:i+1]...) - dst[i]++ + dst[len(dst)-1]++ return dst } return nil @@ -39,7 +39,7 @@ func (bytesComparer) Successor(dst, b []byte) []byte { for i, c := range b { if c != 0xff { dst = append(dst, b[:i+1]...) - dst[i]++ + dst[len(dst)-1]++ return dst } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/comparer/comparer.go b/vendor/github.com/syndtr/goleveldb/leveldb/comparer/comparer.go index 14a28f16fce..2c522db23b9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/comparer/comparer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/comparer/comparer.go @@ -36,7 +36,7 @@ type Comparer interface { // by any users of this package. Name() string - // Bellow are advanced functions used used to reduce the space requirements + // Bellow are advanced functions used to reduce the space requirements // for internal data structures such as index blocks. // Separator appends a sequence of bytes x to dst such that a <= x && x < b, diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index e7ac0654187..74e9826956d 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -38,6 +38,12 @@ type DB struct { inWritePaused int32 // The indicator whether write operation is paused by compaction aliveSnaps, aliveIters int32 + // Compaction statistic + memComp uint32 // The cumulative number of memory compaction + level0Comp uint32 // The cumulative number of level0 compaction + nonLevel0Comp uint32 // The cumulative number of non-level0 compaction + seekComp uint32 // The cumulative number of seek compaction + // Session. s *session @@ -182,7 +188,7 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { err = s.recover() if err != nil { - if !os.IsNotExist(err) || s.o.GetErrorIfMissing() { + if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() { return } err = s.create() @@ -468,7 +474,7 @@ func recoverTable(s *session, o *opt.Options) error { } // Commit. 
- return s.commit(rec) + return s.commit(rec, false) } func (db *DB) recoverJournal() error { @@ -538,7 +544,7 @@ func (db *DB) recoverJournal() error { rec.setJournalNum(fd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec); err != nil { + if err := db.s.commit(rec, false); err != nil { fr.Close() return err } @@ -617,7 +623,7 @@ func (db *DB) recoverJournal() error { // Commit. rec.setJournalNum(db.journalFd.Num) rec.setSeqNum(db.seq) - if err := db.s.commit(rec); err != nil { + if err := db.s.commit(rec, false); err != nil { // Close journal on error. if db.journal != nil { db.journal.Close() @@ -872,6 +878,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. @@ -953,15 +963,29 @@ func (db *DB) GetProperty(name string) (value string, err error) { value = "Compactions\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + "-------+------------+---------------+---------------+---------------+---------------\n" + var totalTables int + var totalSize, totalRead, totalWrite int64 + var totalDuration time.Duration for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) if len(tables) == 0 && duration == 0 { continue } + totalTables += len(tables) + totalSize += tables.size() + totalRead += read + totalWrite += write + totalDuration += duration value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), float64(read)/1048576.0, float64(write)/1048576.0) } + value += "-------+------------+---------------+---------------+---------------+---------------\n" + value += fmt.Sprintf(" Total | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", + totalTables, float64(totalSize)/1048576.0, totalDuration.Seconds(), + float64(totalRead)/1048576.0, float64(totalWrite)/1048576.0) + case p == "compcount": + value = fmt.Sprintf("MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d", atomic.LoadUint32(&db.memComp), atomic.LoadUint32(&db.level0Comp), atomic.LoadUint32(&db.nonLevel0Comp), atomic.LoadUint32(&db.seekComp)) case p == "iostats": value = fmt.Sprintf("Read(MB):%.5f Write(MB):%.5f", float64(db.s.stor.reads())/1048576.0, @@ -1013,11 +1037,16 @@ type DBStats struct { BlockCacheSize int OpenedTablesCount int - LevelSizes []int64 + LevelSizes Sizes LevelTablesCounts []int - LevelRead []int64 - LevelWrite []int64 + LevelRead Sizes + LevelWrite Sizes LevelDurations []time.Duration + + MemComp uint32 + Level0Comp uint32 + NonLevel0Comp uint32 + SeekComp uint32 } // Stats populates s with database statistics. 
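The hunks above surface the new cumulative compaction counters (memComp, level0Comp, nonLevel0Comp, seekComp) through both the "compcount" property and exported DBStats fields. A minimal sketch of reading them through DB.Stats, assuming a database opened with leveldb.OpenFile (the path and output format are illustrative):

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/syndtr/goleveldb/leveldb"
    )

    func main() {
    	db, err := leveldb.OpenFile("/tmp/compstats-demo", nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer db.Close()

    	var stats leveldb.DBStats
    	if err := db.Stats(&stats); err != nil {
    		log.Fatal(err)
    	}
    	// MemComp/Level0Comp/NonLevel0Comp/SeekComp are the cumulative
    	// compaction counts added by this upgrade.
    	fmt.Printf("mem=%d level0=%d nonLevel0=%d seek=%d\n",
    		stats.MemComp, stats.Level0Comp, stats.NonLevel0Comp, stats.SeekComp)
    }
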
@@ -1054,16 +1083,17 @@ func (db *DB) Stats(s *DBStats) error { for level, tables := range v.levels { duration, read, write := db.compStats.getStat(level) - if len(tables) == 0 && duration == 0 { - continue - } + s.LevelDurations = append(s.LevelDurations, duration) s.LevelRead = append(s.LevelRead, read) s.LevelWrite = append(s.LevelWrite, write) s.LevelSizes = append(s.LevelSizes, tables.size()) s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables)) } - + s.MemComp = atomic.LoadUint32(&db.memComp) + s.Level0Comp = atomic.LoadUint32(&db.level0Comp) + s.NonLevel0Comp = atomic.LoadUint32(&db.nonLevel0Comp) + s.SeekComp = atomic.LoadUint32(&db.seekComp) return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 28e50906adb..6b70eb2c9d3 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -8,6 +8,7 @@ package leveldb import ( "sync" + "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb/errors" @@ -260,7 +261,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) { db.compCommitLk.Lock() defer db.compCommitLk.Unlock() // Defer is necessary. db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error { - return db.s.commit(rec) + return db.s.commit(rec, true) }, nil) } @@ -324,10 +325,12 @@ func (db *DB) memCompaction() { db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) + // Save compaction stats for _, r := range rec.addedTables { stats.write += r.size } db.compStats.addStat(flushLevel, stats) + atomic.AddUint32(&db.memComp, 1) // Drop frozen memdb. db.dropFrozenMem() @@ -588,6 +591,14 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { for i := range stats { db.compStats.addStat(c.sourceLevel+1, &stats[i]) } + switch c.typ { + case level0Compaction: + atomic.AddUint32(&db.level0Comp, 1) + case nonLevel0Compaction: + atomic.AddUint32(&db.nonLevel0Comp, 1) + case seekCompaction: + atomic.AddUint32(&db.seekComp, 1) + } } func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { @@ -663,7 +674,7 @@ type cCmd interface { } type cAuto struct { - // Note for table compaction, an empty ackC represents it's a compaction waiting command. + // Note for table compaction, an non-empty ackC represents it's a compaction waiting command. ackC chan<- error } @@ -777,8 +788,8 @@ func (db *DB) mCompaction() { func (db *DB) tCompaction() { var ( - x cCmd - ackQ, waitQ []cCmd + x cCmd + waitQ []cCmd ) defer func() { @@ -787,10 +798,6 @@ func (db *DB) tCompaction() { panic(x) } } - for i := range ackQ { - ackQ[i].ack(ErrClosed) - ackQ[i] = nil - } for i := range waitQ { waitQ[i].ack(ErrClosed) waitQ[i] = nil @@ -821,11 +828,6 @@ func (db *DB) tCompaction() { waitQ = waitQ[:0] } } else { - for i := range ackQ { - ackQ[i].ack(nil) - ackQ[i] = nil - } - ackQ = ackQ[:0] for i := range waitQ { waitQ[i].ack(nil) waitQ[i] = nil @@ -844,9 +846,12 @@ func (db *DB) tCompaction() { switch cmd := x.(type) { case cAuto: if cmd.ackC != nil { - waitQ = append(waitQ, x) - } else { - ackQ = append(ackQ, x) + // Check the write pause state before caching it. 
+ if db.resumeWrite() { + x.ack(nil) + } else { + waitQ = append(waitQ, x) + } } case cRange: x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go index 03c24cdab50..e6e8ca59d08 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -78,13 +78,17 @@ func (db *DB) newIterator(auxm *memDB, auxt tFiles, seq uint64, slice *util.Rang } rawIter := db.newRawIterator(auxm, auxt, islice, ro) iter := &dbIter{ - db: db, - icmp: db.s.icmp, - iter: rawIter, - seq: seq, - strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), - key: make([]byte, 0), - value: make([]byte, 0), + db: db, + icmp: db.s.icmp, + iter: rawIter, + seq: seq, + strict: opt.GetStrict(db.s.o.Options, ro, opt.StrictReader), + disableSampling: db.s.o.GetDisableSeeksCompaction() || db.s.o.GetIteratorSamplingRate() <= 0, + key: make([]byte, 0), + value: make([]byte, 0), + } + if !iter.disableSampling { + iter.samplingGap = db.iterSamplingRate() } atomic.AddInt32(&db.aliveIters, 1) runtime.SetFinalizer(iter, (*dbIter).Release) @@ -107,13 +111,14 @@ const ( // dbIter represent an interator states over a database session. type dbIter struct { - db *DB - icmp *iComparer - iter iterator.Iterator - seq uint64 - strict bool - - smaplingGap int + db *DB + icmp *iComparer + iter iterator.Iterator + seq uint64 + strict bool + disableSampling bool + + samplingGap int dir dir key []byte value []byte @@ -122,10 +127,14 @@ type dbIter struct { } func (i *dbIter) sampleSeek() { + if i.disableSampling { + return + } + ikey := i.iter.Key() - i.smaplingGap -= len(ikey) + len(i.iter.Value()) - for i.smaplingGap < 0 { - i.smaplingGap += i.db.iterSamplingRate() + i.samplingGap -= len(ikey) + len(i.iter.Value()) + for i.samplingGap < 0 { + i.samplingGap += i.db.iterSamplingRate() i.db.sampleSeek(ikey) } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 2c69d2e531d..56c457abe78 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -100,16 +100,16 @@ func (snap *Snapshot) String() string { // The caller should not modify the contents of the returned slice, but // it is safe to modify the contents of the argument after Get returns. func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { - err = snap.db.ok() - if err != nil { - return - } snap.mu.RLock() defer snap.mu.RUnlock() if snap.released { err = ErrSnapshotReleased return } + err = snap.db.ok() + if err != nil { + return + } return snap.db.get(nil, nil, key, snap.elem.seq, ro) } @@ -117,16 +117,16 @@ func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err er // // It is safe to modify the contents of the argument after Get returns. func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) { - err = snap.db.ok() - if err != nil { - return - } snap.mu.RLock() defer snap.mu.RUnlock() if snap.released { err = ErrSnapshotReleased return } + err = snap.db.ok() + if err != nil { + return + } return snap.db.has(nil, nil, key, snap.elem.seq, ro) } @@ -142,20 +142,24 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. 
// +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Value() methods), its content should not be +// modified unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // Releasing the snapshot doesn't mean releasing the iterator too, the // iterator would be still valid until released. // // Also read Iterator documentation of the leveldb/iterator package. func (snap *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - if err := snap.db.ok(); err != nil { - return iterator.NewEmptyIterator(err) - } snap.mu.Lock() defer snap.mu.Unlock() if snap.released { return iterator.NewEmptyIterator(ErrSnapshotReleased) } + if err := snap.db.ok(); err != nil { + return iterator.NewEmptyIterator(err) + } // Since iterator already hold version ref, it doesn't need to // hold snapshot ref. return snap.db.newIterator(nil, nil, snap.elem.seq, slice, ro) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go index b8f7e7d21df..21d1e512f34 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_transaction.go @@ -69,6 +69,13 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// The returned iterator has locks on its own resources, so it can live beyond +// the lifetime of the transaction who creates them. +// +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. @@ -205,7 +212,7 @@ func (tr *Transaction) Commit() error { tr.stats.startTimer() var cerr error for retry := 0; retry < 3; retry++ { - cerr = tr.db.s.commit(&tr.rec) + cerr = tr.db.s.commit(&tr.rec, false) if cerr != nil { tr.db.logf("transaction@commit error R·%d %q", retry, cerr) select { @@ -248,13 +255,14 @@ func (tr *Transaction) discard() { // Discard transaction. for _, t := range tr.tables { tr.db.logf("transaction@discard @%d", t.fd.Num) - if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil { - tr.db.s.reuseFileNum(t.fd.Num) - } + // Iterator may still use the table, so we use tOps.remove here. + tr.db.s.tops.remove(t.fd) } } // Discard discards the transaction. +// This method is noop if transaction is already closed (either committed or +// discarded) // // Other methods should not be called after transaction has been discarded. func (tr *Transaction) Discard() { @@ -278,8 +286,10 @@ func (db *DB) waitCompaction() error { // until in-flight transaction is committed or discarded. // The returned transaction handle is safe for concurrent use. // -// Transaction is expensive and can overwhelm compaction, especially if +// Transaction is very expensive and can overwhelm compaction, especially if // transaction size is small. Use with caution. +// The rule of thumb is if you need to merge at least same amount of +// `Options.WriteBuffer` worth of data then use transaction, otherwise don't. // // The transaction must be closed once done, either by committing or discarding // the transaction. 
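The expanded Commit/Discard docs above come with a rule of thumb: reach for a transaction only when merging at least Options.WriteBuffer worth of data. A sketch of that bulk-load pattern, assuming an already-open *leveldb.DB (the helper name and the nil write options are illustrative):

    // bulkLoad funnels a large data set through one transaction so the
    // writes land directly in sorted tables rather than the journal.
    func bulkLoad(db *leveldb.DB, kvs map[string][]byte) error {
    	tr, err := db.OpenTransaction()
    	if err != nil {
    		return err
    	}
    	for k, v := range kvs {
    		if err := tr.Put([]byte(k), v, nil); err != nil {
    			tr.Discard() // noop if the transaction is already closed
    			return err
    		}
    	}
    	if err := tr.Commit(); err != nil {
    		tr.Discard()
    		return err
    	}
    	return nil
    }
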
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go index 7ecd960d2ce..3f0654894b4 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_util.go @@ -84,7 +84,7 @@ func (db *DB) checkAndCleanFiles() error { var mfds []storage.FileDesc for num, present := range tmap { if !present { - mfds = append(mfds, storage.FileDesc{storage.TypeTable, num}) + mfds = append(mfds, storage.FileDesc{Type: storage.TypeTable, Num: num}) db.logf("db@janitor table missing @%d", num) } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go index bab0e99705f..56ccbfbecab 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/filter/bloom.go @@ -16,7 +16,7 @@ func bloomHash(key []byte) uint32 { type bloomFilter int -// The bloom filter serializes its parameters and is backward compatible +// Name: The bloom filter serializes its parameters and is backward compatible // with respect to them. Therefor, its parameters are not added to its // name. func (bloomFilter) Name() string { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go index b16e3a70452..96fb0f6859c 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/iterator/iter.go @@ -40,11 +40,11 @@ type IteratorSeeker interface { Seek(key []byte) bool // Next moves the iterator to the next key/value pair. - // It returns whether the iterator is exhausted. + // It returns false if the iterator is exhausted. Next() bool // Prev moves the iterator to the previous key/value pair. - // It returns whether the iterator is exhausted. + // It returns false if the iterator is exhausted. Prev() bool } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index b661c08a93e..824e47f5f40 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -397,6 +397,10 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) { // DB. And a nil Range.Limit is treated as a key after all keys in // the DB. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go index 44e7d9adce4..dead5fdfbe0 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -37,10 +37,10 @@ var ( DefaultCompressionType = SnappyCompression DefaultIteratorSamplingRate = 1 * MiB DefaultOpenFilesCacher = LRUCacher - DefaultOpenFilesCacheCapacity = 500 DefaultWriteBuffer = 4 * MiB DefaultWriteL0PauseTrigger = 12 DefaultWriteL0SlowdownTrigger = 8 + DefaultFilterBaseLg = 11 ) // Cacher is a caching algorithm. 
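The defaults block above drops the fixed DefaultOpenFilesCacheCapacity (it becomes platform-specific in the new options_darwin.go/options_default.go files later in this patch) and introduces DefaultFilterBaseLg. A hedged sketch of overriding both knobs; the values are illustrative rather than recommendations, and FilterBaseLg is the Options field added further down:

    import (
    	"github.com/syndtr/goleveldb/leveldb"
    	"github.com/syndtr/goleveldb/leveldb/opt"
    )

    func openTuned(path string) (*leveldb.DB, error) {
    	o := &opt.Options{
    		// Default is now 200 on darwin and 500 elsewhere.
    		OpenFilesCacheCapacity: 300,
    		// Log2 of the filter block granularity; 11 means a new
    		// bloom filter is generated every 2KB of data.
    		FilterBaseLg: 11,
    	}
    	return leveldb.OpenFile(path, o)
    }
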
@@ -53,14 +53,12 @@ type CacherFunc struct { } func (f *CacherFunc) New(capacity int) cache.Cacher { - if f.NewFunc != nil { + if f != nil && f.NewFunc != nil { return f.NewFunc(capacity) } return nil } -func noCacher(int) cache.Cacher { return nil } - var ( // LRUCacher is the LRU-cache algorithm. LRUCacher = &CacherFunc{cache.NewLRU} @@ -158,6 +156,12 @@ type Options struct { // The default value is 8MiB. BlockCacheCapacity int + // BlockCacheEvictRemoved allows enable forced-eviction on cached block belonging + // to removed 'sorted table'. + // + // The default if false. + BlockCacheEvictRemoved bool + // BlockRestartInterval is the number of keys between restart points for // delta encoding of keys. // @@ -272,6 +276,14 @@ type Options struct { // The default is false. DisableLargeBatchTransaction bool + // DisableSeeksCompaction allows disabling 'seeks triggered compaction'. + // The purpose of 'seeks triggered compaction' is to optimize database so + // that 'level seeks' can be minimized, however this might generate many + // small compaction which may not preferable. + // + // The default is false. + DisableSeeksCompaction bool + // ErrorIfExist defines whether an error should returned if the DB already // exist. // @@ -303,6 +315,8 @@ type Options struct { // IteratorSamplingRate defines approximate gap (in bytes) between read // sampling of an iterator. The samples will be used to determine when // compaction should be triggered. + // Use negative value to disable iterator sampling. + // The iterator sampling is disabled if DisableSeeksCompaction is true. // // The default is 1MiB. IteratorSamplingRate int @@ -326,7 +340,7 @@ type Options struct { // OpenFilesCacheCapacity defines the capacity of the open files caching. // Use -1 for zero, this has same effect as specifying NoCacher to OpenFilesCacher. // - // The default value is 500. + // The default value is 200 on MacOS and 500 on other. OpenFilesCacheCapacity int // If true then opens DB in read-only mode. @@ -357,6 +371,11 @@ type Options struct { // // The default value is 8. WriteL0SlowdownTrigger int + + // FilterBaseLg is the log size for filter block to create a bloom filter. 
+ // + // The default value is 11(as well as 2KB) + FilterBaseLg int } func (o *Options) GetAltFilters() []filter.Filter { @@ -369,8 +388,6 @@ func (o *Options) GetAltFilters() []filter.Filter { func (o *Options) GetBlockCacher() Cacher { if o == nil || o.BlockCacher == nil { return DefaultBlockCacher - } else if o.BlockCacher == NoCacher { - return nil } return o.BlockCacher } @@ -384,6 +401,13 @@ func (o *Options) GetBlockCacheCapacity() int { return o.BlockCacheCapacity } +func (o *Options) GetBlockCacheEvictRemoved() bool { + if o == nil { + return false + } + return o.BlockCacheEvictRemoved +} + func (o *Options) GetBlockRestartInterval() int { if o == nil || o.BlockRestartInterval <= 0 { return DefaultBlockRestartInterval @@ -513,6 +537,13 @@ func (o *Options) GetDisableLargeBatchTransaction() bool { return o.DisableLargeBatchTransaction } +func (o *Options) GetDisableSeeksCompaction() bool { + if o == nil { + return false + } + return o.DisableSeeksCompaction +} + func (o *Options) GetErrorIfExist() bool { if o == nil { return false @@ -535,8 +566,10 @@ func (o *Options) GetFilter() filter.Filter { } func (o *Options) GetIteratorSamplingRate() int { - if o == nil || o.IteratorSamplingRate <= 0 { + if o == nil || o.IteratorSamplingRate == 0 { return DefaultIteratorSamplingRate + } else if o.IteratorSamplingRate < 0 { + return 0 } return o.IteratorSamplingRate } @@ -559,9 +592,6 @@ func (o *Options) GetOpenFilesCacher() Cacher { if o == nil || o.OpenFilesCacher == nil { return DefaultOpenFilesCacher } - if o.OpenFilesCacher == NoCacher { - return nil - } return o.OpenFilesCacher } @@ -609,6 +639,13 @@ func (o *Options) GetWriteL0SlowdownTrigger() int { return o.WriteL0SlowdownTrigger } +func (o *Options) GetFilterBaseLg() int { + if o == nil || o.FilterBaseLg <= 0 { + return DefaultFilterBaseLg + } + return o.FilterBaseLg +} + // ReadOptions holds the optional parameters for 'read operation'. The // 'read operation' includes Get, Find and NewIterator. 
type ReadOptions struct { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go new file mode 100644 index 00000000000..67b820427fc --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_darwin.go @@ -0,0 +1,7 @@ +// +build darwin + +package opt + +var ( + DefaultOpenFilesCacheCapacity = 200 +) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go new file mode 100644 index 00000000000..97a14a892ac --- /dev/null +++ b/vendor/github.com/syndtr/goleveldb/leveldb/opt/options_default.go @@ -0,0 +1,7 @@ +// +build !darwin + +package opt + +var ( + DefaultOpenFilesCacheCapacity = 500 +) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index 3f391f93462..e143352176e 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -47,15 +47,24 @@ type session struct { o *cachedOptions icmp *iComparer tops *tOps - fileRef map[int64]int manifest *journal.Writer manifestWriter storage.Writer manifestFd storage.FileDesc - stCompPtrs []internalKey // compaction pointers; need external synchronization - stVersion *version // current version - vmu sync.Mutex + stCompPtrs []internalKey // compaction pointers; need external synchronization + stVersion *version // current version + ntVersionId int64 // next version id to assign + refCh chan *vTask + relCh chan *vTask + deltaCh chan *vDelta + abandon chan int64 + closeC chan struct{} + closeW sync.WaitGroup + vmu sync.Mutex + + // Testing fields + fileRefCh chan chan map[int64]int // channel used to pass current reference stat } // Creates new initialized session instance. @@ -68,13 +77,21 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { return } s = &session{ - stor: newIStorage(stor), - storLock: storLock, - fileRef: make(map[int64]int), + stor: newIStorage(stor), + storLock: storLock, + refCh: make(chan *vTask), + relCh: make(chan *vTask), + deltaCh: make(chan *vDelta), + abandon: make(chan int64), + fileRefCh: make(chan chan map[int64]int), + closeC: make(chan struct{}), } s.setOptions(o) s.tops = newTableOps(s) - s.setVersion(newVersion(s)) + + s.closeW.Add(1) + go s.refLoop() + s.setVersion(nil, newVersion(s)) s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed") return } @@ -90,7 +107,11 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.setVersion(&version{s: s, closing: true}) + s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionId}) + + // Close all background goroutines + close(s.closeC) + s.closeW.Wait() } // Release session lock. @@ -111,7 +132,7 @@ func (s *session) recover() (err error) { // Don't return os.ErrNotExist if the underlying storage contains // other files that belong to LevelDB. So the DB won't get trashed. 
if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 { - err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}} + err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")} } } }() @@ -180,19 +201,27 @@ func (s *session) recover() (err error) { } s.manifestFd = fd - s.setVersion(staging.finish()) + s.setVersion(rec, staging.finish(false)) s.setNextFileNum(rec.nextFileNum) s.recordCommited(rec) return nil } // Commit session; need external synchronization. -func (s *session) commit(r *sessionRecord) (err error) { +func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() // spawn new version based on current version - nv := v.spawn(r) + nv := v.spawn(r, trivial) + + // abandon useless version id to prevent blocking version processing loop. + defer func() { + if err != nil { + s.abandon <- nv.id + s.logf("commit@abandon useless vid D%d", nv.id) + } + }() if s.manifest == nil { // manifest journal writer not yet created, create one @@ -203,7 +232,7 @@ func (s *session) commit(r *sessionRecord) (err error) { // finally, apply new version if no error rise if err == nil { - s.setVersion(nv) + s.setVersion(r, nv) } return diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go index 089cd00b26d..b46a3e45366 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_compaction.go @@ -7,6 +7,7 @@ package leveldb import ( + "sort" "sync/atomic" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -14,6 +15,13 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" ) +const ( + undefinedCompaction = iota + level0Compaction + nonLevel0Compaction + seekCompaction +) + func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int { v := s.version() defer v.release() @@ -50,31 +58,40 @@ func (s *session) pickCompaction() *compaction { var sourceLevel int var t0 tFiles + var typ int if v.cScore >= 1 { sourceLevel = v.cLevel cptr := s.getCompPtr(sourceLevel) tables := v.levels[sourceLevel] - for _, t := range tables { - if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { - t0 = append(t0, t) - break + if cptr != nil && sourceLevel > 0 { + n := len(tables) + if i := sort.Search(n, func(i int) bool { + return s.icmp.Compare(tables[i].imax, cptr) > 0 + }); i < n { + t0 = append(t0, tables[i]) } } if len(t0) == 0 { t0 = append(t0, tables[0]) } + if sourceLevel == 0 { + typ = level0Compaction + } else { + typ = nonLevel0Compaction + } } else { if p := atomic.LoadPointer(&v.cSeek); p != nil { ts := (*tSet)(p) sourceLevel = ts.level t0 = append(t0, ts.table) + typ = seekCompaction } else { v.release() return nil } } - return newCompaction(s, v, sourceLevel, t0) + return newCompaction(s, v, sourceLevel, t0, typ) } // Create compaction from given level and range; need external synchronization. 
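pickCompaction above now classifies each run as a level0, non-level0, or seek compaction, feeding the counters added in db.go. For workloads where the many small compactions triggered by seeks are unwanted, the options introduced earlier in this patch allow a full opt-out; a minimal sketch (the path and helper name are illustrative):

    import (
    	"github.com/syndtr/goleveldb/leveldb"
    	"github.com/syndtr/goleveldb/leveldb/opt"
    )

    func openWithoutSeekCompaction(path string) (*leveldb.DB, error) {
    	o := &opt.Options{
    		DisableSeeksCompaction: true, // no 'seeks triggered compaction'
    		IteratorSamplingRate:   -1,   // negative disables read sampling too
    	}
    	return leveldb.OpenFile(path, o)
    }
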
@@ -109,13 +126,18 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit } } - return newCompaction(s, v, sourceLevel, t0) + typ := level0Compaction + if sourceLevel != 0 { + typ = nonLevel0Compaction + } + return newCompaction(s, v, sourceLevel, t0, typ) } -func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles) *compaction { +func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction { c := &compaction{ s: s, v: v, + typ: typ, sourceLevel: sourceLevel, levels: [2]tFiles{t0, nil}, maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)), @@ -131,6 +153,7 @@ type compaction struct { s *session v *version + typ int sourceLevel int levels [2]tFiles maxGPOverlaps int64 @@ -181,10 +204,14 @@ func (c *compaction) expand() { t0, t1 := c.levels[0], c.levels[1] imin, imax := t0.getRange(c.s.icmp) - // We expand t0 here just incase ukey hop across tables. - t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) - if len(t0) != len(c.levels[0]) { - imin, imax = t0.getRange(c.s.icmp) + + // For non-zero levels, the ukey can't hop across tables at all. + if c.sourceLevel == 0 { + // We expand t0 here just incase ukey hop across tables. + t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0) + if len(t0) != len(c.levels[0]) { + imin, imax = t0.getRange(c.s.icmp) + } } t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) // Get entire range covered by compaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go index 92328933cc6..730bd2cd347 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -9,6 +9,7 @@ package leveldb import ( "fmt" "sync/atomic" + "time" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/storage" @@ -36,22 +37,216 @@ func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf func (s *session) newTemp() storage.FileDesc { num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 - return storage.FileDesc{storage.TypeTemp, num} + return storage.FileDesc{Type: storage.TypeTemp, Num: num} } -func (s *session) addFileRef(fd storage.FileDesc, ref int) int { - ref += s.fileRef[fd.Num] - if ref > 0 { - s.fileRef[fd.Num] = ref - } else if ref == 0 { - delete(s.fileRef, fd.Num) - } else { - panic(fmt.Sprintf("negative ref: %v", fd)) - } - return ref +// Session state. + +const ( + // maxCachedNumber represents the maximum number of version tasks + // that can be cached in the ref loop. + maxCachedNumber = 256 + + // maxCachedTime represents the maximum time for ref loop to cache + // a version task. + maxCachedTime = 5 * time.Minute +) + +// vDelta indicates the change information between the next version +// and the currently specified version +type vDelta struct { + vid int64 + added []int64 + deleted []int64 } -// Session state. +// vTask defines a version task for either reference or release. 
+type vTask struct { + vid int64 + files []tFiles + created time.Time +} + +func (s *session) refLoop() { + var ( + fileRef = make(map[int64]int) // Table file reference counter + ref = make(map[int64]*vTask) // Current referencing version store + deltas = make(map[int64]*vDelta) + referenced = make(map[int64]struct{}) + released = make(map[int64]*vDelta) // Released version that waiting for processing + abandoned = make(map[int64]struct{}) // Abandoned version id + next, last int64 + ) + // addFileRef adds file reference counter with specified file number and + // reference value + addFileRef := func(fnum int64, ref int) int { + ref += fileRef[fnum] + if ref > 0 { + fileRef[fnum] = ref + } else if ref == 0 { + delete(fileRef, fnum) + } else { + panic(fmt.Sprintf("negative ref: %v", fnum)) + } + return ref + } + // skipAbandoned skips useless abandoned version id. + skipAbandoned := func() bool { + if _, exist := abandoned[next]; exist { + delete(abandoned, next) + return true + } + return false + } + // applyDelta applies version change to current file reference. + applyDelta := func(d *vDelta) { + for _, t := range d.added { + addFileRef(t, 1) + } + for _, t := range d.deleted { + if addFileRef(t, -1) == 0 { + s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t}) + } + } + } + + timer := time.NewTimer(0) + <-timer.C // discard the initial tick + defer timer.Stop() + + // processTasks processes version tasks in strict order. + // + // If we want to use delta to reduce the cost of file references and dereferences, + // we must strictly follow the id of the version, otherwise some files that are + // being referenced will be deleted. + // + // In addition, some db operations (such as iterators) may cause a version to be + // referenced for a long time. In order to prevent such operations from blocking + // the entire processing queue, we will properly convert some of the version tasks + // into full file references and releases. + processTasks := func() { + timer.Reset(maxCachedTime) + // Make sure we don't cache too many version tasks. + for { + // Skip any abandoned version number to prevent blocking processing. + if skipAbandoned() { + next += 1 + continue + } + // Don't bother the version that has been released. + if _, exist := released[next]; exist { + break + } + // Ensure the specified version has been referenced. + if _, exist := ref[next]; !exist { + break + } + if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime { + break + } + // Convert version task into full file references and releases mode. + // Reference version(i+1) first and wait version(i) to release. + // FileRef(i+1) = FileRef(i) + Delta(i) + for _, tt := range ref[next].files { + for _, t := range tt { + addFileRef(t.fd.Num, 1) + } + } + // Note, if some compactions take a long time, even more than 5 minutes, + // we may miss the corresponding delta information here. + // Fortunately it will not affect the correctness of the file reference, + // and we can apply the delta once we receive it. + if d := deltas[next]; d != nil { + applyDelta(d) + } + referenced[next] = struct{}{} + delete(ref, next) + delete(deltas, next) + next += 1 + } + + // Use delta information to process all released versions. 
+ for { + if skipAbandoned() { + next += 1 + continue + } + if d, exist := released[next]; exist { + if d != nil { + applyDelta(d) + } + delete(released, next) + next += 1 + continue + } + return + } + } + + for { + processTasks() + + select { + case t := <-s.refCh: + if _, exist := ref[t.vid]; exist { + panic("duplicate reference request") + } + ref[t.vid] = t + if t.vid > last { + last = t.vid + } + + case d := <-s.deltaCh: + if _, exist := ref[d.vid]; !exist { + if _, exist2 := referenced[d.vid]; !exist2 { + panic("invalid release request") + } + // The reference opt is already expired, apply + // delta here. + applyDelta(d) + continue + } + deltas[d.vid] = d + + case t := <-s.relCh: + if _, exist := referenced[t.vid]; exist { + for _, tt := range t.files { + for _, t := range tt { + if addFileRef(t.fd.Num, -1) == 0 { + s.tops.remove(t.fd) + } + } + } + delete(referenced, t.vid) + continue + } + if _, exist := ref[t.vid]; !exist { + panic("invalid release request") + } + released[t.vid] = deltas[t.vid] + delete(deltas, t.vid) + delete(ref, t.vid) + + case id := <-s.abandon: + if id >= next { + abandoned[id] = struct{}{} + } + + case <-timer.C: + + case r := <-s.fileRefCh: + ref := make(map[int64]int) + for f, c := range fileRef { + ref[f] = c + } + r <- ref + + case <-s.closeC: + s.closeW.Done() + return + } + } +} // Get current version. This will incr version ref, must call // version.release (exactly once) after use. @@ -69,13 +264,30 @@ func (s *session) tLen(level int) int { } // Set current version to v. -func (s *session) setVersion(v *version) { +func (s *session) setVersion(r *sessionRecord, v *version) { s.vmu.Lock() defer s.vmu.Unlock() // Hold by session. It is important to call this first before releasing // current version, otherwise the still used files might get released. v.incref() if s.stVersion != nil { + if r != nil { + var ( + added = make([]int64, 0, len(r.addedTables)) + deleted = make([]int64, 0, len(r.deletedTables)) + ) + for _, t := range r.addedTables { + added = append(added, t.num) + } + for _, t := range r.deletedTables { + deleted = append(deleted, t.num) + } + select { + case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}: + case <-v.s.closeC: + s.log("reference loop already exist") + } + } // Release current version. s.stVersion.releaseNB() } @@ -96,7 +308,7 @@ func (s *session) setNextFileNum(num int64) { func (s *session) markFileNum(num int64) { nextFileNum := num + 1 for { - old, x := s.stNextFileNum, nextFileNum + old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum if old > x { x = old } @@ -114,7 +326,7 @@ func (s *session) allocFileNum() int64 { // Reuse given file number. func (s *session) reuseFileNum(num int64) { for { - old, x := s.stNextFileNum, num + old, x := atomic.LoadInt64(&s.stNextFileNum), num if old != x+1 { x = old } @@ -190,7 +402,7 @@ func (s *session) recordCommited(rec *sessionRecord) { // Create a new manifest file; need external synchronization. 
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { - fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()} + fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()} writer, err := s.stor.Create(fd) if err != nil { return @@ -241,6 +453,12 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { if err != nil { return } + if !s.o.GetNoSync() { + err = writer.Sync() + if err != nil { + return + } + } err = s.stor.SetMeta(fd) return } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go index 899335fd7e4..9bb33f27db1 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go @@ -37,7 +37,7 @@ func newFileLock(path string, readOnly bool) (fl fileLock, err error) { var access, shareMode uint32 if readOnly { access = syscall.GENERIC_READ - shareMode = syscall.FILE_SHARE_READ + shareMode = syscall.FILE_SHARE_READ | syscall.FILE_SHARE_WRITE } else { access = syscall.GENERIC_READ | syscall.GENERIC_WRITE } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go index adf773f13f8..884be5d3133 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,6 +7,7 @@ package leveldb import ( + "bytes" "fmt" "sort" "sync/atomic" @@ -78,7 +79,7 @@ func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFil } func tableFileFromRecord(r atRecord) *tFile { - return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax) + return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax) } // tFiles hold multiple tFile. @@ -150,6 +151,30 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { }) } +// Searches smallest index of tables whose its file number +// is smaller than the given number. +func (tf tFiles) searchNumLess(num int64) int { + return sort.Search(len(tf), func(i int) bool { + return tf[i].fd.Num < num + }) +} + +// Searches smallest index of tables whose its smallest +// key is after the given key. +func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { + return sort.Search(len(tf), func(i int) bool { + return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0 + }) +} + +// Searches smallest index of tables whose its largest +// key is after the given key. +func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int { + return sort.Search(len(tf), func(i int) bool { + return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0 + }) +} + // Returns true if given key range overlaps with one or more // tables key range. If unsorted is true then binary search will not be used. func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { @@ -181,6 +206,50 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo // expanded. // The dst content will be overwritten. func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { + // Short circuit if tf is empty + if len(tf) == 0 { + return nil + } + // For non-zero levels, there is no ukey hop across at all. + // And what's more, the files in these levels are strictly sorted, + // so use binary search instead of heavy traverse. 
+ if !overlapped { + var begin, end int + // Determine the begin index of the overlapped file + if umin != nil { + index := tf.searchMinUkey(icmp, umin) + if index == 0 { + begin = 0 + } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 { + // The min ukey overlaps with the index-1 file, expand it. + begin = index - 1 + } else { + begin = index + } + } + // Determine the end index of the overlapped file + if umax != nil { + index := tf.searchMaxUkey(icmp, umax) + if index == len(tf) { + end = len(tf) + } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 { + // The max ukey overlaps with the index file, expand it. + end = index + 1 + } else { + end = index + } + } else { + end = len(tf) + } + // Ensure the overlapped file indexes are valid. + if begin >= end { + return nil + } + dst = make([]*tFile, end-begin) + copy(dst, tf[begin:end]) + return dst + } + dst = dst[:0] for i := 0; i < len(tf); { t := tf[i] @@ -193,11 +262,9 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { umax = t.imax.ukey() // Restart search if it is overlapped. - if overlapped { - dst = dst[:0] - i = 0 - continue - } + dst = dst[:0] + i = 0 + continue } dst = append(dst, t) @@ -290,16 +357,17 @@ func (x *tFilesSortByNum) Less(i, j int) bool { // Table operations. type tOps struct { - s *session - noSync bool - cache *cache.Cache - bcache *cache.Cache - bpool *util.BufferPool + s *session + noSync bool + evictRemoved bool + cache *cache.Cache + bcache *cache.Cache + bpool *util.BufferPool } // Creates an empty table and returns table writer. func (t *tOps) create() (*tWriter, error) { - fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()} + fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()} fw, err := t.s.stor.Create(fd) if err != nil { return nil, err @@ -415,16 +483,18 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // Removes table from persistent storage. It waits until // no one use the the table. -func (t *tOps) remove(f *tFile) { - t.cache.Delete(0, uint64(f.fd.Num), func() { - if err := t.s.stor.Remove(f.fd); err != nil { - t.s.logf("table@remove removing @%d %q", f.fd.Num, err) +func (t *tOps) remove(fd storage.FileDesc) { + t.cache.Delete(0, uint64(fd.Num), func() { + if err := t.s.stor.Remove(fd); err != nil { + t.s.logf("table@remove removing @%d %q", fd.Num, err) } else { - t.s.logf("table@remove removed @%d", f.fd.Num) + t.s.logf("table@remove removed @%d", fd.Num) } - if t.bcache != nil { - t.bcache.EvictNS(uint64(f.fd.Num)) + if t.evictRemoved && t.bcache != nil { + t.bcache.EvictNS(uint64(fd.Num)) } + // Try to reuse file num, useful for discarded transaction. 
+ t.s.reuseFileNum(fd.Num) }) } @@ -446,7 +516,7 @@ func newTableOps(s *session) *tOps { bpool *util.BufferPool ) if s.o.GetOpenFilesCacheCapacity() > 0 { - cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) + cacher = s.o.GetOpenFilesCacher().New(s.o.GetOpenFilesCacheCapacity()) } if !s.o.GetDisableBlockCache() { var bcacher cache.Cacher @@ -459,11 +529,12 @@ func newTableOps(s *session) *tOps { bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) } return &tOps{ - s: s, - noSync: s.o.GetNoSync(), - cache: cache.NewCache(cacher), - bcache: bcache, - bpool: bpool, + s: s, + noSync: s.o.GetNoSync(), + evictRemoved: s.o.GetBlockCacheEvictRemoved(), + cache: cache.NewCache(cacher), + bcache: bcache, + bpool: bpool, } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go index 16cfbaa0068..496feb6fb45 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -787,6 +787,10 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe // table. And a nil Range.Limit is treated as a key after all keys in // the table. // +// WARNING: Any slice returned by interator (e.g. slice returned by calling +// Iterator.Key() or Iterator.Key() methods), its content should not be modified +// unless noted otherwise. +// // The returned iterator is not safe for concurrent use and should be released // after use. // diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/table.go index beacdc1f024..29f80f8e3c9 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/table.go @@ -151,10 +151,6 @@ const ( // These constants are part of the file format and should not be changed. 
blockTypeNoCompression = 0 blockTypeSnappyCompression = 1 - - // Generate new filter every 2KB of data - filterBaseLg = 11 - filterBase = 1 << filterBaseLg ) type blockHandle struct { diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go index b96b271d8dd..fda697bdbc6 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/writer.go @@ -89,6 +89,7 @@ type filterWriter struct { buf util.Buffer nKeys int offsets []uint32 + baseLg uint } func (w *filterWriter) add(key []byte) { @@ -103,7 +104,7 @@ func (w *filterWriter) flush(offset uint64) { if w.generator == nil { return } - for x := int(offset / filterBase); x > len(w.offsets); { + for x := int(offset / uint64(1< len(w.offsets); { w.generate() } } @@ -122,7 +123,7 @@ func (w *filterWriter) finish() { buf4 := w.buf.Alloc(4) binary.LittleEndian.PutUint32(buf4, x) } - w.buf.WriteByte(filterBaseLg) + w.buf.WriteByte(byte(w.baseLg)) } func (w *filterWriter) generate() { @@ -369,6 +370,7 @@ func NewWriter(f io.Writer, o *opt.Options) *Writer { // filter block if w.filter != nil { w.filterBlock.generator = w.filter.NewGenerator() + w.filterBlock.baseLg = uint(o.GetFilterBaseLg()) w.filterBlock.flush(0) } return w diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer.go b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer.go index 21de242552d..c1007d60501 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer.go @@ -12,12 +12,15 @@ import ( "io" ) +// smallBufferSize is an initial allocation minimal capacity. +const smallBufferSize = 64 +const maxInt = int(^uint(0) >> 1) + // A Buffer is a variable-sized buffer of bytes with Read and Write methods. // The zero value for Buffer is an empty buffer ready to use. type Buffer struct { - buf []byte // contents are the bytes buf[off : len(buf)] - off int // read at &buf[off], write at &buf[len(buf)] - bootstrap [64]byte // memory to hold first slice; helps small buffers (Printf) avoid allocation. + buf []byte // contents are the bytes buf[off : len(buf)] + off int // read at &buf[off], write at &buf[len(buf)] } // Bytes returns a slice of the contents of the unread portion of the buffer; @@ -43,19 +46,33 @@ func (b *Buffer) Len() int { return len(b.buf) - b.off } // Truncate discards all but the first n unread bytes from the buffer. // It panics if n is negative or greater than the length of the buffer. func (b *Buffer) Truncate(n int) { - switch { - case n < 0 || n > b.Len(): + if n == 0 { + b.Reset() + return + } + if n < 0 || n > b.Len() { panic("leveldb/util.Buffer: truncation out of range") - case n == 0: - // Reuse buffer space. - b.off = 0 } - b.buf = b.buf[0 : b.off+n] + b.buf = b.buf[:b.off+n] } // Reset resets the buffer so it has no content. // b.Reset() is the same as b.Truncate(0). -func (b *Buffer) Reset() { b.Truncate(0) } +func (b *Buffer) Reset() { + b.buf = b.buf[:0] + b.off = 0 +} + +// tryGrowByReslice is a inlineable version of grow for the fast-case where the +// internal buffer only needs to be resliced. +// It returns the index where bytes should be written and whether it succeeded. +func (b *Buffer) tryGrowByReslice(n int) (int, bool) { + if l := len(b.buf); n <= cap(b.buf)-l { + b.buf = b.buf[:l+n] + return l, true + } + return 0, false +} // grow grows the buffer to guarantee space for n more bytes. 
// It returns the index where bytes should be written. @@ -64,29 +81,35 @@ func (b *Buffer) grow(n int) int { m := b.Len() // If buffer is empty, reset to recover space. if m == 0 && b.off != 0 { - b.Truncate(0) + b.Reset() } - if len(b.buf)+n > cap(b.buf) { - var buf []byte - if b.buf == nil && n <= len(b.bootstrap) { - buf = b.bootstrap[0:] - } else if m+n <= cap(b.buf)/2 { - // We can slide things down instead of allocating a new - // slice. We only need m+n <= cap(b.buf) to slide, but - // we instead let capacity get twice as large so we - // don't spend all our time copying. - copy(b.buf[:], b.buf[b.off:]) - buf = b.buf[:m] - } else { - // not enough space anywhere - buf = makeSlice(2*cap(b.buf) + n) - copy(buf, b.buf[b.off:]) - } + // Try to grow by means of a reslice. + if i, ok := b.tryGrowByReslice(n); ok { + return i + } + if b.buf == nil && n <= smallBufferSize { + b.buf = make([]byte, n, smallBufferSize) + return 0 + } + c := cap(b.buf) + if n <= c/2-m { + // We can slide things down instead of allocating a new + // slice. We only need m+n <= c to slide, but + // we instead let capacity get twice as large so we + // don't spend all our time copying. + copy(b.buf, b.buf[b.off:]) + } else if c > maxInt-c-n { + panic(bytes.ErrTooLarge) + } else { + // Not enough space anywhere, we need to allocate. + buf := makeSlice(2*c + n) + copy(buf, b.buf[b.off:]) b.buf = buf - b.off = 0 } - b.buf = b.buf[0 : b.off+m+n] - return b.off + m + // Restore b.off and len(b.buf). + b.off = 0 + b.buf = b.buf[:m+n] + return m } // Alloc allocs n bytes of slice from the buffer, growing the buffer as @@ -96,7 +119,10 @@ func (b *Buffer) Alloc(n int) []byte { if n < 0 { panic("leveldb/util.Buffer.Alloc: negative count") } - m := b.grow(n) + m, ok := b.tryGrowByReslice(n) + if !ok { + m = b.grow(n) + } return b.buf[m:] } @@ -110,14 +136,17 @@ func (b *Buffer) Grow(n int) { panic("leveldb/util.Buffer.Grow: negative count") } m := b.grow(n) - b.buf = b.buf[0:m] + b.buf = b.buf[:m] } // Write appends the contents of p to the buffer, growing the buffer as // needed. The return value n is the length of p; err is always nil. If the // buffer becomes too large, Write will panic with bytes.ErrTooLarge. func (b *Buffer) Write(p []byte) (n int, err error) { - m := b.grow(len(p)) + m, ok := b.tryGrowByReslice(len(p)) + if !ok { + m = b.grow(len(p)) + } return copy(b.buf[m:], p), nil } @@ -132,34 +161,23 @@ const MinRead = 512 // error except io.EOF encountered during the read is also returned. If the // buffer becomes too large, ReadFrom will panic with bytes.ErrTooLarge. func (b *Buffer) ReadFrom(r io.Reader) (n int64, err error) { - // If buffer is empty, reset to recover space. 
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go
index 2f3db974a79..5ab1f86825a 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go
@@ -195,6 +195,9 @@ func (p *BufferPool) String() string {
 		return "<nil>"
 	}
 
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
 	return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
 		p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
 }
diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
index 73f272af5ff..9535e359145 100644
--- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go
+++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"sync/atomic"
+	"time"
 	"unsafe"
 
 	"github.com/syndtr/goleveldb/leveldb/iterator"
@@ -22,7 +23,8 @@ type tSet struct {
 }
 
 type version struct {
-	s *session
+	id int64 // unique, monotonically increasing version id
+	s  *session
 
 	levels []tFiles
@@ -39,8 +41,11 @@
 	released bool
 }
 
+// newVersion creates a new version with a unique, monotonically increasing id.
 func newVersion(s *session) *version {
-	return &version{s: s}
+	id := atomic.AddInt64(&s.ntVersionId, 1)
+	nv := &version{s: s, id: id - 1}
+	return nv
 }
 
 func (v *version) incref() {
@@ -50,11 +55,11 @@
 	v.ref++
 	if v.ref == 1 {
-		// Incr file ref.
-		for _, tt := range v.levels {
-			for _, t := range tt {
-				v.s.addFileRef(t.fd, 1)
-			}
+		select {
+		case v.s.refCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+			// We can use v.levels directly here since it is immutable.
+		case <-v.s.closeC:
+			v.s.log("reference loop already exist")
 		}
 	}
@@ -66,13 +71,11 @@ func (v *version) releaseNB() {
 	} else if v.ref < 0 {
 		panic("negative version ref")
 	}
-
-	for _, tt := range v.levels {
-		for _, t := range tt {
-			if v.s.addFileRef(t.fd, -1) == 0 {
-				v.s.tops.remove(t)
-			}
-		}
+	select {
+	case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}:
+		// We can use v.levels directly here since it is immutable.
+	case <-v.s.closeC:
+		v.s.log("reference loop already exist")
 	}
 
 	v.released = true
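With the version.go changes above, a version no longer adjusts file reference counts inline: incref and releaseNB just post a vTask on s.refCh or s.relCh, and a single goroutine owned by the session (added in session_util.go, outside this excerpt) applies them, so refcounting needs no lock. The reduction below is an illustrative sketch of that pattern with hypothetical names, not the vendored implementation:

package main

import "fmt"

// vRefTask is a simplified stand-in for the vTask sent on refCh/relCh above.
type vRefTask struct {
	vid int64
}

// refLoop sketches the reference-handling goroutine: because every refcount
// mutation arrives over a channel, the counts map is owned by one goroutine
// and needs no mutex.
func refLoop(refCh, relCh <-chan *vRefTask, closeC <-chan struct{}) {
	counts := map[int64]int{}
	for {
		select {
		case t := <-refCh:
			counts[t.vid]++
		case t := <-relCh:
			if counts[t.vid]--; counts[t.vid] == 0 {
				delete(counts, t.vid) // last reference: release the version's files
				fmt.Println("version", t.vid, "released")
			}
		case <-closeC:
			return
		}
	}
}

func main() {
	refCh, relCh := make(chan *vRefTask), make(chan *vRefTask)
	closeC, done := make(chan struct{}), make(chan struct{})
	go func() { refLoop(refCh, relCh, closeC); close(done) }()
	refCh <- &vRefTask{vid: 1} // incref
	relCh <- &vRefTask{vid: 1} // releaseNB: prints "version 1 released"
	close(closeC)
	<-done
}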
@@ -141,6 +144,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue
 	}
 
 	ukey := ikey.ukey()
+	sampleSeeks := !v.s.o.GetDisableSeeksCompaction()
 
 	var (
 		tset *tSet
@@ -158,7 +162,7 @@
 	// Since entries never hop across level, finding key/value
 	// in smaller level make later levels irrelevant.
 	v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool {
-		if level >= 0 && !tseek {
+		if sampleSeeks && level >= 0 && !tseek {
 			if tset == nil {
 				tset = &tSet{level, t}
 			} else {
@@ -273,10 +277,10 @@
 }
 
 // Spawn a new version based on this version.
-func (v *version) spawn(r *sessionRecord) *version {
+func (v *version) spawn(r *sessionRecord, trivial bool) *version {
 	staging := v.newStaging()
 	staging.commit(r)
-	return staging.finish()
+	return staging.finish(trivial)
 }
 
 func (v *version) fillRecord(r *sessionRecord) {
@@ -446,7 +450,7 @@ func (p *versionStaging) commit(r *sessionRecord) {
 	}
 }
 
-func (p *versionStaging) finish() *version {
+func (p *versionStaging) finish(trivial bool) *version {
 	// Build new version.
 	nv := newVersion(p.base.s)
 	numLevel := len(p.levels)
@@ -463,6 +467,12 @@
 		if level < len(p.levels) {
 			scratch := p.levels[level]
 
+			// Short circuit if there is no change at all.
+			if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
+				nv.levels[level] = baseTabels
+				continue
+			}
+
 			var nt tFiles
 			// Prealloc list if possible.
 			if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
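The hunk that follows is the core of the sorting optimization announced in the comment block it adds: when a compaction is trivial, its output files are already sorted and do not overlap the rest of the level, so they can be spliced in at an index found by binary search instead of re-sorting the whole level. A sketch of that splice using plain ints in place of tFiles (illustration only):

package main

import (
	"fmt"
	"sort"
)

// insertSorted splices an already-sorted, non-overlapping batch into a
// sorted slice at an index found by binary search, avoiding a full re-sort.
func insertSorted(level, batch []int) []int {
	sort.Ints(batch) // mirrors added.sortByKey / added.sortByNum
	// Find the first element of level that is >= the batch's maximum,
	// mirroring nt.searchMin(icmp, amax).
	max := batch[len(batch)-1]
	i := sort.Search(len(level), func(j int) bool { return level[j] >= max })
	return append(level[:i], append(batch, level[i:]...)...)
}

func main() {
	level := []int{10, 20, 50, 60}                  // existing, sorted tables
	fmt.Println(insertSorted(level, []int{30, 40})) // [10 20 30 40 50 60]
}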
@@ -480,6 +490,41 @@
 				nt = append(nt, t)
 			}
 
+			// Avoid resort if only files in this level are deleted
+			if len(scratch.added) == 0 {
+				nv.levels[level] = nt
+				continue
+			}
+
+			// For normal table compaction, one compaction will only involve two levels
+			// of files. And the new files generated after merging the source level and
+			// source+1 level related files can be inserted as a whole into source+1 level
+			// without any overlap with the other source+1 files.
+			//
+			// When the amount of data maintained by leveldb is large, the number of files
+			// per level will be very large. While qsort is very inefficient for sorting
+			// already ordered arrays. Therefore, for the normal table compaction, we use
+			// binary search here to find the insert index to insert a batch of new added
+			// files directly instead of using qsort.
+			if trivial && len(scratch.added) > 0 {
+				added := make(tFiles, 0, len(scratch.added))
+				for _, r := range scratch.added {
+					added = append(added, tableFileFromRecord(r))
+				}
+				if level == 0 {
+					added.sortByNum()
+					index := nt.searchNumLess(added[len(added)-1].fd.Num)
+					nt = append(nt[:index], append(added, nt[index:]...)...)
+				} else {
+					added.sortByKey(p.base.s.icmp)
+					_, amax := added.getRange(p.base.s.icmp)
+					index := nt.searchMin(p.base.s.icmp, amax)
+					nt = append(nt[:index], append(added, nt[index:]...)...)
+				}
+				nv.levels[level] = nt
+				continue
+			}
+
 			// New tables.
 			for _, r := range scratch.added {
 				nt = append(nt, tableFileFromRecord(r))