From a8f48b89f3c72e7578bbecd03f5ab06b396b326f Mon Sep 17 00:00:00 2001 From: Stefano Scafiti Date: Mon, 16 Sep 2024 10:26:45 +0200 Subject: [PATCH] chore(pkg/database): implement database manager Databases are now opened and closed on demand, up to the MaxActiveDatabases limit, to reduce memory consumption when managing a large number of databases. Signed-off-by: Stefano Scafiti --- .../immudbcmdtest/immuServerMock_test.go | 2 +- cmd/immudb/command/init.go | 3 +- cmd/immudb/command/parse_options.go | 5 +- embedded/cache/cache.go | 40 +- embedded/cache/cache_test.go | 56 ++ embedded/store/immustore.go | 84 +- embedded/store/indexer.go | 2 +- embedded/store/options.go | 8 - pkg/database/database.go | 35 +- pkg/database/database_test.go | 28 +- pkg/database/db_manager.go | 476 ++++++++++++ pkg/database/db_manager_test.go | 387 ++++++++++ pkg/database/dboptions.go | 4 +- pkg/database/dboptions_test.go | 8 +- pkg/database/document_database_test.go | 4 +- pkg/database/lazy_db.go | 724 ++++++++++++++++++ pkg/database/replica_test.go | 4 +- pkg/database/scan_test.go | 5 +- pkg/database/sql.go | 27 + pkg/database/truncator.go | 77 +- pkg/database/truncator_test.go | 37 +- pkg/database/types.go | 109 +-- pkg/integration/follower_replication_test.go | 8 +- pkg/replication/replicator.go | 1 - pkg/replication/replicator_test.go | 4 +- pkg/server/db_dummy_closed_test.go | 2 +- pkg/server/db_options.go | 12 +- pkg/server/metrics_funcs_test.go | 71 +- pkg/server/options.go | 9 +- pkg/server/remote_storage.go | 11 - pkg/server/remote_storage_test.go | 12 + pkg/server/server.go | 97 +-- pkg/server/server_test.go | 10 +- .../transactions/transactions_test.go | 2 +- pkg/server/truncator_test.go | 5 +- pkg/server/types.go | 16 +- pkg/server/types_test.go | 2 +- pkg/server/user_test.go | 5 +- pkg/truncator/truncator.go | 4 +- pkg/truncator/truncator_test.go | 18 +- 40 files changed, 2040 insertions(+), 374 deletions(-) create mode 100644 pkg/database/db_manager.go create mode 100644 
pkg/database/db_manager_test.go create mode 100644 pkg/database/lazy_db.go diff --git a/cmd/immudb/command/immudbcmdtest/immuServerMock_test.go b/cmd/immudb/command/immudbcmdtest/immuServerMock_test.go index d77abd4297..809762983b 100644 --- a/cmd/immudb/command/immudbcmdtest/immuServerMock_test.go +++ b/cmd/immudb/command/immudbcmdtest/immuServerMock_test.go @@ -55,7 +55,7 @@ func TestImmuServerMock(t *testing.T) { mock.WithStreamServiceFactory(ssf) require.Same(t, ssf, mock.Ssf) - list := database.NewDatabaseList() + list := database.NewDatabaseList(nil) mock.WithDbList(list) require.Same(t, list, mock.DbList) diff --git a/cmd/immudb/command/init.go b/cmd/immudb/command/init.go index 92f58c46f9..bf5e13dbea 100644 --- a/cmd/immudb/command/init.go +++ b/cmd/immudb/command/init.go @@ -44,7 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) { cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary") cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication") cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication") - cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache") + cmd.Flags().Int("max-active-databases", options.MaxActiveDatabases, "the maximum number of databases that can be active simultaneously") cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)") cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. 
/var/run/immudb.pid") @@ -162,6 +162,7 @@ func setupDefaults(options *server.Options) { viper.SetDefault("max-sessions", 100) viper.SetDefault("max-session-inactivity-time", 3*time.Minute) viper.SetDefault("max-session-age-time", 0) + viper.SetDefault("max-active-databases", options.MaxActiveDatabases) viper.SetDefault("session-timeout", 2*time.Minute) viper.SetDefault("sessions-guard-check-interval", 1*time.Minute) viper.SetDefault("logformat", logger.LogFormatText) diff --git a/cmd/immudb/command/parse_options.go b/cmd/immudb/command/parse_options.go index 63b6935ea0..27fdce637f 100644 --- a/cmd/immudb/command/parse_options.go +++ b/cmd/immudb/command/parse_options.go @@ -92,6 +92,8 @@ func parseOptions() (options *server.Options, err error) { swaggerUIEnabled := viper.GetBool("swaggerui") logRequestMetadata := viper.GetBool("log-request-metadata") + maxActiveDatabases := viper.GetInt("max-active-databases") + s3Storage := viper.GetBool("s3-storage") s3RoleEnabled := viper.GetBool("s3-role-enabled") s3Role := viper.GetString("s3-role") @@ -165,7 +167,8 @@ func parseOptions() (options *server.Options, err error) { WithLogFormat(logFormat). WithSwaggerUIEnabled(swaggerUIEnabled). WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled). - WithLogRequestMetadata(logRequestMetadata) + WithLogRequestMetadata(logRequestMetadata). + WithMaxActiveDatabases(maxActiveDatabases) return options, nil } diff --git a/embedded/cache/cache.go b/embedded/cache/cache.go index 7e86a473b6..76bde5c26d 100644 --- a/embedded/cache/cache.go +++ b/embedded/cache/cache.go @@ -28,8 +28,12 @@ var ( ErrIllegalArguments = errors.New("illegal arguments") ErrKeyNotFound = errors.New("key not found") ErrIllegalState = errors.New("illegal state") + ErrCannotEvictItem = errors.New("cannot find an item to evict") ) +type EvictFilterFunc func(key interface{}, value interface{}) bool +type EvictCallbackFunc func(key, value interface{}) + // Cache implements the SIEVE cache replacement policy. 
type Cache struct { data map[interface{}]*entry @@ -40,6 +44,9 @@ type Cache struct { maxWeight int mutex sync.RWMutex + + canEvict EvictFilterFunc + onEvict EvictCallbackFunc } type entry struct { @@ -59,15 +66,34 @@ func NewCache(maxWeight int) (*Cache, error) { list: list.New(), weight: 0, maxWeight: maxWeight, + onEvict: nil, + canEvict: nil, }, nil } +func (c *Cache) SetCanEvict(canEvict EvictFilterFunc) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.canEvict = canEvict +} + +func (c *Cache) SetOnEvict(onEvict EvictCallbackFunc) { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.onEvict = onEvict +} + func (c *Cache) Resize(newWeight int) { c.mutex.Lock() defer c.mutex.Unlock() for c.weight > newWeight { - _, entry, _ := c.evict() + key, entry, _ := c.evict() + if c.onEvict != nil { + c.onEvict(key, entry.value) + } c.weight -= entry.weight } @@ -133,6 +159,9 @@ func (c *Cache) evictWhileFull(weight int) (interface{}, interface{}, error) { rkey = evictedKey rvalue = entry.value + if c.onEvict != nil { + c.onEvict(rkey, rvalue) + } c.weight -= entry.weight } return rkey, rvalue, nil @@ -144,7 +173,7 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) { } curr := c.hand - for { + for i := 0; i < 2*c.list.Len(); i++ { if curr == nil { curr = c.list.Back() } @@ -152,7 +181,7 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) { key := curr.Value e := c.data[key] - if e.visited == 0 { + if e.visited == 0 && c.shouldEvict(key, e.value) { c.hand = curr.Prev() c.list.Remove(curr) @@ -164,6 +193,11 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) { e.visited = 0 curr = curr.Prev() } + return nil, nil, ErrCannotEvictItem +} + +func (c *Cache) shouldEvict(key, value interface{}) bool { + return c.canEvict == nil || c.canEvict(key, value) } func (c *Cache) Get(key interface{}) (interface{}, error) { diff --git a/embedded/cache/cache_test.go b/embedded/cache/cache_test.go index 5b1f3593d2..9494bd33e2 100644 --- 
a/embedded/cache/cache_test.go +++ b/embedded/cache/cache_test.go @@ -394,5 +394,61 @@ func TestPutWeighted(t *testing.T) { require.Equal(t, 4, cache.Weight()) require.Equal(t, 1, cache.EntriesCount()) }) +} + +func TestOnEvict(t *testing.T) { + cache, err := NewCache(5) + require.NoError(t, err) + + var onEvictCalled int + cache.SetOnEvict(func(_, value interface{}) { + onEvictCalled++ + }) + + for i := 0; i < 5; i++ { + cache.Put(i, i) + } + require.Zero(t, onEvictCalled) + + _, _, err = cache.PutWeighted(6, 6, 3) + require.NoError(t, err) + + require.Equal(t, onEvictCalled, 3) + + _, _, err = cache.PutWeighted(7, 7, 2) + require.NoError(t, err) + require.Equal(t, onEvictCalled, 5) + + cache.Resize(0) + require.Equal(t, onEvictCalled, 7) +} +func TestCanEvict(t *testing.T) { + cache, err := NewCache(5) + require.NoError(t, err) + + for i := 0; i < 5; i++ { + _, _, err := cache.Put(i, i) + require.NoError(t, err) + } + + t.Run("cannot evict any item", func(t *testing.T) { + cache.SetCanEvict(func(_, _ interface{}) bool { + return false + }) + + _, _, err := cache.Put(6, 6) + require.ErrorIs(t, err, ErrCannotEvictItem) + }) + + t.Run("cannot evict any item", func(t *testing.T) { + keyToEvict := rand.Intn(5) + cache.SetCanEvict(func(key, _ interface{}) bool { + return key == keyToEvict + }) + + evictedKey, _, err := cache.Put(6, 6) + require.NoError(t, err) + require.Equal(t, evictedKey, keyToEvict) + }) } diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 648c66cff9..0874eac974 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -216,7 +216,7 @@ type ImmuStore struct { commitWHub *watchers.WatchersHub indexers map[[sha256.Size]byte]*indexer - nextIndexerID uint32 + nextIndexerID uint64 indexCache *cache.Cache memSemaphore *semaphore.Semaphore // used by indexers to control amount acquired of memory @@ -720,47 +720,52 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable } if 
store.synced { - go func() { - for { - committedTxID := store.LastCommittedTxID() + go store.syncer() + } - // passive wait for one new transaction at least - store.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1) + return store, nil +} - // TODO: waiting on earlier stages of transaction processing may also be possible - prevLatestPrecommitedTx := committedTxID + 1 +func (s *ImmuStore) syncer() { + for { + committedTxID := s.LastCommittedTxID() - // TODO: parametrize concurrency evaluation - for i := 0; i < 4; i++ { - // give some time for more transactions to be precommitted - time.Sleep(store.syncFrequency / 4) + // passive wait for one new transaction at least + err := s.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1) + if errors.Is(err, watchers.ErrAlreadyClosed) { + return + } - latestPrecommitedTx := store.LastPrecommittedTxID() + // TODO: waiting on earlier stages of transaction processing may also be possible + prevLatestPrecommitedTx := committedTxID + 1 - if prevLatestPrecommitedTx == latestPrecommitedTx { - // avoid waiting if there are no new transactions - break - } + // TODO: parametrize concurrency evaluation + for i := 0; i < 4; i++ { + // give some time for more transactions to be precommitted + time.Sleep(s.syncFrequency / 4) - prevLatestPrecommitedTx = latestPrecommitedTx - } + latestPrecommitedTx := s.LastPrecommittedTxID() - // ensure durability - err := store.sync() - if errors.Is(err, ErrAlreadyClosed) || - errors.Is(err, multiapp.ErrAlreadyClosed) || - errors.Is(err, singleapp.ErrAlreadyClosed) || - errors.Is(err, watchers.ErrAlreadyClosed) { - return - } - if err != nil { - store.notify(Error, true, "%s: while syncing transactions", err) - } + if prevLatestPrecommitedTx == latestPrecommitedTx { + // avoid waiting if there are no new transactions + break } - }() - } - return store, nil + prevLatestPrecommitedTx = latestPrecommitedTx + } + + // ensure durability + err = s.sync() + if errors.Is(err, 
ErrAlreadyClosed) || + errors.Is(err, multiapp.ErrAlreadyClosed) || + errors.Is(err, singleapp.ErrAlreadyClosed) || + errors.Is(err, watchers.ErrAlreadyClosed) { + return + } + if err != nil { + s.notify(Error, true, "%s: while syncing transactions", err) + } + } } type NotificationType = int @@ -855,15 +860,11 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error { } if s.indexCache == nil { - if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil { - s.indexCache = indexFactoryFunc() - } else { - c, err := cache.NewCache(s.opts.IndexOpts.CacheSize) - if err != nil { - return err - } - s.indexCache = c + c, err := cache.NewCache(s.opts.IndexOpts.CacheSize) + if err != nil { + return err } + s.indexCache = c } indexer, err := newIndexer(indexPath, s, s.opts) @@ -3292,7 +3293,6 @@ func (s *ImmuStore) Sync() error { if s.closed { return ErrAlreadyClosed } - return s.sync() } diff --git a/embedded/store/indexer.go b/embedded/store/indexer.go index 71e4566e21..b33e68a1d5 100644 --- a/embedded/store/indexer.go +++ b/embedded/store/indexer.go @@ -106,7 +106,7 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments) } - id := atomic.AddUint32(&store.nextIndexerID, 1) + id := atomic.AddUint64(&store.nextIndexerID, 1) if id-1 > math.MaxUint16 { return nil, ErrMaxIndexersLimitExceeded } diff --git a/embedded/store/options.go b/embedded/store/options.go index a21261b902..f8fd51736c 100644 --- a/embedded/store/options.go +++ b/embedded/store/options.go @@ -206,9 +206,6 @@ type IndexOptions struct { // Maximum time waiting for more transactions to be committed and included into the same bulk BulkPreparationTimeout time.Duration - - // CacheFactory function - CacheFactory IndexCacheFactoryFunc } type AHTOptions struct { @@ -710,11 +707,6 @@ func (opts *IndexOptions) WithMaxGlobalBufferedDataSize(size int) *IndexOptions return opts } -func (opts *IndexOptions) 
WithCacheFactoryFunc(indexCacheFactory IndexCacheFactoryFunc) *IndexOptions { - opts.CacheFactory = indexCacheFactory - return opts - } - // AHTOptions func (opts *AHTOptions) WithWriteBufferSize(writeBufferSize int) *AHTOptions { diff --git a/pkg/database/database.go b/pkg/database/database.go index 21fea86b33..7b1229810a 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -118,6 +118,8 @@ type DB interface { VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error) + CopySQLCatalog(ctx context.Context, txID uint64) (uint64, error) + ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error) DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error) @@ -134,6 +136,10 @@ type DB interface { VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) + // Truncation + FindTruncationPoint(ctx context.Context, until time.Time) (*schema.TxHeader, error) + TruncateUptoTx(ctx context.Context, txID uint64) error + // Maintenance FlushIndex(req *schema.FlushIndexRequest) error CompactIndex() error @@ -381,7 +387,6 @@ func (d *db) FlushIndex(req *schema.FlushIndexRequest) error { if req == nil { return store.ErrIllegalArguments } - return d.st.FlushIndexes(req.CleanupPercentage, req.Synced) } @@ -1741,3 +1746,31 @@ func (d *db) CopyCatalogToTx(ctx context.Context, tx *store.OngoingTx) error { return nil } + +func (d *db) FindTruncationPoint(ctx context.Context, until time.Time) (*schema.TxHeader, error) { + hdr, err := d.st.LastTxUntil(until) + if errors.Is(err, store.ErrTxNotFound) { + return nil, ErrRetentionPeriodNotReached + } + if err != nil { + return nil, err + } + + // look for the newest transaction with entries + for err == nil { + if hdr.NEntries > 0 { + break + } + + if ctx.Err() != nil { + return nil, err + } + + hdr, 
err = d.st.ReadTxHeader(hdr.ID-1, false, false) + } + return schema.TxHeaderToProto(hdr), nil +} + +func (d *db) TruncateUptoTx(_ context.Context, txID uint64) error { + return d.st.TruncateUptoTx(txID) +} diff --git a/pkg/database/database_test.go b/pkg/database/database_test.go index d932a6b5c0..1ce41a957f 100644 --- a/pkg/database/database_test.go +++ b/pkg/database/database_test.go @@ -58,7 +58,7 @@ var kvs = []*schema.KeyValue{ func makeDb(t *testing.T) *db { rootPath := t.TempDir() - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)) return makeDbWith(t, "db", options) @@ -144,7 +144,7 @@ func (h *dummyMultidbHandler) ExecPreparedStmts( } func TestDefaultDbCreation(t *testing.T) { - options := DefaultOption().WithDBRootPath(t.TempDir()) + options := DefaultOptions().WithDBRootPath(t.TempDir()) db, err := NewDB("mydb", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestDefaultDbCreation(t *testing.T) { } func TestDbCreationInAlreadyExistentDirectories(t *testing.T) { - options := DefaultOption().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) + options := DefaultOptions().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) err := os.MkdirAll(filepath.Join(options.GetDBRootPath(), "EdithPiaf"), os.ModePerm) require.NoError(t, err) @@ -178,14 +178,14 @@ func TestDbCreationInAlreadyExistentDirectories(t *testing.T) { } func TestDbCreationInInvalidDirectory(t *testing.T) { - options := DefaultOption().WithDBRootPath("/?") + options := DefaultOptions().WithDBRootPath("/?") _, err := NewDB("EdithPiaf", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.Error(t, err) } func TestDbCreation(t *testing.T) { - options := DefaultOption().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) + options := 
DefaultOptions().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) db, err := NewDB("EdithPiaf", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) @@ -196,19 +196,19 @@ func TestDbCreation(t *testing.T) { } func TestOpenWithMissingDBDirectories(t *testing.T) { - options := DefaultOption().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) + options := DefaultOptions().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) _, err := OpenDB("EdithPiaf", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.ErrorContains(t, err, "missing database directories") } func TestOpenWithIllegalDBName(t *testing.T) { - options := DefaultOption().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) + options := DefaultOptions().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) _, err := OpenDB("", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.ErrorIs(t, err, ErrIllegalArguments) } func TestOpenDB(t *testing.T) { - options := DefaultOption().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) + options := DefaultOptions().WithDBRootPath(filepath.Join(t.TempDir(), "Paris")) db, err := NewDB("EdithPiaf", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) @@ -227,11 +227,11 @@ func TestOpenV1_0_1_DB(t *testing.T) { dir := filepath.Join(t.TempDir(), "db") require.NoError(t, copier.CopyDir("../../test/data_v1.1.0", dir)) - sysOpts := DefaultOption().WithDBRootPath(dir) + sysOpts := DefaultOptions().WithDBRootPath(dir) sysDB, err := OpenDB("systemdb", nil, sysOpts, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) - dbOpts := DefaultOption().WithDBRootPath(dir) + dbOpts := DefaultOptions().WithDBRootPath(dir) db, err := OpenDB("defaultdb", nil, dbOpts, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) @@ -2210,7 +2210,7 @@ db := makeDb(t) */ func Test_database_truncate(t *testing.T) { - options := DefaultOption().WithDBRootPath(t.TempDir()) + 
options := DefaultOptions().WithDBRootPath(t.TempDir()) options.storeOpts. WithEmbeddedValues(false). WithPreallocFiles(false). @@ -2235,16 +2235,16 @@ func Test_database_truncate(t *testing.T) { } } - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) hdr, err := c.Plan(context.Background(), queryTime) require.NoError(t, err) require.LessOrEqual(t, time.Unix(hdr.Ts, 0), queryTime) - err = c.TruncateUptoTx(context.Background(), hdr.ID) + err = c.TruncateUptoTx(context.Background(), hdr.Id) require.NoError(t, err) - for i := hdr.ID; i <= 20; i++ { + for i := hdr.Id; i <= 20; i++ { tx := store.NewTx(db.st.MaxTxEntries(), db.st.MaxKeyLen()) err = db.st.ReadTx(i, false, tx) diff --git a/pkg/database/db_manager.go b/pkg/database/db_manager.go new file mode 100644 index 0000000000..85ed9647b7 --- /dev/null +++ b/pkg/database/db_manager.go @@ -0,0 +1,476 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. + +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package database + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/codenotary/immudb/embedded/cache" + "github.com/codenotary/immudb/embedded/logger" + "github.com/codenotary/immudb/embedded/store" + "github.com/codenotary/immudb/pkg/api/schema" +) + +type DBManager struct { + openDB OpenDBFunc + dbCache *cache.Cache + + logger logger.Logger + + dbMutex sync.RWMutex + databases []*dbInfo + dbIndex map[string]int + + mtx sync.Mutex + waitCond *sync.Cond + + closed bool +} + +type dbInfo struct { + mtx sync.Mutex + + opts *Options + state *schema.ImmutableState + + name string + deleted bool + closed bool +} + +func (db *dbInfo) cacheInfo(s *schema.ImmutableState, opts *Options) { + db.mtx.Lock() + defer db.mtx.Unlock() + + db.state = s + db.opts = opts +} + +func (db *dbInfo) getState() *schema.ImmutableState { + db.mtx.Lock() + defer db.mtx.Unlock() + + return db.state +} + +func (db *dbInfo) getOptions() *Options { + db.mtx.Lock() + defer db.mtx.Unlock() + + return db.opts +} + +func (db *dbInfo) close() error { + db.mtx.Lock() + defer db.mtx.Unlock() + + if db.closed { + return store.ErrAlreadyClosed + } + db.closed = true + + return nil +} + +type dbRef struct { + db DB + count uint32 +} + +type OpenDBFunc func(name string, opts *Options) (DB, error) + +func NewDBManager(openFunc OpenDBFunc, maxActiveDatabases int, log logger.Logger) *DBManager { + m := &DBManager{ + openDB: openFunc, + dbIndex: make(map[string]int), + databases: make([]*dbInfo, 0), + logger: log, + } + m.dbCache = createCache(m, maxActiveDatabases) + m.waitCond = sync.NewCond(&m.mtx) + return m +} + +func createCache(m *DBManager, capacity int) *cache.Cache { + c, _ := cache.NewCache(capacity) + + c.SetCanEvict(func(_, value interface{}) bool { + ref, _ := value.(*dbRef) + + return ref != nil && atomic.LoadUint32(&ref.count) == 0 + }) + + c.SetOnEvict(func(idx, value interface{}) { + ref, _ := value.(*dbRef) + if ref == nil { + return + } + + // NOTE: db 
cannot be nil at this point, + // since it can only be evicted after it has been successfully opened. + // Moreover, since the reference cannot be altered after it has been set, + // there is no need to acquire the database lock. + if ref.db == nil { + m.logger.Errorf("db not initialised during eviction") + return + } + + state, err := ref.db.CurrentState() + if err != nil { + m.logger.Errorf(`%w: while fetching db %s state`, err, ref.db.GetName()) + } + + opts := ref.db.GetOptions() + + err = ref.db.Close() + if err != nil { + m.logger.Errorf(`%w: while closing db "%s"`, err, ref.db.GetName()) + } + + if i, ok := idx.(int); ok && (i >= 0 && i < len(m.databases)) { + m.databases[i].cacheInfo(state, opts) + } + ref.db = nil + }) + return c +} + +func (m *DBManager) Put(dbName string, opts *Options, closed bool) int { + m.dbMutex.Lock() + defer m.dbMutex.Unlock() + + if idx, has := m.dbIndex[dbName]; has { + ref := m.databases[idx] + ref.deleted = false + ref.closed = closed + ref.opts = opts + return idx + } + + m.dbIndex[dbName] = len(m.databases) + + info := &dbInfo{ + opts: opts, + name: dbName, + deleted: false, + closed: closed, + } + + m.databases = append(m.databases, info) + return len(m.databases) - 1 +} + +func (m *DBManager) Get(idx int) (DB, error) { + db, exists := m.getDB(idx) + if !exists { + return nil, ErrDatabaseNotExists + } + + ref, err := m.allocDB(idx, db) + if err != nil { + return nil, err + } + defer db.mtx.Unlock() + + if ref.db == nil { + d, err := m.openDB(db.name, db.opts) + if err != nil { + m.Release(idx) + return nil, err + } + ref.db = d + } + return ref.db, nil +} + +func (m *DBManager) allocDB(idx int, db *dbInfo) (*dbRef, error) { + m.mtx.Lock() + defer m.mtx.Unlock() + + for { + db.mtx.Lock() + + if m.closed || db.closed || db.deleted { + db.mtx.Unlock() + return nil, store.ErrAlreadyClosed + } + + v, err := m.dbCache.Get(idx) + if err == nil { + ref := v.(*dbRef) + atomic.AddUint32(&ref.count, 1) + return ref, nil + } + + ref 
:= &dbRef{count: 1} + _, _, err = m.dbCache.Put(idx, ref) + if err == nil { + return ref, nil + } + + db.mtx.Unlock() + m.waitCond.Wait() + } +} + +func (m *DBManager) Release(idx int) { + v, err := m.dbCache.Get(idx) + + // NOTE: may occur if the database is closed + // before being fully released + if err != nil { + return + } + + ref, _ := v.(*dbRef) + if ref == nil { + return + } + + if atomic.AddUint32(&ref.count, ^uint32(0)) == 0 { + m.signal() + } +} + +func (m *DBManager) signal() { + m.mtx.Lock() + defer m.mtx.Unlock() + + m.waitCond.Signal() +} + +func (m *DBManager) Has(name string) bool { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + _, has := m.dbIndex[name] + return has +} + +func (m *DBManager) HasIndex(idx int) bool { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + return idx >= 0 && idx < len(m.databases) +} + +func (m *DBManager) GetIndexByName(name string) int { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + idx, exists := m.dbIndex[name] + if !exists { + return -1 + } + return idx +} + +func (m *DBManager) GetNameByIndex(idx int) string { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + if idx < 0 || idx >= len(m.databases) { + return "" + } + return m.databases[idx].name +} + +func (m *DBManager) GetOptionsByIndex(idx int) *Options { + dbInfo, has := m.getDB(idx) + if !has { + return nil + } + + ref, err := m.dbCache.Get(idx) + if err == nil { + dbInfo.mtx.Lock() + defer dbInfo.mtx.Unlock() + + if dbRef := ref.(*dbRef); dbRef != nil && dbRef.db != nil { + return dbInfo.opts + } + return nil + } + return dbInfo.getOptions() +} + +func (m *DBManager) GetState(idx int) (*schema.ImmutableState, error) { + dbInfo, has := m.getDB(idx) + if !has { + return nil, ErrDatabaseNotExists + } + + ref, err := m.dbCache.Get(idx) + if err == nil { + dbInfo.mtx.Lock() + defer dbInfo.mtx.Unlock() + + if dbRef := ref.(*dbRef); dbRef != nil && dbRef.db != nil { + return dbRef.db.CurrentState() + } + // this condition should never happen + return 
nil, fmt.Errorf("unable to get state") + } + + s := dbInfo.getState() + if s != nil { + return s, nil + } + + db, err := m.Get(idx) + if err != nil { + return nil, err + } + defer m.Release(idx) + + return db.CurrentState() +} + +func (m *DBManager) Delete(name string) error { + m.dbMutex.RLock() + + idx, exists := m.dbIndex[name] + if !exists { + m.dbMutex.RUnlock() + return ErrDatabaseNotExists + } + + db := m.databases[idx] + m.dbMutex.RUnlock() + + db.mtx.Lock() + defer db.mtx.Unlock() + + if !db.closed { + return ErrCannotDeleteAnOpenDatabase + } + db.deleted = true + + // NOTE: a closed database cannot be present in the cache + return nil +} + +func (m *DBManager) Length() int { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + return len(m.databases) +} + +func (m *DBManager) IsLoaded(idx int) bool { + db, exists := m.getDB(idx) + if !exists { + return false + } + + db.mtx.Lock() + defer db.mtx.Unlock() + + return !db.closed +} + +func (m *DBManager) Close(idx int) error { + db, exists := m.getDB(idx) + if !exists { + return nil + } + + if err := db.close(); err != nil { + return err + } + defer m.waitCond.Broadcast() + + v, err := m.dbCache.Pop(idx) + if err != nil { + return nil + } + + ref, _ := v.(*dbRef) + if ref != nil && ref.db != nil { + ref.db.Close() + ref.db = nil + } + return nil +} + +func (m *DBManager) IsClosed(idx int) bool { + db, exists := m.getDB(idx) + if !exists { + return true + } + + db.mtx.Lock() + defer db.mtx.Unlock() + + return db.closed +} + +func (m *DBManager) getDB(idx int) (*dbInfo, bool) { + m.dbMutex.RLock() + defer m.dbMutex.RUnlock() + + if idx < 0 || idx >= len(m.databases) { + return nil, false + } + return m.databases[idx], true +} + +func (m *DBManager) Resize(n int) { + m.dbCache.Resize(n) +} + +func (m *DBManager) CloseAll(ctx context.Context) error { + m.mtx.Lock() + m.closed = true + m.mtx.Unlock() + + m.waitCond.Broadcast() + + tryClose := true + for tryClose { + if err := ctx.Err(); err != nil { + return err + 
} + + busyDBs := 0 + m.dbCache.Apply(func(_, value interface{}) error { + ref := value.(*dbRef) + + if atomic.LoadUint32(&ref.count) > 0 { + busyDBs++ + return nil + } + + ref.db.Close() + return nil + }) + tryClose = busyDBs > 0 + + time.Sleep(time.Millisecond * 10) + } + m.dbCache.Resize(0) + return nil +} + +func (m *DBManager) IsActive(idx int) bool { + _, err := m.dbCache.Get(idx) + return err == nil +} diff --git a/pkg/database/db_manager_test.go b/pkg/database/db_manager_test.go new file mode 100644 index 0000000000..854d44e9d9 --- /dev/null +++ b/pkg/database/db_manager_test.go @@ -0,0 +1,387 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. + +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package database + +import ( + "context" + "fmt" + "math/rand" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/codenotary/immudb/embedded/logger" + "github.com/codenotary/immudb/embedded/sql" + "github.com/codenotary/immudb/embedded/store" + "github.com/codenotary/immudb/pkg/api/schema" + "github.com/stretchr/testify/require" +) + +type mockDB struct { + DB + + name string +} + +func (db *mockDB) GetName() string { + return db.name +} + +func (db *mockDB) Close() error { + return nil +} + +func (db *mockDB) GetOptions() *Options { + return &Options{} +} + +func (db *mockDB) CurrentState() (*schema.ImmutableState, error) { + return &schema.ImmutableState{}, nil +} + +func openMockDB(name string, opts *Options) (DB, error) { + return &mockDB{name: name}, nil +} + +func TestDBManagerConcurrentGet(t *testing.T) { + manager := NewDBManager(openMockDB, 5, logger.NewMemoryLogger()) + + n := 100 + for i := 0; i < n; i++ { + manager.Put(fmt.Sprintf("db%d", i), DefaultOptions(), false) + } + + var wg sync.WaitGroup + wg.Add(n) + + for idx := 0; idx < n; idx++ { + go func(idx int) { + defer wg.Done() + + db, err := manager.Get(idx) + require.NoError(t, err) + require.NotNil(t, db) + defer manager.Release(idx) + + require.LessOrEqual(t, manager.dbCache.EntriesCount(), 5) + + sleepTime := time.Duration(10+rand.Intn(41)) * time.Millisecond + time.Sleep(sleepTime) + }(idx) + } + wg.Wait() +} + +func TestDBManagerOpen(t *testing.T) { + var nCalls uint64 + + openDB := func(name string, opts *Options) (DB, error) { + atomic.AddUint64(&nCalls, 1) + return openMockDB(name, opts) + } + + manager := NewDBManager(openDB, 1, logger.NewMemoryLogger()) + manager.Put("testdb", DefaultOptions(), false) + + n := 1000 + + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + + _, err := manager.Get(0) + require.NoError(t, err) + }() + } + wg.Wait() + + require.Equal(t, nCalls, uint64(1)) + v, err := 
manager.dbCache.Get(0) + require.NoError(t, err) + + ref, _ := v.(*dbRef) + require.NotNil(t, ref) + require.NotNil(t, ref.db) + require.Equal(t, ref.count, uint32(n)) + + for i := 0; i < n; i++ { + manager.Release(0) + } + require.Zero(t, ref.count) +} + +func TestDBManagerClose(t *testing.T) { + maxActiveDBs := 10 + manager := NewDBManager(openMockDB, maxActiveDBs, logger.NewMemoryLogger()) + + manager.Put("test", DefaultOptions(), false) + + n := 100 + for i := 0; i < n; i++ { + _, err := manager.Get(0) + require.NoError(t, err) + } + + err := manager.Close(0) + require.NoError(t, err) + + err = manager.Close(0) + require.ErrorIs(t, err, store.ErrAlreadyClosed) + + for i := 0; i < n; i++ { + manager.Release(0) + } + + _, err = manager.Get(0) + require.ErrorIs(t, err, store.ErrAlreadyClosed) +} + +func TestDBManagerCloseDuringGet(t *testing.T) { + maxActiveDBs := 10 + manager := NewDBManager(openMockDB, maxActiveDBs, logger.NewMemoryLogger()) + + for i := 0; i <= maxActiveDBs; i++ { + manager.Put(fmt.Sprintf("test%d", i), DefaultOptions(), false) + } + + for i := 0; i < maxActiveDBs; i++ { + _, err := manager.Get(i) + require.NoError(t, err) + } + + n := 100 + + var wg sync.WaitGroup + wg.Add(n) + + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + + _, err := manager.Get(maxActiveDBs) + require.ErrorIs(t, err, store.ErrAlreadyClosed) + }() + } + + // wait for all goroutines to attempt Get(maxActiveDBs) + time.Sleep(time.Millisecond * 100) + + err := manager.Close(maxActiveDBs) + require.NoError(t, err) + + wg.Wait() +} + +func TestDBManagerDelete(t *testing.T) { + manager := NewDBManager(openMockDB, 1, logger.NewMemoryLogger()) + + manager.Put("test", DefaultOptions(), false) + + err := manager.Delete("test") + require.ErrorIs(t, err, ErrCannotDeleteAnOpenDatabase) + + err = manager.Close(0) + require.NoError(t, err) + + err = manager.Delete("test") + require.NoError(t, err) +} + +func TestDBManagerCloseAll(t *testing.T) { + maxActiveDBs := 10 + manager 
:= NewDBManager(openMockDB, maxActiveDBs, logger.NewMemoryLogger()) + + n := 100 + for i := 0; i < n; i++ { + manager.Put(fmt.Sprintf("test%d", i), DefaultOptions(), false) + } + + var wg sync.WaitGroup + wg.Add(maxActiveDBs) + for i := 0; i < maxActiveDBs; i++ { + go func(idx int) { + defer wg.Done() + + _, err := manager.Get(idx) + require.NoError(t, err) + }(i) + } + wg.Wait() + + var wg1 sync.WaitGroup + wg1.Add(n - maxActiveDBs) + for i := maxActiveDBs; i < n; i++ { + go func(idx int) { + defer wg1.Done() + + _, err := manager.Get(idx) + require.ErrorIs(t, err, store.ErrAlreadyClosed) + }(i) + } + + t.Run("close deadline exceeded", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err := manager.CloseAll(ctx) + require.ErrorIs(t, err, context.DeadlineExceeded) + + // Goroutines waiting to acquire a database + // should be awakened by CloseAll() + wg1.Wait() + }) + + for i := 0; i < n; i++ { + manager.Release(i) + } + + t.Run("close succeeds", func(t *testing.T) { + err := manager.CloseAll(context.Background()) + require.NoError(t, err) + + for i := 0; i < n; i++ { + _, err := manager.Get(i) + require.ErrorIs(t, err, store.ErrAlreadyClosed) + } + }) +} + +func TestLazyDB(t *testing.T) { + dir := t.TempDir() + + err := os.MkdirAll(filepath.Join(dir, "testdb"), os.ModePerm) + require.NoError(t, err) + + err = os.MkdirAll(filepath.Join(dir, "testdb1"), os.ModePerm) + require.NoError(t, err) + + logger := logger.NewMemoryLogger() + + m := NewDBManager(func(name string, opts *Options) (DB, error) { + return OpenDB(name, nil, opts, logger) + }, 1, logger) + + dbList := NewDatabaseList(m) + _, err = dbList.GetByIndex(0) + require.ErrorIs(t, err, ErrDatabaseNotExists) + + db := dbList.Put("testdb", DefaultOptions().WithDBRootPath(dir)) + db1 := dbList.Put("testdb1", DefaultOptions().WithDBRootPath(dir)) + closedDB := dbList.PutClosed("closeddb", DefaultOptions().WithDBRootPath(dir)) + + require.True(t, 
m.Has("testdb")) + require.True(t, m.Has("testdb1")) + require.False(t, db.IsClosed()) + require.False(t, db1.IsClosed()) + require.True(t, closedDB.IsClosed()) + + t.Run("isActive", func(t *testing.T) { + require.False(t, m.IsActive(0)) + require.False(t, db.IsReplica()) + require.True(t, m.IsActive(0)) + require.False(t, db1.IsReplica()) + require.False(t, m.IsActive(0)) + require.True(t, m.IsActive(1)) + }) + + t.Run("isReplica", func(t *testing.T) { + require.False(t, db.IsReplica()) + db.AsReplica(true, false, 0) + require.True(t, db.IsReplica()) + + require.False(t, db1.IsReplica()) // force db1 loading + require.True(t, db.IsReplica()) + }) + + t.Run("SetSyncReplication", func(t *testing.T) { + db.SetSyncReplication(true) + require.True(t, db.IsSyncReplicationEnabled()) + require.False(t, db1.IsReplica()) // force db1 loading + require.True(t, db.IsSyncReplicationEnabled()) + }) + + t.Run("CurrentState", func(t *testing.T) { + state, err := db1.CurrentState() + require.NoError(t, err) + require.NotNil(t, state, err) + + s, err := db1.Size() + require.NoError(t, err) + require.NotZero(t, s) + + _, err = db1.Set(context.Background(), &schema.SetRequest{ + KVs: []*schema.KeyValue{ + { + Key: []byte("k1"), Value: []byte("v1"), + }, + }, + }) + require.NoError(t, err) + + err = db1.WaitForTx(context.Background(), 1, true) + require.NoError(t, err) + + err = db1.WaitForIndexingUpto(context.Background(), 1) + require.NoError(t, err) + + s1, err := db1.Size() + require.NoError(t, err) + require.Greater(t, s1, s) + + state1, err := db1.CurrentState() + require.NoError(t, err) + require.NotEqual(t, state, state1) + require.True(t, db.IsReplica()) // force db loading + + // calling CurrentState() again should not force db reloading + state2, err := db1.CurrentState() + require.NoError(t, err) + require.Equal(t, state1, state2) + require.False(t, m.IsActive(1)) + }) + + t.Run("copy catalog", func(t *testing.T) { + _, err := db1.CopySQLCatalog(context.Background(), 1) + 
require.NoError(t, err) + }) + + t.Run("truncate", func(t *testing.T) { + err := db1.TruncateUptoTx(context.Background(), 1) + require.NoError(t, err) + }) + + t.Run("sql", func(t *testing.T) { + params, err := db.InferParameters(context.Background(), nil, "SELECT * FROM table1") + require.ErrorIs(t, err, sql.ErrTableDoesNotExist) + require.Nil(t, params) + + _, err = db.SQLQueryAll(context.Background(), nil, &schema.SQLQueryRequest{Sql: "SELECT * FROM table1"}) + require.ErrorIs(t, err, sql.ErrTableDoesNotExist) + }) + + t.Run("IsLoaded", func(t *testing.T) { + require.True(t, m.IsLoaded(0)) + err = m.Close(0) + require.NoError(t, err) + require.False(t, m.IsLoaded(0)) + }) +} diff --git a/pkg/database/dboptions.go b/pkg/database/dboptions.go index 744453fd2b..66a3669392 100644 --- a/pkg/database/dboptions.go +++ b/pkg/database/dboptions.go @@ -48,8 +48,8 @@ type Options struct { RetentionPeriod time.Duration } -// DefaultOption Initialise Db Optionts to default values -func DefaultOption() *Options { +// DefaultOptions Initialise Db Optionts to default values +func DefaultOptions() *Options { return &Options{ dbRootPath: DefaultDbRootPath, storeOpts: store.DefaultOptions(), diff --git a/pkg/database/dboptions_test.go b/pkg/database/dboptions_test.go index 00874bd473..a65b0206b0 100644 --- a/pkg/database/dboptions_test.go +++ b/pkg/database/dboptions_test.go @@ -25,16 +25,16 @@ import ( ) func TestDefaultOptions(t *testing.T) { - op := DefaultOption().AsReplica(true) + op := DefaultOptions().AsReplica(true) - require.Equal(t, op.GetDBRootPath(), DefaultOption().dbRootPath) - require.Equal(t, op.GetTxPoolSize(), DefaultOption().readTxPoolSize) + require.Equal(t, op.GetDBRootPath(), DefaultOptions().dbRootPath) + require.Equal(t, op.GetTxPoolSize(), DefaultOptions().readTxPoolSize) require.False(t, op.syncReplication) rootpath := "rootpath" storeOpts := store.DefaultOptions() - op = DefaultOption(). + op = DefaultOptions(). WithDBRootPath(rootpath). 
WithStoreOptions(storeOpts). WithReadTxPoolSize(789). diff --git a/pkg/database/document_database_test.go b/pkg/database/document_database_test.go index 50a750e310..d06de97908 100644 --- a/pkg/database/document_database_test.go +++ b/pkg/database/document_database_test.go @@ -5,7 +5,7 @@ SPDX-License-Identifier: BUSL-1.1 you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://mariadb.com/bsl11/ + https://mariadb.com/bsl11/ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -37,7 +37,7 @@ func makeDocumentDb(t *testing.T) *db { rootPath := t.TempDir() dbName := "doc_test_db" - options := DefaultOption(). + options := DefaultOptions(). WithDBRootPath(rootPath) options.storeOpts.IndexOpts.WithCompactionThld(2) diff --git a/pkg/database/lazy_db.go b/pkg/database/lazy_db.go new file mode 100644 index 0000000000..6d0a74f1da --- /dev/null +++ b/pkg/database/lazy_db.go @@ -0,0 +1,724 @@ +/* +Copyright 2024 Codenotary Inc. All rights reserved. + +SPDX-License-Identifier: BUSL-1.1 +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://mariadb.com/bsl11/ + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package database + +import ( + "context" + "crypto/sha256" + "io" + "path/filepath" + "time" + + "github.com/codenotary/immudb/embedded/document" + "github.com/codenotary/immudb/embedded/sql" + "github.com/codenotary/immudb/pkg/api/protomodel" + "github.com/codenotary/immudb/pkg/api/schema" +) + +type lazyDB struct { + m *DBManager + + idx int +} + +func (db *lazyDB) GetName() string { + return db.m.GetNameByIndex(db.idx) +} + +func (db *lazyDB) GetOptions() *Options { + return db.m.GetOptionsByIndex(db.idx) +} + +func (db *lazyDB) Path() string { + opts := db.GetOptions() + + return filepath.Join(opts.GetDBRootPath(), db.GetName()) +} + +func (db *lazyDB) AsReplica(asReplica, syncReplication bool, syncAcks int) { + d, err := db.m.Get(db.idx) + if err != nil { + db.m.logger.Errorf("%s: AsReplica", err) + return + } + defer db.m.Release(db.idx) + + d.AsReplica(asReplica, syncReplication, syncAcks) +} + +func (db *lazyDB) IsReplica() bool { + d, err := db.m.Get(db.idx) + if err != nil { + db.m.logger.Errorf("%s: IsReplica", err) + return false + } + defer db.m.Release(db.idx) + + return d.IsReplica() +} + +func (db *lazyDB) IsSyncReplicationEnabled() bool { + d, err := db.m.Get(db.idx) + if err != nil { + db.m.logger.Errorf("%s: IsSyncReplicationEnabled", err) + return false + } + defer db.m.Release(db.idx) + + return d.IsSyncReplicationEnabled() +} + +func (db *lazyDB) SetSyncReplication(enabled bool) { + d, err := db.m.Get(db.idx) + if err != nil { + db.m.logger.Errorf("%s: SetSyncReplication", err) + return + } + defer db.m.Release(db.idx) + + d.SetSyncReplication(enabled) +} + +func (db *lazyDB) MaxResultSize() int { + return db.GetOptions().maxResultSize +} + +func (db *lazyDB) Health() (waitingCount int, lastReleaseAt time.Time) { + d, err := db.m.Get(db.idx) + if err != nil { + db.m.logger.Errorf("%s: Health", err) + return + } + defer db.m.Release(db.idx) + + return d.Health() +} + +func (db *lazyDB) CurrentState() (*schema.ImmutableState, error) { + 
return db.m.GetState(db.idx) +} + +func (db *lazyDB) Size() (uint64, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return 0, err + } + defer db.m.Release(db.idx) + + return d.Size() +} + +func (db *lazyDB) TxCount() (uint64, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return 0, err + } + defer db.m.Release(db.idx) + + return d.TxCount() +} + +func (db *lazyDB) Set(ctx context.Context, req *schema.SetRequest) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.Set(ctx, req) +} + +func (db *lazyDB) VerifiableSet(ctx context.Context, req *schema.VerifiableSetRequest) (*schema.VerifiableTx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableSet(ctx, req) +} + +func (db *lazyDB) Get(ctx context.Context, req *schema.KeyRequest) (*schema.Entry, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.Get(ctx, req) +} + +func (db *lazyDB) VerifiableGet(ctx context.Context, req *schema.VerifiableGetRequest) (*schema.VerifiableEntry, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableGet(ctx, req) +} + +func (db *lazyDB) GetAll(ctx context.Context, req *schema.KeyListRequest) (*schema.Entries, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.GetAll(ctx, req) +} + +func (db *lazyDB) Delete(ctx context.Context, req *schema.DeleteKeysRequest) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.Delete(ctx, req) +} + +func (db *lazyDB) SetReference(ctx context.Context, req *schema.ReferenceRequest) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + 
defer db.m.Release(db.idx) + + return d.SetReference(ctx, req) +} + +func (db *lazyDB) VerifiableSetReference(ctx context.Context, req *schema.VerifiableReferenceRequest) (*schema.VerifiableTx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableSetReference(ctx, req) +} + +func (db *lazyDB) Scan(ctx context.Context, req *schema.ScanRequest) (*schema.Entries, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.Scan(ctx, req) +} + +func (db *lazyDB) History(ctx context.Context, req *schema.HistoryRequest) (*schema.Entries, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.History(ctx, req) +} + +func (db *lazyDB) ExecAll(ctx context.Context, operations *schema.ExecAllRequest) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ExecAll(ctx, operations) +} + +func (db *lazyDB) Count(ctx context.Context, prefix *schema.KeyPrefix) (*schema.EntryCount, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.Count(ctx, prefix) +} + +func (db *lazyDB) CountAll(ctx context.Context) (*schema.EntryCount, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.CountAll(ctx) +} + +func (db *lazyDB) ZAdd(ctx context.Context, req *schema.ZAddRequest) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ZAdd(ctx, req) +} + +func (db *lazyDB) VerifiableZAdd(ctx context.Context, req *schema.VerifiableZAddRequest) (*schema.VerifiableTx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableZAdd(ctx, 
req) +} + +func (db *lazyDB) ZScan(ctx context.Context, req *schema.ZScanRequest) (*schema.ZEntries, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ZScan(ctx, req) +} + +func (db *lazyDB) NewSQLTx(ctx context.Context, opts *sql.TxOptions) (*sql.SQLTx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.NewSQLTx(ctx, opts) +} + +func (db *lazyDB) SQLExec(ctx context.Context, tx *sql.SQLTx, req *schema.SQLExecRequest) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, nil, err + } + defer db.m.Release(db.idx) + + return d.SQLExec(ctx, tx, req) +} + +func (db *lazyDB) SQLExecPrepared(ctx context.Context, tx *sql.SQLTx, stmts []sql.SQLStmt, params map[string]interface{}) (ntx *sql.SQLTx, ctxs []*sql.SQLTx, err error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, nil, err + } + defer db.m.Release(db.idx) + + return d.SQLExecPrepared(ctx, tx, stmts, params) +} + +func (db *lazyDB) InferParameters(ctx context.Context, tx *sql.SQLTx, sql string) (map[string]sql.SQLValueType, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.InferParameters(ctx, tx, sql) +} + +func (db *lazyDB) InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.SQLStmt) (map[string]sql.SQLValueType, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.InferParametersPrepared(ctx, tx, stmt) +} + +func (db *lazyDB) SQLQuery(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) (sql.RowReader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.SQLQuery(ctx, tx, req) +} + +func (db *lazyDB) SQLQueryAll(ctx context.Context, tx *sql.SQLTx, req *schema.SQLQueryRequest) 
([]*sql.Row, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.SQLQueryAll(ctx, tx, req) +} + +func (db *lazyDB) SQLQueryPrepared(ctx context.Context, tx *sql.SQLTx, stmt sql.DataSource, params map[string]interface{}) (sql.RowReader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.SQLQueryPrepared(ctx, tx, stmt, params) +} + +func (db *lazyDB) VerifiableSQLGet(ctx context.Context, req *schema.VerifiableSQLGetRequest) (*schema.VerifiableSQLEntry, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableSQLGet(ctx, req) +} + +func (db *lazyDB) ListTables(ctx context.Context, tx *sql.SQLTx) (*schema.SQLQueryResult, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ListTables(ctx, tx) +} + +func (db *lazyDB) DescribeTable(ctx context.Context, tx *sql.SQLTx, table string) (*schema.SQLQueryResult, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.DescribeTable(ctx, tx, table) +} + +func (db *lazyDB) WaitForTx(ctx context.Context, txID uint64, allowPrecommitted bool) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.WaitForTx(ctx, txID, allowPrecommitted) +} + +func (db *lazyDB) WaitForIndexingUpto(ctx context.Context, txID uint64) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.WaitForIndexingUpto(ctx, txID) +} + +func (db *lazyDB) TxByID(ctx context.Context, req *schema.TxRequest) (*schema.Tx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.TxByID(ctx, req) +} + +func (db *lazyDB) ExportTxByID(ctx context.Context, req 
*schema.ExportTxRequest) (txbs []byte, mayCommitUpToTxID uint64, mayCommitUpToAlh [sha256.Size]byte, err error) { + state, err := db.CurrentState() + if err != nil { + return nil, 0, [sha256.Size]byte{}, err + } + + if !req.AllowPreCommitted { + if req.Tx > state.TxId { + return nil, 0, [sha256.Size]byte{}, io.EOF + } + } + + d, err := db.m.Get(db.idx) + if err != nil { + return nil, 0, [sha256.Size]byte{}, err + } + defer db.m.Release(db.idx) + + return d.ExportTxByID(ctx, req) +} + +func (db *lazyDB) ReplicateTx(ctx context.Context, exportedTx []byte, skipIntegrityCheck bool, waitForIndexing bool) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ReplicateTx(ctx, exportedTx, skipIntegrityCheck, waitForIndexing) +} + +func (db *lazyDB) AllowCommitUpto(txID uint64, alh [sha256.Size]byte) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.AllowCommitUpto(txID, alh) +} + +func (db *lazyDB) DiscardPrecommittedTxsSince(txID uint64) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.DiscardPrecommittedTxsSince(txID) +} + +func (db *lazyDB) VerifiableTxByID(ctx context.Context, req *schema.VerifiableTxRequest) (*schema.VerifiableTx, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.VerifiableTxByID(ctx, req) +} + +func (db *lazyDB) TxScan(ctx context.Context, req *schema.TxScanRequest) (*schema.TxList, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.TxScan(ctx, req) +} + +func (db *lazyDB) FlushIndex(req *schema.FlushIndexRequest) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.FlushIndex(req) +} + +func (db *lazyDB) CompactIndex() error { + d, err := 
db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.CompactIndex() +} + +func (db *lazyDB) IsClosed() bool { + return db.m.IsClosed(db.idx) +} + +func (db *lazyDB) Close() error { + return db.m.Close(db.idx) +} + +// CreateCollection creates a new collection +func (db *lazyDB) CreateCollection(ctx context.Context, username string, req *protomodel.CreateCollectionRequest) (*protomodel.CreateCollectionResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.CreateCollection(ctx, username, req) +} + +// GetCollection returns the collection schema +func (db *lazyDB) GetCollection(ctx context.Context, req *protomodel.GetCollectionRequest) (*protomodel.GetCollectionResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.GetCollection(ctx, req) +} + +func (db *lazyDB) GetCollections(ctx context.Context, req *protomodel.GetCollectionsRequest) (*protomodel.GetCollectionsResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.GetCollections(ctx, req) +} + +func (db *lazyDB) UpdateCollection(ctx context.Context, username string, req *protomodel.UpdateCollectionRequest) (*protomodel.UpdateCollectionResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.UpdateCollection(ctx, username, req) +} + +func (db *lazyDB) DeleteCollection(ctx context.Context, username string, req *protomodel.DeleteCollectionRequest) (*protomodel.DeleteCollectionResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.DeleteCollection(ctx, username, req) +} + +func (db *lazyDB) AddField(ctx context.Context, username string, req *protomodel.AddFieldRequest) (*protomodel.AddFieldResponse, error) { + d, 
err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.AddField(ctx, username, req) +} + +func (db *lazyDB) RemoveField(ctx context.Context, username string, req *protomodel.RemoveFieldRequest) (*protomodel.RemoveFieldResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.RemoveField(ctx, username, req) +} + +func (db *lazyDB) CreateIndex(ctx context.Context, username string, req *protomodel.CreateIndexRequest) (*protomodel.CreateIndexResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.CreateIndex(ctx, username, req) +} + +func (db *lazyDB) DeleteIndex(ctx context.Context, username string, req *protomodel.DeleteIndexRequest) (*protomodel.DeleteIndexResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.DeleteIndex(ctx, username, req) +} + +func (db *lazyDB) InsertDocuments(ctx context.Context, username string, req *protomodel.InsertDocumentsRequest) (*protomodel.InsertDocumentsResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.InsertDocuments(ctx, username, req) +} + +func (db *lazyDB) ReplaceDocuments(ctx context.Context, username string, req *protomodel.ReplaceDocumentsRequest) (*protomodel.ReplaceDocumentsResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ReplaceDocuments(ctx, username, req) +} + +func (db *lazyDB) AuditDocument(ctx context.Context, req *protomodel.AuditDocumentRequest) (*protomodel.AuditDocumentResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.AuditDocument(ctx, req) +} + +func (db *lazyDB) SearchDocuments(ctx context.Context, query 
*protomodel.Query, offset int64) (document.DocumentReader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.SearchDocuments(ctx, query, offset) +} + +func (db *lazyDB) CountDocuments(ctx context.Context, req *protomodel.CountDocumentsRequest) (*protomodel.CountDocumentsResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.CountDocuments(ctx, req) +} + +func (db *lazyDB) ProofDocument(ctx context.Context, req *protomodel.ProofDocumentRequest) (*protomodel.ProofDocumentResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.ProofDocument(ctx, req) +} + +func (db *lazyDB) DeleteDocuments(ctx context.Context, username string, req *protomodel.DeleteDocumentsRequest) (*protomodel.DeleteDocumentsResponse, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.DeleteDocuments(ctx, username, req) +} + +func (db *lazyDB) FindTruncationPoint(ctx context.Context, until time.Time) (*schema.TxHeader, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return nil, err + } + defer db.m.Release(db.idx) + + return d.FindTruncationPoint(ctx, until) +} + +func (db *lazyDB) CopySQLCatalog(ctx context.Context, txID uint64) (uint64, error) { + d, err := db.m.Get(db.idx) + if err != nil { + return 0, err + } + defer db.m.Release(db.idx) + + return d.CopySQLCatalog(ctx, txID) +} + +func (db *lazyDB) TruncateUptoTx(ctx context.Context, txID uint64) error { + d, err := db.m.Get(db.idx) + if err != nil { + return err + } + defer db.m.Release(db.idx) + + return d.TruncateUptoTx(ctx, txID) +} diff --git a/pkg/database/replica_test.go b/pkg/database/replica_test.go index 91ae1767d9..3fd624ddd0 100644 --- a/pkg/database/replica_test.go +++ b/pkg/database/replica_test.go @@ -31,7 +31,7 @@ import ( func 
TestReadOnlyReplica(t *testing.T) { rootPath := t.TempDir() - options := DefaultOption().WithDBRootPath(rootPath).AsReplica(true) + options := DefaultOptions().WithDBRootPath(rootPath).AsReplica(true) replica, err := NewDB("db", nil, options, logger.NewSimpleLogger("immudb ", os.Stderr)) require.NoError(t, err) @@ -97,7 +97,7 @@ func TestReadOnlyReplica(t *testing.T) { func TestSwitchToReplica(t *testing.T) { rootPath := t.TempDir() - options := DefaultOption().WithDBRootPath(rootPath).AsReplica(false) + options := DefaultOptions().WithDBRootPath(rootPath).AsReplica(false) replica := makeDbWith(t, "db", options) diff --git a/pkg/database/scan_test.go b/pkg/database/scan_test.go index a61612fe40..4b6b01d074 100644 --- a/pkg/database/scan_test.go +++ b/pkg/database/scan_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/embedded/store" "github.com/codenotary/immudb/pkg/api/schema" "github.com/stretchr/testify/require" @@ -279,7 +280,7 @@ func TestStoreScanWithTruncation(t *testing.T) { fileSize := 8 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 1 options.storeOpts.MaxConcurrency = 500 @@ -304,7 +305,7 @@ func TestStoreScanWithTruncation(t *testing.T) { deletePointTx := uint64(5) t.Run("ensure data is truncated until the deletion point", func(t *testing.T) { - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), deletePointTx)) for i := deletePointTx; i < 10; i++ { diff --git a/pkg/database/sql.go b/pkg/database/sql.go index 36f024f4c5..8ded2b4747 100644 --- a/pkg/database/sql.go +++ b/pkg/database/sql.go @@ -427,6 +427,33 @@ func (d *db) InferParametersPrepared(ctx context.Context, tx *sql.SQLTx, stmt 
sq return d.sqlEngine.InferParametersPreparedStmts(ctx, tx, []sql.SQLStmt{stmt}) } +func (d *db) CopySQLCatalog(ctx context.Context, txID uint64) (uint64, error) { + // copy sql catalogue + tx, err := d.st.NewTx(ctx, store.DefaultTxOptions()) + if err != nil { + return 0, err + } + + err = d.CopyCatalogToTx(ctx, tx) + if err != nil { + d.Logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=sql_catalogue_copy}", d.name, err, txID) + return 0, err + } + defer tx.Cancel() + + // setting the metadata to record the transaction upto which the log was truncated + tx.WithMetadata(store.NewTxMetadata().WithTruncatedTxID(txID)) + + tx.RequireMVCCOnFollowingTxs(true) + + // commit catalogue as a new transaction + hdr, err := tx.Commit(ctx) + if err != nil { + return 0, err + } + return hdr.ID, nil +} + type limitRowReader struct { sql.RowReader nRead int diff --git a/pkg/database/truncator.go b/pkg/database/truncator.go index 9183b180f5..47de4cc2b7 100644 --- a/pkg/database/truncator.go +++ b/pkg/database/truncator.go @@ -21,7 +21,8 @@ import ( "errors" "time" - "github.com/codenotary/immudb/embedded/store" + "github.com/codenotary/immudb/embedded/logger" + "github.com/codenotary/immudb/pkg/api/schema" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -38,77 +39,34 @@ type Truncator interface { // When resulting transaction before specified time does not exists // * No transaction header is returned. // * Returns nil TxHeader, and an error. - Plan(ctx context.Context, truncationUntil time.Time) (*store.TxHeader, error) + Plan(ctx context.Context, truncationUntil time.Time) (*schema.TxHeader, error) // TruncateUptoTx runs truncation against the relevant appendable logs. Must // be called after result of Plan(). 
TruncateUptoTx(ctx context.Context, txID uint64) error } -func NewVlogTruncator(d DB) Truncator { +func NewVlogTruncator(db DB, log logger.Logger) Truncator { return &vlogTruncator{ - db: d.(*db), - metrics: newTruncatorMetrics(d.GetName()), + db: db, + metrics: newTruncatorMetrics(db.GetName()), + logger: log, } } // vlogTruncator implements Truncator for the value-log appendable type vlogTruncator struct { - db *db + db DB metrics *truncatorMetrics + logger logger.Logger } // Plan returns the transaction upto which the value log can be truncated. // When resulting transaction before specified time does not exists // - No transaction header is returned. // - Returns nil TxHeader, and an error. -func (v *vlogTruncator) Plan(ctx context.Context, truncationUntil time.Time) (*store.TxHeader, error) { - hdr, err := v.db.st.LastTxUntil(truncationUntil) - if errors.Is(err, store.ErrTxNotFound) { - return nil, ErrRetentionPeriodNotReached - } - if err != nil { - return nil, err - } - - // look for the newst transaction with entries - for err == nil { - if hdr.NEntries > 0 { - break - } - - if ctx.Err() != nil { - return nil, err - } - - hdr, err = v.db.st.ReadTxHeader(hdr.ID-1, false, false) - } - - return hdr, err -} - -// commitCatalog commits the current sql catalogue as a new transaction. 
-func (v *vlogTruncator) commitCatalog(ctx context.Context, txID uint64) (*store.TxHeader, error) { - // copy sql catalogue - tx, err := v.db.st.NewTx(ctx, store.DefaultTxOptions()) - if err != nil { - return nil, err - } - - err = v.db.CopyCatalogToTx(ctx, tx) - if err != nil { - v.db.Logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=sql_catalogue_copy}", v.db.name, err, txID) - return nil, err - } - defer tx.Cancel() - - // setting the metadata to record the transaction upto which the log was truncated - tx.WithMetadata(store.NewTxMetadata().WithTruncatedTxID(txID)) - - tx.RequireMVCCOnFollowingTxs(true) - - // commit catalogue as a new transaction - return tx.Commit(ctx) +func (v *vlogTruncator) Plan(ctx context.Context, truncationUntil time.Time) (*schema.TxHeader, error) { + return v.db.FindTruncationPoint(ctx, truncationUntil) } // TruncateUpTo runs truncation against the relevant appendable logs upto the specified transaction offset. @@ -118,22 +76,21 @@ func (v *vlogTruncator) TruncateUptoTx(ctx context.Context, txID uint64) error { v.metrics.duration.Observe(time.Since(t).Seconds()) }(time.Now()) - v.db.Logger.Infof("copying sql catalog before truncation for database '%s' at tx %d", v.db.name, txID) + v.logger.Infof("copying sql catalog before truncation for database '%s' at tx %d", v.db.GetName(), txID) // copy sql catalogue - sqlCommitHdr, err := v.commitCatalog(ctx, txID) + sqlCatalogTxID, err := v.db.CopySQLCatalog(ctx, txID) if err != nil { - v.db.Logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=sql_catalogue_commit}", v.db.name, err, txID) + v.logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=sql_catalogue_commit}", v.db.GetName(), err, txID) return err } - v.db.Logger.Infof("committed sql catalog before truncation for database '%s' at tx %d", v.db.name, sqlCommitHdr.ID) + v.logger.Infof("committed sql catalog before truncation for database '%s' at tx 
%d", v.db.GetName(), sqlCatalogTxID) // truncate upto txID - err = v.db.st.TruncateUptoTx(txID) + err = v.db.TruncateUptoTx(ctx, txID) if err != nil { - v.db.Logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=truncate_upto}", v.db.name, err, txID) + v.logger.Errorf("error during truncation for database '%s' {err = %v, id = %v, type=truncate_upto}", v.db.GetName(), err, txID) } - return err } diff --git a/pkg/database/truncator_test.go b/pkg/database/truncator_test.go index 6861de64d0..3a7554c058 100644 --- a/pkg/database/truncator_test.go +++ b/pkg/database/truncator_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/embedded/store" "github.com/codenotary/immudb/pkg/api/protomodel" "github.com/codenotary/immudb/pkg/api/schema" @@ -76,7 +77,7 @@ func Test_vlogCompactor_WithMultipleIO(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 5 options.storeOpts.MaxConcurrency = 500 @@ -99,7 +100,7 @@ func Test_vlogCompactor_WithMultipleIO(t *testing.T) { hdr, err := db.st.ReadTxHeader(deletePointTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -122,7 +123,7 @@ func Test_vlogCompactor_WithSingleIO(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 1 options.storeOpts.MaxConcurrency = 500 @@ -145,7 +146,7 @@ func Test_vlogCompactor_WithSingleIO(t *testing.T) { 
hdr, err := db.st.ReadTxHeader(deletePointTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -181,7 +182,7 @@ func Test_vlogCompactor_WithConcurrentWritersOnSingleIO(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 1 options.storeOpts.MaxConcurrency = 500 @@ -216,7 +217,7 @@ func Test_vlogCompactor_WithConcurrentWritersOnSingleIO(t *testing.T) { hdr, err := db.st.ReadTxHeader(deletePointTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -276,7 +277,7 @@ func Test_vlogCompactor_Plan(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 1 options.storeOpts.VLogCacheSize = 0 @@ -296,7 +297,7 @@ func Test_vlogCompactor_Plan(t *testing.T) { } } - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) hdr, err := c.Plan(context.Background(), queryTime) require.NoError(t, err) @@ -306,7 +307,7 @@ func Test_vlogCompactor_Plan(t *testing.T) { func setupCommonTest(t *testing.T) *db { rootPath := t.TempDir() - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(1024) options.storeOpts.VLogCacheSize = 0 options.storeOpts.EmbeddedValues = false @@ -361,7 
+362,7 @@ func Test_vlogCompactor_with_sql(t *testing.T) { hdr, err := db.st.ReadTxHeader(deleteUptoTx.Id, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -428,7 +429,7 @@ func Test_vlogCompactor_without_data(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithIndexOptions(options.storeOpts.IndexOpts.WithCompactionThld(2)).WithFileSize(fileSize) options.storeOpts.MaxIOConcurrency = 1 options.storeOpts.VLogCacheSize = 0 @@ -443,7 +444,7 @@ func Test_vlogCompactor_without_data(t *testing.T) { hdr, err := db.st.ReadTxHeader(deletePointTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -505,7 +506,7 @@ func Test_vlogCompactor_with_multiple_truncates(t *testing.T) { hdr, err := db.st.ReadTxHeader(lastCommitTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -537,7 +538,7 @@ func Test_vlogCompactor_with_multiple_truncates(t *testing.T) { hdr, err := db.st.ReadTxHeader(deleteUptoTx.Id, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -562,7 +563,7 @@ func Test_vlogCompactor_for_read_conflict(t *testing.T) { fileSize := 1024 - options := DefaultOption().WithDBRootPath(rootPath) + options := DefaultOptions().WithDBRootPath(rootPath) options.storeOpts.WithFileSize(fileSize) options.storeOpts.VLogCacheSize = 0 @@ -605,7 +606,7 @@ func Test_vlogCompactor_for_read_conflict(t *testing.T) { hdr, err := 
db.st.ReadTxHeader(deletePointTx, false, false) require.NoError(t, err) - c := NewVlogTruncator(db) + c := NewVlogTruncator(db, logger.NewMemoryLogger()) require.NoError(t, c.TruncateUptoTx(context.Background(), hdr.ID)) @@ -669,7 +670,7 @@ func Test_vlogCompactor_with_document_store(t *testing.T) { hdr, err := db.st.ReadTxHeader(lastCommitTx, false, false) require.NoError(t, err) - err = NewVlogTruncator(db).TruncateUptoTx(context.Background(), hdr.ID) + err = NewVlogTruncator(db, logger.NewMemoryLogger()).TruncateUptoTx(context.Background(), hdr.ID) require.NoError(t, err) // should add two extra transaction with catalogue @@ -724,7 +725,7 @@ func Test_vlogCompactor_with_document_store(t *testing.T) { hdr, err := db.st.ReadTxHeader(lastCommitTx, false, false) require.NoError(t, err) - err = NewVlogTruncator(db).TruncateUptoTx(context.Background(), hdr.ID) + err = NewVlogTruncator(db, logger.NewMemoryLogger()).TruncateUptoTx(context.Background(), hdr.ID) require.NoError(t, err) // should add an extra transaction with catalogue diff --git a/pkg/database/types.go b/pkg/database/types.go index 2c1e41c7f5..0c3c4642d8 100644 --- a/pkg/database/types.go +++ b/pkg/database/types.go @@ -16,118 +16,87 @@ limitations under the License. 
package database -import ( - "sync" -) +import "context" // DatabaseList interface type DatabaseList interface { - Put(database DB) + Put(name string, opts *Options) DB + PutClosed(name string, opts *Options) DB Delete(string) (DB, error) GetByIndex(index int) (DB, error) GetByName(string) (DB, error) GetId(dbname string) int Length() int + Resize(n int) + CloseAll(ctx context.Context) error } type databaseList struct { - databases []DB - databaseByName map[string]*dbRef - sync.RWMutex -} - -type dbRef struct { - index int - deleted bool + m *DBManager } // NewDatabaseList constructs a new database list -func NewDatabaseList() DatabaseList { +func NewDatabaseList(m *DBManager) DatabaseList { return &databaseList{ - databases: make([]DB, 0), - databaseByName: make(map[string]*dbRef), + m: m, } } -func (d *databaseList) Put(database DB) { - d.Lock() - defer d.Unlock() - - ref, exists := d.databaseByName[database.GetName()] - if exists { - d.databases[ref.index] = database - ref.deleted = false - return - } +func (d *databaseList) Put(dbName string, opts *Options) DB { + return d.put(dbName, opts, false) +} - d.databases = append(d.databases, database) - d.databaseByName[database.GetName()] = &dbRef{index: len(d.databases) - 1} +func (d *databaseList) PutClosed(dbName string, opts *Options) DB { + return d.put(dbName, opts, true) } -func (d *databaseList) Delete(dbname string) (DB, error) { - d.Lock() - defer d.Unlock() +func (d *databaseList) put(dbName string, opts *Options, closed bool) DB { + var newOpts Options = *opts - dbRef, exists := d.databaseByName[dbname] - if !exists || dbRef.deleted { - return nil, ErrDatabaseNotExists - } - - db := d.databases[dbRef.index] + idx := d.m.Put(dbName, &newOpts, closed) - if !db.IsClosed() { - return nil, ErrCannotDeleteAnOpenDatabase + return &lazyDB{ + m: d.m, + idx: idx, } - - dbRef.deleted = true - - return db, nil } -func (d *databaseList) GetByIndex(index int) (DB, error) { - d.RLock() - defer d.RUnlock() - - if index 
< 0 || index >= len(d.databases) { - return nil, ErrDatabaseNotExists +func (d *databaseList) Delete(dbname string) (DB, error) { + if err := d.m.Delete(dbname); err != nil { + return nil, err } + idx := d.m.GetIndexByName(dbname) + return &lazyDB{m: d.m, idx: idx}, nil +} - db := d.databases[index] - - dbRef := d.databaseByName[db.GetName()] - if dbRef.deleted { +func (d *databaseList) GetByIndex(index int) (DB, error) { + if !d.m.HasIndex(index) { return nil, ErrDatabaseNotExists } - - return db, nil + return &lazyDB{m: d.m, idx: index}, nil } func (d *databaseList) GetByName(dbname string) (DB, error) { - d.RLock() - defer d.RUnlock() - - if dbRef, ok := d.databaseByName[dbname]; !ok || dbRef.deleted { + idx := d.m.GetIndexByName(dbname) + if idx < 0 { return nil, ErrDatabaseNotExists } - - return d.databases[d.databaseByName[dbname].index], nil + return &lazyDB{m: d.m, idx: idx}, nil } func (d *databaseList) Length() int { - d.RLock() - defer d.RUnlock() - - return len(d.databases) + return d.m.Length() } // GetById returns the database id number. 
-1 if database is not present func (d *databaseList) GetId(dbname string) int { - d.RLock() - defer d.RUnlock() + return d.m.GetIndexByName(dbname) +} - if dbRef, ok := d.databaseByName[dbname]; ok && !dbRef.deleted { - return dbRef.index - } +func (d *databaseList) CloseAll(ctx context.Context) error { + return d.m.CloseAll(ctx) +} - return -1 +func (d *databaseList) Resize(n int) { + d.m.Resize(n) } diff --git a/pkg/integration/follower_replication_test.go b/pkg/integration/follower_replication_test.go index 8a7c3eb46d..155d2c803f 100644 --- a/pkg/integration/follower_replication_test.go +++ b/pkg/integration/follower_replication_test.go @@ -350,9 +350,9 @@ func TestAsyncReplication(t *testing.T) { err = primaryClient.CloseSession(context.Background()) require.NoError(t, err) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) - //keys should exist in replicadb@replica" + // keys should exist in replicadb@replica" for i := 0; i < keyCount; i++ { ei := keyCount + i @@ -511,7 +511,7 @@ func TestReplicationTxDiscarding(t *testing.T) { _, err = primaryClient.Set(context.Background(), []byte("key11"), []byte("value11")) require.NoError(t, err) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) t.Run("key1 should exist in replicadb@replica", func(t *testing.T) { _, err = replicaClient.Get(context.Background(), []byte("key11")) @@ -624,7 +624,7 @@ func TestSystemDBAndDefaultDBReplication(t *testing.T) { _, err = primaryClient.Set(context.Background(), []byte("key1"), []byte("value1")) require.NoError(t, err) - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) t.Run("key1 should exist in replicateddb@replica", func(t *testing.T) { _, err = replicaClient.Get(context.Background(), []byte("key1")) diff --git a/pkg/replication/replicator.go b/pkg/replication/replicator.go index 470e25e7b8..50853d6634 100644 --- a/pkg/replication/replicator.go +++ b/pkg/replication/replicator.go @@ -458,7 +458,6 @@ func (txr *TxReplicator) fetchNextTx() error { } 
txr.lastTx++ } - return nil } diff --git a/pkg/replication/replicator_test.go b/pkg/replication/replicator_test.go index 9a4d7ce0a4..0e6c906bac 100644 --- a/pkg/replication/replicator_test.go +++ b/pkg/replication/replicator_test.go @@ -43,7 +43,7 @@ func TestReplication(t *testing.T) { logger := logger.NewSimpleLogger("logger", os.Stdout) - db, err := database.NewDB("replicated_defaultdb", nil, database.DefaultOption().AsReplica(true).WithDBRootPath(path), logger) + db, err := database.NewDB("replicated_defaultdb", nil, database.DefaultOptions().AsReplica(true).WithDBRootPath(path), logger) require.NoError(t, err) txReplicator, err := NewTxReplicator(xid.New(), db, rOpts, logger) @@ -80,7 +80,7 @@ func TestReplicationIsAbortedOnServerVersionMismatch(t *testing.T) { logger := logger.NewSimpleLogger("logger", os.Stdout) - db, err := database.NewDB("replicated_defaultdb", nil, database.DefaultOption().AsReplica(true).WithDBRootPath(path), logger) + db, err := database.NewDB("replicated_defaultdb", nil, database.DefaultOptions().AsReplica(true).WithDBRootPath(path), logger) require.NoError(t, err) txReplicator, err := NewTxReplicator(xid.New(), db, rOpts, logger) diff --git a/pkg/server/db_dummy_closed_test.go b/pkg/server/db_dummy_closed_test.go index ee788d10d7..51d5f0eb34 100644 --- a/pkg/server/db_dummy_closed_test.go +++ b/pkg/server/db_dummy_closed_test.go @@ -27,7 +27,7 @@ import ( ) func TestDummyClosedDatabase(t *testing.T) { - cdb := &closedDB{name: "closeddb1", opts: database.DefaultOption()} + cdb := &closedDB{name: "closeddb1", opts: database.DefaultOptions()} require.Equal(t, "data/closeddb1", cdb.Path()) diff --git a/pkg/server/db_options.go b/pkg/server/db_options.go index ab25dd0f7c..d89877165a 100644 --- a/pkg/server/db_options.go +++ b/pkg/server/db_options.go @@ -229,9 +229,9 @@ func (s *ImmuServer) defaultAHTOptions() *ahtOptions { } func (s *ImmuServer) databaseOptionsFrom(opts *dbOptions) *database.Options { - return database.DefaultOption(). 
+ return database.DefaultOptions(). WithDBRootPath(s.Options.Dir). - WithStoreOptions(s.storeOptionsForDB(opts.Database, s.remoteStorage, opts.storeOptions(s))). + WithStoreOptions(s.storeOptionsForDB(opts.Database, s.remoteStorage, opts.storeOptions())). AsReplica(opts.Replica). WithSyncReplication(opts.SyncReplication). WithSyncAcks(opts.SyncAcks). @@ -241,7 +241,7 @@ func (s *ImmuServer) databaseOptionsFrom(opts *dbOptions) *database.Options { WithMaxResultSize(s.Options.MaxResultSize) } -func (opts *dbOptions) storeOptions(s *ImmuServer) *store.Options { +func (opts *dbOptions) storeOptions() *store.Options { indexOpts := store.DefaultIndexOptions() if opts.IndexOptions != nil { @@ -263,10 +263,6 @@ func (opts *dbOptions) storeOptions(s *ImmuServer) *store.Options { WithBulkPreparationTimeout(time.Millisecond * time.Duration(opts.IndexOptions.BulkPreparationTimeout)) } - if s != nil { - indexOpts.WithCacheFactoryFunc(s.indexCacheFactoryFunc()) - } - ahtOpts := store.DefaultAHTOptions() if opts.AHTOptions != nil { @@ -823,7 +819,7 @@ func (opts *dbOptions) Validate() error { ErrIllegalArguments, opts.Database, store.MinimumTruncationFrequency.Hours()) } - return opts.storeOptions(nil).Validate() + return opts.storeOptions().Validate() } func (opts *dbOptions) isReplicatorRequired() bool { diff --git a/pkg/server/metrics_funcs_test.go b/pkg/server/metrics_funcs_test.go index de34915d15..027a97dbb0 100644 --- a/pkg/server/metrics_funcs_test.go +++ b/pkg/server/metrics_funcs_test.go @@ -50,7 +50,7 @@ func (dbm dbMock) GetOptions() *database.Options { if dbm.getOptionsF != nil { return dbm.getOptionsF() } - return database.DefaultOption() + return database.DefaultOptions() } func (dbm dbMock) GetName() string { @@ -61,7 +61,6 @@ func (dbm dbMock) GetName() string { } func TestMetricFuncComputeDBEntries(t *testing.T) { - currentStateSuccessfulOnce := func(callCounter *int) (*schema.ImmutableState, error) { *callCounter++ if *callCounter == 1 { @@ -73,12 +72,15 @@ 
func TestMetricFuncComputeDBEntries(t *testing.T) { } currentStateCounter := 0 - dbList := database.NewDatabaseList() - dbList.Put(dbMock{ - currentStateF: func() (*schema.ImmutableState, error) { - return currentStateSuccessfulOnce(¤tStateCounter) - }, - }) + dbList := database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + return &dbMock{ + currentStateF: func() (*schema.ImmutableState, error) { + return currentStateSuccessfulOnce(¤tStateCounter) + }, + }, nil + }, 100, logger.NewMemoryLogger())) + + dbList.Put("test", database.DefaultOptions()) currentStateCountersysDB := 0 sysDB := dbMock{ @@ -86,7 +88,7 @@ func TestMetricFuncComputeDBEntries(t *testing.T) { return "systemdb" }, getOptionsF: func() *database.Options { - return database.DefaultOption() + return database.DefaultOptions() }, currentStateF: func() (*schema.ImmutableState, error) { return currentStateSuccessfulOnce(¤tStateCountersysDB) @@ -135,15 +137,17 @@ func TestMetricFuncComputeDBSizes(t *testing.T) { defer file.Close() //<-- - dbList := database.NewDatabaseList() - dbList.Put(dbMock{ - getNameF: func() string { - return "defaultdb" - }, - getOptionsF: func() *database.Options { - return database.DefaultOption() - }, - }) + dbList := database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + return &dbMock{ + getNameF: func() string { + return "defaultdb" + }, + getOptionsF: func() *database.Options { + return database.DefaultOptions() + }, + }, nil + }, 100, logger.NewMemoryLogger())) + dbList.Put("test", database.DefaultOptions()) s := ImmuServer{ Options: &Options{ @@ -153,7 +157,7 @@ func TestMetricFuncComputeDBSizes(t *testing.T) { dbList: dbList, sysDB: dbMock{ getOptionsF: func() *database.Options { - return database.DefaultOption() + return database.DefaultOptions() }, }, } @@ -177,15 +181,20 @@ func TestMetricFuncComputeDBSizes(t *testing.T) { } func 
TestMetricFuncComputeLoadedDBSize(t *testing.T) { - dbList := database.NewDatabaseList() - dbList.Put(dbMock{ - getNameF: func() string { - return "defaultdb" - }, - getOptionsF: func() *database.Options { - return database.DefaultOption() - }, - }) + dbList := database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + db := dbMock{ + getNameF: func() string { + return name + }, + getOptionsF: func() *database.Options { + return opts + }, + } + return db, nil + }, 10, logger.NewMemoryLogger())) + + dbList.Put("defaultdb", database.DefaultOptions()) + var sw strings.Builder s := ImmuServer{ Options: &Options{ @@ -194,7 +203,7 @@ func TestMetricFuncComputeLoadedDBSize(t *testing.T) { dbList: dbList, sysDB: dbMock{ getOptionsF: func() *database.Options { - return database.DefaultOption() + return database.DefaultOptions() }, }, Logger: logger.NewSimpleLoggerWithLevel( @@ -202,6 +211,6 @@ func TestMetricFuncComputeLoadedDBSize(t *testing.T) { &sw, logger.LogError), } - require.Equal(t,s.metricFuncComputeLoadedDBSize(),1.0) - require.Equal(t,s.metricFuncComputeSessionCount(),0.0) + require.Equal(t, s.metricFuncComputeLoadedDBSize(), 1.0) + require.Equal(t, s.metricFuncComputeSessionCount(), 0.0) } diff --git a/pkg/server/options.go b/pkg/server/options.go index 9de03c97ba..62c3d3cb8a 100644 --- a/pkg/server/options.go +++ b/pkg/server/options.go @@ -83,7 +83,7 @@ type Options struct { GRPCReflectionServerEnabled bool SwaggerUIEnabled bool LogRequestMetadata bool - SharedIndexCacheSize int + MaxActiveDatabases int } type RemoteStorageOptions struct { @@ -157,7 +157,7 @@ func DefaultOptions() *Options { LogRequestMetadata: false, LogDir: "immulog", LogAccess: false, - SharedIndexCacheSize: 1 << 27, // 128MB + MaxActiveDatabases: 100, } } @@ -537,6 +537,11 @@ func (o *Options) WithLogRequestMetadata(enabled bool) *Options { return o } +func (o *Options) WithMaxActiveDatabases(n int) *Options { + o.MaxActiveDatabases = 
n + return o +} + // RemoteStorageOptions func (opts *RemoteStorageOptions) WithS3Storage(S3Storage bool) *RemoteStorageOptions { diff --git a/pkg/server/remote_storage.go b/pkg/server/remote_storage.go index a036d394c4..36795a10dc 100644 --- a/pkg/server/remote_storage.go +++ b/pkg/server/remote_storage.go @@ -27,7 +27,6 @@ import ( "github.com/codenotary/immudb/embedded/appendable" "github.com/codenotary/immudb/embedded/appendable/multiapp" "github.com/codenotary/immudb/embedded/appendable/remoteapp" - "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/remotestorage" "github.com/codenotary/immudb/embedded/remotestorage/s3" "github.com/codenotary/immudb/embedded/store" @@ -159,16 +158,6 @@ func (s *ImmuServer) updateRemoteUUID(remoteStorage remotestorage.Storage) error return remoteStorage.Put(ctx, IDENTIFIER_FNAME, filepath.Join(s.Options.Dir, IDENTIFIER_FNAME)) } -func (s *ImmuServer) indexCacheFactoryFunc() store.IndexCacheFactoryFunc { - if s.indexCacheFunc == nil { - c, _ := cache.NewCache(s.Options.SharedIndexCacheSize) - s.indexCacheFunc = func() *cache.Cache { - return c - } - } - return s.indexCacheFunc -} - func (s *ImmuServer) storeOptionsForDB(name string, remoteStorage remotestorage.Storage, stOpts *store.Options) *store.Options { if remoteStorage != nil { stOpts.WithAppFactory(func(rootPath, subPath string, opts *multiapp.Options) (appendable.Appendable, error) { diff --git a/pkg/server/remote_storage_test.go b/pkg/server/remote_storage_test.go index d3918de1a0..a018616f1c 100644 --- a/pkg/server/remote_storage_test.go +++ b/pkg/server/remote_storage_test.go @@ -30,6 +30,7 @@ import ( "github.com/codenotary/immudb/embedded/remotestorage/memory" "github.com/codenotary/immudb/embedded/remotestorage/s3" "github.com/codenotary/immudb/embedded/store" + "github.com/codenotary/immudb/embedded/tbtree" "github.com/codenotary/immudb/pkg/api/schema" "github.com/codenotary/immudb/pkg/auth" "github.com/rs/xid" @@ -578,6 +579,17 
@@ func TestRemoteStorageUsedForNewDB(t *testing.T) { _, err = s.CreateDatabaseWith(ctx, newdb) require.NoError(t, err) + + // force db loading + repl, err := s.UseDatabase(ctx, &schema.Database{DatabaseName: "newdb"}) + require.NoError(t, err) + + md = metadata.Pairs("authorization", repl.Token) + ctx = metadata.NewIncomingContext(context.Background(), md) + + _, err = s.Get(ctx, &schema.KeyRequest{Key: []byte("test-key")}) + require.ErrorIs(t, err, tbtree.ErrKeyNotFound) + err = s.CloseDatabases() require.NoError(t, err) diff --git a/pkg/server/server.go b/pkg/server/server.go index 1a78c5d481..1c53434b73 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -25,6 +25,7 @@ import ( "net" "os" "os/signal" + "path" "path/filepath" "strings" "syscall" @@ -133,6 +134,9 @@ func (s *ImmuServer) Initialize() error { return logErr(s.Logger, "unable to initialize remote storage: %v", err) } + // NOTE: MaxActiveDatabases might have changed since the server instance was created + s.dbList.Resize(s.Options.MaxActiveDatabases) + if err = s.loadSystemDatabase(dataDir, s.remoteStorage, adminPassword, s.Options.ForceAdminPassword); err != nil { return logErr(s.Logger, "unable to load system database: %v", err) } @@ -564,11 +568,7 @@ func (s *ImmuServer) loadDefaultDatabase(dataDir string, remoteStorage remotesto _, err = s.OS.Stat(defaultDbRootDir) if err == nil { - db, err := database.OpenDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger) - if err != nil { - s.Logger.Errorf("database '%s' was not correctly initialized.\n"+"Use replication to recover from external source or start without data folder.", dbOpts.Database) - return err - } + db := s.dbList.Put(dbOpts.Database, s.databaseOptionsFrom(dbOpts)) if dbOpts.isReplicatorRequired() { err = s.startReplicationFor(db, dbOpts) @@ -577,8 +577,6 @@ func (s *ImmuServer) loadDefaultDatabase(dataDir string, remoteStorage remotesto } } - s.dbList.Put(db) - return nil } @@ -586,10 +584,10 @@ func 
(s *ImmuServer) loadDefaultDatabase(dataDir string, remoteStorage remotesto return err } - db, err := database.NewDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger) - if err != nil { - return err - } + opts := s.databaseOptionsFrom(dbOpts) + os.MkdirAll(path.Join(opts.GetDBRootPath(), dbOpts.Database), os.ModePerm) + + db := s.dbList.Put(dbOpts.Database, opts) if dbOpts.isReplicatorRequired() { err = s.startReplicationFor(db, dbOpts) @@ -598,8 +596,6 @@ func (s *ImmuServer) loadDefaultDatabase(dataDir string, remoteStorage remotesto } } - s.dbList.Put(db) - return nil } @@ -623,7 +619,7 @@ func (s *ImmuServer) loadUserDatabases(dataDir string, remoteStorage remotestora dirs = append(dirs, f.Name()) } - //load databases that are inside each directory + // load databases that are inside each directory for _, val := range dirs { //dbname is the directory name where it is stored //path iteration above stores the directories as data/db_name @@ -635,18 +631,14 @@ func (s *ImmuServer) loadUserDatabases(dataDir string, remoteStorage remotestora return err } - if !dbOpts.Autoload.isEnabled() { - s.Logger.Infof("database '%s' is closed (autoload is disabled)", dbname) - s.dbList.Put(&closedDB{name: dbname, opts: s.databaseOptionsFrom(dbOpts)}) - continue - } - s.logDBOptions(dbname, dbOpts) - db, err := database.OpenDB(dbname, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger) - if err != nil { - s.Logger.Errorf("database '%s' could not be loaded. 
Reason: %v", dbname, err) - s.dbList.Put(&closedDB{name: dbname, opts: s.databaseOptionsFrom(dbOpts)}) + var db database.DB + if dbOpts.Autoload.isEnabled() { + db = s.dbList.Put(dbname, s.databaseOptionsFrom(dbOpts)) + } else { + s.Logger.Infof("database '%s' is closed (autoload is disabled)", dbname) + s.dbList.PutClosed(dbname, s.databaseOptionsFrom(dbOpts)) continue } @@ -663,8 +655,6 @@ func (s *ImmuServer) loadUserDatabases(dataDir string, remoteStorage remotestora s.Logger.Errorf("error starting truncation for database '%s'. Reason: %v", db.GetName(), err) } } - - s.dbList.Put(db) } return nil @@ -774,17 +764,16 @@ func (s *ImmuServer) Stop() error { // CloseDatabases closes all opened databases including the consinstency checker func (s *ImmuServer) CloseDatabases() error { - for i := 0; i < s.dbList.Length(); i++ { - val, err := s.dbList.GetByIndex(i) - if err == nil { - val.Close() - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + if err := s.dbList.CloseAll(ctx); err != nil { + return err } if s.sysDB != nil { s.sysDB.Close() } - return nil } @@ -808,7 +797,7 @@ func (s *ImmuServer) updateConfigItem(key string, newOrUpdatedLine string, uncha if strings.HasPrefix(l, key+"=") || strings.HasPrefix(l, key+" =") { kv := strings.Split(l, "=") if unchanged(kv[1]) { - return fmt.Errorf("Server config already has '%s'", newOrUpdatedLine) + return fmt.Errorf("server config already has '%s'", newOrUpdatedLine) } configLines[i] = newOrUpdatedLine write = true @@ -872,11 +861,11 @@ func (s *ImmuServer) numTransactions() (uint64, error) { return 0, err } - dbTxCount, err := db.TxCount() + state, err := db.CurrentState() if err != nil { return 0, err } - count += dbTxCount + count += state.TxId } return count, nil } @@ -895,7 +884,8 @@ func (s *ImmuServer) totalDBSize() (int64, error) { return -1, err } - dbSize, err := db.Size() + dbName := db.GetName() + dbSize, err := dirSize(filepath.Join(s.Options.Dir, dbName)) if 
err != nil { return -1, err } @@ -1031,12 +1021,10 @@ func (s *ImmuServer) CreateDatabaseV2(ctx context.Context, req *schema.CreateDat return nil, err } - db, err := database.NewDB(dbOpts.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger) - if err != nil { - return nil, err - } + opts := s.databaseOptionsFrom(dbOpts) + os.MkdirAll(path.Join(opts.GetDBRootPath(), dbOpts.Database), os.ModePerm) + db := s.dbList.Put(dbOpts.Database, s.databaseOptionsFrom(dbOpts)) - s.dbList.Put(db) s.multidbmode = true s.logDBOptions(db.GetName(), dbOpts) @@ -1111,12 +1099,7 @@ func (s *ImmuServer) LoadDatabase(ctx context.Context, req *schema.LoadDatabaseR return nil, fmt.Errorf("%w: while loading database settings", err) } - db, err = database.OpenDB(req.Database, s.multidbHandler(), s.databaseOptionsFrom(dbOpts), s.Logger) - if err != nil { - return nil, fmt.Errorf("%w: while opening database", err) - } - - s.dbList.Put(db) + s.dbList.Put(req.Database, s.databaseOptionsFrom(dbOpts)) if dbOpts.isReplicatorRequired() { err = s.startReplicationFor(db, dbOpts) @@ -1466,11 +1449,13 @@ func (s *ImmuServer) DatabaseList(ctx context.Context, _ *empty.Empty) (*schema. 
} resp := &schema.DatabaseListResponse{} - - for _, db := range dbsWithSettings.Databases { - resp.Databases = append(resp.Databases, &schema.Database{DatabaseName: db.Name}) + if len(dbsWithSettings.Databases) > 0 { + resp.Databases = make([]*schema.Database, len(dbsWithSettings.Databases)) } + for i, db := range dbsWithSettings.Databases { + resp.Databases[i] = &schema.Database{DatabaseName: db.Name} + } return resp, nil } @@ -1488,17 +1473,19 @@ func (s *ImmuServer) DatabaseListV2(ctx context.Context, req *schema.DatabaseLis } for _, db := range databases { - dbOpts, err := s.loadDBOptions(db.GetName(), false) + dbName := db.GetName() + + dbOpts, err := s.loadDBOptions(dbName, false) if err != nil { return nil, err } - size, err := db.Size() + size, err := dirSize(filepath.Join(s.Options.Dir, dbName)) if err != nil { return nil, err } - txCount, err := db.TxCount() + state, err := db.CurrentState() if err != nil { return nil, err } @@ -1507,8 +1494,8 @@ func (s *ImmuServer) DatabaseListV2(ctx context.Context, req *schema.DatabaseLis Name: db.GetName(), Settings: dbOpts.databaseNullableSettings(), Loaded: !db.IsClosed(), - DiskSize: size, - NumTransactions: txCount, + DiskSize: uint64(size), + NumTransactions: state.TxId, CreatedAt: uint64(dbOpts.CreatedAt.Unix()), CreatedBy: dbOpts.CreatedBy, } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 683e8d4792..81bab51aa6 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -98,7 +98,7 @@ func TestServerDefaultDatabaseLoad(t *testing.T) { s, closer := testServer(opts) defer closer() - options := database.DefaultOption().WithDBRootPath(dir) + options := database.DefaultOptions().WithDBRootPath(dir) dbRootpath := options.GetDBRootPath() err := s.loadDefaultDatabase(dbRootpath, nil) @@ -108,7 +108,7 @@ func TestServerDefaultDatabaseLoad(t *testing.T) { func TestServerReOpen(t *testing.T) { serverOptions := DefaultOptions().WithDir(t.TempDir()) - options := 
database.DefaultOption().WithDBRootPath(serverOptions.Dir) + options := database.DefaultOptions().WithDBRootPath(serverOptions.Dir) dbRootpath := options.GetDBRootPath() s, closer := testServer(serverOptions) @@ -136,7 +136,7 @@ func TestServerReOpen(t *testing.T) { func TestServerSystemDatabaseLoad(t *testing.T) { serverOptions := DefaultOptions().WithDir(t.TempDir()) - options := database.DefaultOption().WithDBRootPath(serverOptions.Dir) + options := database.DefaultOptions().WithDBRootPath(serverOptions.Dir) dbRootpath := options.GetDBRootPath() s, closer := testServer(serverOptions) @@ -153,7 +153,7 @@ func TestServerSystemDatabaseLoad(t *testing.T) { func TestServerResetAdminPassword(t *testing.T) { serverOptions := DefaultOptions().WithDir(t.TempDir()) - options := database.DefaultOption().WithDBRootPath(serverOptions.Dir) + options := database.DefaultOptions().WithDBRootPath(serverOptions.Dir) dbRootpath := options.GetDBRootPath() var txID uint64 @@ -1342,7 +1342,7 @@ func TestServerUpdateConfigItem(t *testing.T) { // Config already having the specified item ioutil.WriteFile(configFile, []byte("key = value"), 0644) err = s.updateConfigItem("key", "key = value", func(string) bool { return true }) - require.ErrorContains(t, err, "Server config already has 'key = value'") + require.ErrorContains(t, err, "server config already has 'key = value'") // Add new config item err = s.updateConfigItem("key2", "key2 = value2", func(string) bool { return false }) diff --git a/pkg/server/sessions/internal/transactions/transactions_test.go b/pkg/server/sessions/internal/transactions/transactions_test.go index 0289a7d552..62309c20c5 100644 --- a/pkg/server/sessions/internal/transactions/transactions_test.go +++ b/pkg/server/sessions/internal/transactions/transactions_test.go @@ -30,7 +30,7 @@ import ( func TestNewTx(t *testing.T) { path := t.TempDir() - db, err := database.NewDB("db1", nil, database.DefaultOption().WithDBRootPath(path), logger.NewSimpleLogger("logger", 
os.Stdout)) + db, err := database.NewDB("db1", nil, database.DefaultOptions().WithDBRootPath(path), logger.NewSimpleLogger("logger", os.Stdout)) require.NoError(t, err) _, err = NewTransaction(context.Background(), nil, db, "session1") diff --git a/pkg/server/truncator_test.go b/pkg/server/truncator_test.go index e977c8cff0..a80b043582 100644 --- a/pkg/server/truncator_test.go +++ b/pkg/server/truncator_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/pkg/api/schema" "github.com/codenotary/immudb/pkg/auth" "github.com/codenotary/immudb/pkg/database" @@ -133,7 +134,9 @@ func TestServerLoadDatabaseWithRetention(t *testing.T) { err = s.CloseDatabases() require.NoError(t, err) - s.dbList = database.NewDatabaseList() + s.dbList = database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + return database.OpenDB(name, s.multidbHandler(), opts, s.Logger) + }, 10, logger.NewMemoryLogger())) s.sysDB = nil t.Run("attempt to load database should pass", func(t *testing.T) { diff --git a/pkg/server/types.go b/pkg/server/types.go index 19640ebcbf..2792b155fe 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -26,7 +26,6 @@ import ( "github.com/codenotary/immudb/pkg/server/sessions" "github.com/codenotary/immudb/pkg/truncator" - "github.com/codenotary/immudb/embedded/cache" "github.com/codenotary/immudb/embedded/remotestorage" pgsqlsrv "github.com/codenotary/immudb/pkg/pgsql/server" "github.com/codenotary/immudb/pkg/replication" @@ -58,8 +57,8 @@ const ( type ImmuServer struct { OS immuos.OS + dbListMutex sync.Mutex dbList database.DatabaseList - dbListMutex sync.Mutex // TODO: convert dbList into a dbManager capable of opening/closing/deleting dbs replicators map[string]*replication.TxReplicator replicationMutex sync.Mutex @@ -86,16 +85,14 @@ type ImmuServer struct { StreamServiceFactory stream.ServiceFactory PgsqlSrv 
pgsqlsrv.PGSQLServer - remoteStorage remotestorage.Storage - indexCacheFunc func() *cache.Cache - SessManager sessions.Manager + remoteStorage remotestorage.Storage + SessManager sessions.Manager } // DefaultServer returns a new ImmuServer instance with all configuration options set to their default values. func DefaultServer() *ImmuServer { - return &ImmuServer{ + s := &ImmuServer{ OS: immuos.NewStandardOS(), - dbList: database.NewDatabaseList(), replicators: make(map[string]*replication.TxReplicator), truncators: make(map[string]*truncator.Truncator), Logger: logger.NewSimpleLogger("immudb ", os.Stderr), @@ -105,6 +102,11 @@ func DefaultServer() *ImmuServer { GrpcServer: grpc.NewServer(), StreamServiceFactory: stream.NewStreamServiceFactory(DefaultOptions().StreamChunkSize), } + + s.dbList = database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + return database.OpenDB(name, s.multidbHandler(), opts, s.Logger) + }, s.Options.MaxActiveDatabases, s.Logger)) + return s } type ImmuServerIf interface { diff --git a/pkg/server/types_test.go b/pkg/server/types_test.go index 1ebc6d6b6b..0db0618435 100644 --- a/pkg/server/types_test.go +++ b/pkg/server/types_test.go @@ -53,7 +53,7 @@ func TestWithStreamServiceFactory(t *testing.T) { func TestWithDbList(t *testing.T) { dir := t.TempDir() - dbList := database.NewDatabaseList() + dbList := database.NewDatabaseList(nil) s := DefaultServer() s.WithOptions(DefaultOptions().WithDir(dir)) diff --git a/pkg/server/user_test.go b/pkg/server/user_test.go index 710b3bf0f1..e189b7d3b6 100644 --- a/pkg/server/user_test.go +++ b/pkg/server/user_test.go @@ -21,6 +21,7 @@ import ( "fmt" "testing" + "github.com/codenotary/immudb/embedded/logger" "github.com/codenotary/immudb/embedded/sql" "github.com/codenotary/immudb/pkg/api/schema" "github.com/codenotary/immudb/pkg/auth" @@ -136,7 +137,9 @@ func TestServerListUsersAdmin(t *testing.T) { err = s.CloseDatabases() require.NoError(t, 
err) - s.dbList = database.NewDatabaseList() + s.dbList = database.NewDatabaseList(database.NewDBManager(func(name string, opts *database.Options) (database.DB, error) { + return database.OpenDB(name, s.multidbHandler(), opts, s.Logger) + }, 10, logger.NewMemoryLogger())) s.sysDB = nil err = s.loadSystemDatabase(s.Options.Dir, nil, auth.SysAdminPassword, false) diff --git a/pkg/truncator/truncator.go b/pkg/truncator/truncator.go index 192a254139..7c613d0db8 100644 --- a/pkg/truncator/truncator.go +++ b/pkg/truncator/truncator.go @@ -60,7 +60,7 @@ func NewTruncator( return &Truncator{ db: db, logger: logger, - truncators: []database.Truncator{database.NewVlogTruncator(db)}, + truncators: []database.Truncator{database.NewVlogTruncator(db, logger)}, donech: make(chan struct{}), stopch: make(chan struct{}), retentionPeriod: retentionPeriod, @@ -169,7 +169,7 @@ func (t *Truncator) Truncate(ctx context.Context, retentionPeriod time.Duration) // Truncate discards the appendable log upto the offset // specified in the transaction hdr - err = c.TruncateUptoTx(ctx, hdr.ID) + err = c.TruncateUptoTx(ctx, hdr.Id) if err != nil { return err } diff --git a/pkg/truncator/truncator_test.go b/pkg/truncator/truncator_test.go index 87479ed4b2..fe6b362877 100644 --- a/pkg/truncator/truncator_test.go +++ b/pkg/truncator/truncator_test.go @@ -46,7 +46,7 @@ func makeDbWith(t *testing.T, dbName string, opts *database.Options) database.DB } func TestDatabase_truncate_with_duration(t *testing.T) { - options := database.DefaultOption().WithDBRootPath(t.TempDir()) + options := database.DefaultOptions().WithDBRootPath(t.TempDir()) so := options.GetStoreOptions() @@ -75,7 +75,7 @@ func TestDatabase_truncate_with_duration(t *testing.T) { } } - c := database.NewVlogTruncator(db) + c := database.NewVlogTruncator(db, logger.NewMemoryLogger()) _, err := c.Plan(ctx, getTruncationTime(queryTime, time.Duration(1*time.Hour))) require.ErrorIs(t, err, database.ErrRetentionPeriodNotReached) @@ -84,7 +84,7 
@@ func TestDatabase_truncate_with_duration(t *testing.T) { require.NoError(t, err) require.LessOrEqual(t, time.Unix(hdr.Ts, 0), queryTime) - err = c.TruncateUptoTx(ctx, hdr.ID) + err = c.TruncateUptoTx(ctx, hdr.Id) require.NoError(t, err) // TODO: hard to determine the actual transaction up to which the database was truncated. @@ -98,7 +98,7 @@ func TestDatabase_truncate_with_duration(t *testing.T) { require.Error(t, err) } - for i := hdr.ID; i <= 20; i++ { + for i := hdr.Id; i <= 20; i++ { kv := &schema.KeyValue{ Key: []byte(fmt.Sprintf("key_%d", i)), Value: []byte(fmt.Sprintf("val_%d", i)), @@ -113,7 +113,7 @@ func TestDatabase_truncate_with_duration(t *testing.T) { } func TestTruncator(t *testing.T) { - options := database.DefaultOption().WithDBRootPath(t.TempDir()) + options := database.DefaultOptions().WithDBRootPath(t.TempDir()) so := options.GetStoreOptions(). WithEmbeddedValues(false) @@ -139,7 +139,7 @@ func TestTruncator(t *testing.T) { } func TestTruncator_with_truncation_frequency(t *testing.T) { - options := database.DefaultOption().WithDBRootPath(t.TempDir()) + options := database.DefaultOptions().WithDBRootPath(t.TempDir()) so := options.GetStoreOptions(). WithEmbeddedValues(false) @@ -202,7 +202,7 @@ func Test_getTruncationTime(t *testing.T) { } func TestTruncator_with_retention_period(t *testing.T) { - options := database.DefaultOption().WithDBRootPath(t.TempDir()) + options := database.DefaultOptions().WithDBRootPath(t.TempDir()) so := options.GetStoreOptions(). 
WithEmbeddedValues(false) @@ -229,7 +229,7 @@ type mockTruncator struct { err error } -func (m *mockTruncator) Plan(context.Context, time.Time) (*store.TxHeader, error) { +func (m *mockTruncator) Plan(context.Context, time.Time) (*schema.TxHeader, error) { return nil, m.err } @@ -240,7 +240,7 @@ func (m *mockTruncator) TruncateUptoTx(context.Context, uint64) error { } func TestTruncator_with_nothing_to_truncate(t *testing.T) { - options := database.DefaultOption().WithDBRootPath(t.TempDir()) + options := database.DefaultOptions().WithDBRootPath(t.TempDir()) so := options.GetStoreOptions(). WithEmbeddedValues(false)