chore(pkg/database): implement database manager
Databases are now opened and closed on demand, up to the MaxActiveDatabases limit,
to reduce memory consumption when managing a large number of databases.

Signed-off-by: Stefano Scafiti <stefano.scafiti96@gmail.com>
ostafen committed Sep 16, 2024
1 parent 1e78388 commit a8f48b8
Showing 40 changed files with 2,040 additions and 374 deletions.
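
Before the file-by-file diff, here is a minimal, hypothetical sketch of the pattern the commit message describes: databases opened on demand, with the excess closed once the MaxActiveDatabases limit is reached. This is not the actual pkg/database implementation (which, as the diff below shows, builds on the embedded SIEVE cache); every name in it is illustrative.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// db stands in for an immudb database handle; openDB/Close are illustrative.
type db struct {
	name string
}

func openDB(name string) (*db, error) { return &db{name: name}, nil }
func (d *db) Close() error            { return nil }

// manager keeps at most maxActive databases open, opening them on demand
// and closing an evicted one to make room (the real code delegates this
// to the embedded cache via SetOnEvict/SetCanEvict, shown below).
type manager struct {
	mu        sync.Mutex
	maxActive int
	active    map[string]*db
	order     []string // naive FIFO eviction order; immudb uses a SIEVE cache
}

func newManager(maxActive int) *manager {
	return &manager{maxActive: maxActive, active: make(map[string]*db)}
}

func (m *manager) Get(name string) (*db, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if d, ok := m.active[name]; ok {
		return d, nil // already open
	}
	if len(m.active) >= m.maxActive {
		if err := m.evictOne(); err != nil {
			return nil, err
		}
	}
	d, err := openDB(name)
	if err != nil {
		return nil, err
	}
	m.active[name] = d
	m.order = append(m.order, name)
	return d, nil
}

func (m *manager) evictOne() error {
	if len(m.order) == 0 {
		return errors.New("cannot find a database to evict")
	}
	victim := m.order[0]
	m.order = m.order[1:]
	d := m.active[victim]
	delete(m.active, victim)
	return d.Close() // evicted databases are closed to release memory
}

func main() {
	m := newManager(2)
	for _, name := range []string{"db1", "db2", "db3"} {
		if _, err := m.Get(name); err != nil {
			fmt.Println("open", name, "failed:", err)
		}
	}
	fmt.Println("active databases:", len(m.active)) // 2 — db1 was closed
}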
2 changes: 1 addition & 1 deletion cmd/immudb/command/immudbcmdtest/immuServerMock_test.go
@@ -55,7 +55,7 @@ func TestImmuServerMock(t *testing.T) {
mock.WithStreamServiceFactory(ssf)
require.Same(t, ssf, mock.Ssf)

list := database.NewDatabaseList()
list := database.NewDatabaseList(nil)
mock.WithDbList(list)
require.Same(t, list, mock.DbList)

3 changes: 2 additions & 1 deletion cmd/immudb/command/init.go
@@ -44,7 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")
cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication")
cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication")
cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache")
cmd.Flags().Int("max-active-databases", options.MaxActiveDatabases, "the maximum number of databases that can be active simultaneously")

cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)")
cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid")
@@ -162,6 +162,7 @@ func setupDefaults(options *server.Options) {
viper.SetDefault("max-sessions", 100)
viper.SetDefault("max-session-inactivity-time", 3*time.Minute)
viper.SetDefault("max-session-age-time", 0)
viper.SetDefault("max-active-databases", options.MaxActiveDatabases)
viper.SetDefault("session-timeout", 2*time.Minute)
viper.SetDefault("sessions-guard-check-interval", 1*time.Minute)
viper.SetDefault("logformat", logger.LogFormatText)
5 changes: 4 additions & 1 deletion cmd/immudb/command/parse_options.go
@@ -92,6 +92,8 @@ func parseOptions() (options *server.Options, err error) {
swaggerUIEnabled := viper.GetBool("swaggerui")
logRequestMetadata := viper.GetBool("log-request-metadata")

maxActiveDatabases := viper.GetInt("max-active-databases")

s3Storage := viper.GetBool("s3-storage")
s3RoleEnabled := viper.GetBool("s3-role-enabled")
s3Role := viper.GetString("s3-role")
@@ -165,7 +167,8 @@
WithLogFormat(logFormat).
WithSwaggerUIEnabled(swaggerUIEnabled).
WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled).
WithLogRequestMetadata(logRequestMetadata)
WithLogRequestMetadata(logRequestMetadata).
WithMaxActiveDatabases(maxActiveDatabases)

return options, nil
}
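
For context, the new option also has a programmatic setter, WithMaxActiveDatabases, added by this commit. A small usage sketch follows; DefaultOptions and WithDir are assumed from immudb's existing pkg/server API and are not part of this diff.

package main

import (
	"fmt"

	"github.com/codenotary/immudb/pkg/server"
)

func main() {
	// Roughly equivalent to starting immudb with --max-active-databases=100.
	// WithMaxActiveDatabases is introduced by this commit; DefaultOptions
	// and WithDir are assumed from the existing pkg/server API.
	opts := server.DefaultOptions().
		WithDir("./immudb-data").
		WithMaxActiveDatabases(100) // keep at most 100 databases open at once

	fmt.Println("max active databases:", opts.MaxActiveDatabases)
}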
40 changes: 37 additions & 3 deletions embedded/cache/cache.go
@@ -28,8 +28,12 @@ var (
ErrIllegalArguments = errors.New("illegal arguments")
ErrKeyNotFound = errors.New("key not found")
ErrIllegalState = errors.New("illegal state")
ErrCannotEvictItem = errors.New("cannot find an item to evict")
)

type EvictFilterFunc func(key interface{}, value interface{}) bool
type EvictCallbackFunc func(key, value interface{})

// Cache implements the SIEVE cache replacement policy.
type Cache struct {
data map[interface{}]*entry
@@ -40,6 +44,9 @@ type Cache struct {
maxWeight int

mutex sync.RWMutex

canEvict EvictFilterFunc
onEvict EvictCallbackFunc
}

type entry struct {
@@ -59,15 +66,34 @@ func NewCache(maxWeight int) (*Cache, error) {
list: list.New(),
weight: 0,
maxWeight: maxWeight,
onEvict: nil,
canEvict: nil,
}, nil
}

func (c *Cache) SetCanEvict(canEvict EvictFilterFunc) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.canEvict = canEvict
}

func (c *Cache) SetOnEvict(onEvict EvictCallbackFunc) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.onEvict = onEvict
}

func (c *Cache) Resize(newWeight int) {
c.mutex.Lock()
defer c.mutex.Unlock()

for c.weight > newWeight {
_, entry, _ := c.evict()
key, entry, _ := c.evict()
if c.onEvict != nil {
c.onEvict(key, entry.value)
}
c.weight -= entry.weight
}

@@ -133,6 +159,9 @@ func (c *Cache) evictWhileFull(weight int) (interface{}, interface{}, error) {
rkey = evictedKey
rvalue = entry.value

if c.onEvict != nil {
c.onEvict(rkey, rvalue)
}
c.weight -= entry.weight
}
return rkey, rvalue, nil
@@ -144,15 +173,15 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
}

curr := c.hand
for {
for i := 0; i < 2*c.list.Len(); i++ {
if curr == nil {
curr = c.list.Back()
}

key := curr.Value

e := c.data[key]
if e.visited == 0 {
if e.visited == 0 && c.shouldEvict(key, e.value) {
c.hand = curr.Prev()

c.list.Remove(curr)
Expand All @@ -164,6 +193,11 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
e.visited = 0
curr = curr.Prev()
}
return nil, nil, ErrCannotEvictItem
}

func (c *Cache) shouldEvict(key, value interface{}) bool {
return c.canEvict == nil || c.canEvict(key, value)
}

func (c *Cache) Get(key interface{}) (interface{}, error) {
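
These hooks are what let a database manager piggyback on the cache: SetCanEvict can pin entries that must not be dropped (for example, databases with active sessions), and SetOnEvict can release resources for entries that are dropped. A short usage sketch against the embedded/cache API shown above; the in-use set and the string handles are illustrative.

package main

import (
	"fmt"

	"github.com/codenotary/immudb/embedded/cache"
)

func main() {
	// Illustrative "in use" set: entries listed here must never be evicted.
	inUse := map[string]bool{"db1": true}

	c, err := cache.NewCache(2) // room for two weight-1 entries
	if err != nil {
		panic(err)
	}

	// Skip entries that are still in use...
	c.SetCanEvict(func(key, _ interface{}) bool {
		return !inUse[key.(string)]
	})
	// ...and release resources for whichever entry is actually evicted.
	c.SetOnEvict(func(key, _ interface{}) {
		fmt.Println("closing", key)
	})

	c.Put("db1", "handle1")
	c.Put("db2", "handle2")

	// The cache is full: inserting db3 evicts the unpinned entry (db2 here),
	// triggering the onEvict callback. If every entry were pinned, Put would
	// fail with ErrCannotEvictItem instead of scanning forever, thanks to the
	// bounded loop in evict().
	if _, _, err := c.Put("db3", "handle3"); err != nil {
		fmt.Println("eviction failed:", err)
	}
}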
56 changes: 56 additions & 0 deletions embedded/cache/cache_test.go
@@ -394,5 +394,61 @@ func TestPutWeighted(t *testing.T) {
require.Equal(t, 4, cache.Weight())
require.Equal(t, 1, cache.EntriesCount())
})
}

func TestOnEvict(t *testing.T) {
cache, err := NewCache(5)
require.NoError(t, err)

var onEvictCalled int
cache.SetOnEvict(func(_, value interface{}) {
onEvictCalled++
})

for i := 0; i < 5; i++ {
cache.Put(i, i)
}
require.Zero(t, onEvictCalled)

_, _, err = cache.PutWeighted(6, 6, 3)
require.NoError(t, err)

require.Equal(t, onEvictCalled, 3)

_, _, err = cache.PutWeighted(7, 7, 2)
require.NoError(t, err)
require.Equal(t, onEvictCalled, 5)

cache.Resize(0)
require.Equal(t, onEvictCalled, 7)
}

func TestCanEvict(t *testing.T) {
cache, err := NewCache(5)
require.NoError(t, err)

for i := 0; i < 5; i++ {
_, _, err := cache.Put(i, i)
require.NoError(t, err)
}

t.Run("cannot evict any item", func(t *testing.T) {
cache.SetCanEvict(func(_, _ interface{}) bool {
return false
})

_, _, err := cache.Put(6, 6)
require.ErrorIs(t, err, ErrCannotEvictItem)
})

t.Run("cannot evict any item", func(t *testing.T) {
keyToEvict := rand.Intn(5)
cache.SetCanEvict(func(key, _ interface{}) bool {
return key == keyToEvict
})

evictedKey, _, err := cache.Put(6, 6)
require.NoError(t, err)
require.Equal(t, evictedKey, keyToEvict)
})
}
84 changes: 42 additions & 42 deletions embedded/store/immustore.go
@@ -216,7 +216,7 @@ type ImmuStore struct {
commitWHub *watchers.WatchersHub

indexers map[[sha256.Size]byte]*indexer
nextIndexerID uint32
nextIndexerID uint64
indexCache *cache.Cache

memSemaphore *semaphore.Semaphore // used by indexers to limit the amount of acquired memory
@@ -720,47 +720,52 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
}

if store.synced {
go func() {
for {
committedTxID := store.LastCommittedTxID()
go store.syncer()
}

// passively wait for at least one new transaction
store.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
return store, nil
}

// TODO: waiting on earlier stages of transaction processing may also be possible
prevLatestPrecommitedTx := committedTxID + 1
func (s *ImmuStore) syncer() {
for {
committedTxID := s.LastCommittedTxID()

// TODO: parametrize concurrency evaluation
for i := 0; i < 4; i++ {
// give some time for more transactions to be precommitted
time.Sleep(store.syncFrequency / 4)
// passively wait for at least one new transaction
err := s.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
if errors.Is(err, watchers.ErrAlreadyClosed) {
return
}

latestPrecommitedTx := store.LastPrecommittedTxID()
// TODO: waiting on earlier stages of transaction processing may also be possible
prevLatestPrecommitedTx := committedTxID + 1

if prevLatestPrecommitedTx == latestPrecommitedTx {
// avoid waiting if there are no new transactions
break
}
// TODO: parametrize concurrency evaluation
for i := 0; i < 4; i++ {
// give some time for more transactions to be precommitted
time.Sleep(s.syncFrequency / 4)

prevLatestPrecommitedTx = latestPrecommitedTx
}
latestPrecommitedTx := s.LastPrecommittedTxID()

// ensure durability
err := store.sync()
if errors.Is(err, ErrAlreadyClosed) ||
errors.Is(err, multiapp.ErrAlreadyClosed) ||
errors.Is(err, singleapp.ErrAlreadyClosed) ||
errors.Is(err, watchers.ErrAlreadyClosed) {
return
}
if err != nil {
store.notify(Error, true, "%s: while syncing transactions", err)
}
if prevLatestPrecommitedTx == latestPrecommitedTx {
// avoid waiting if there are no new transactions
break
}
}()
}

return store, nil
prevLatestPrecommitedTx = latestPrecommitedTx
}

// ensure durability
err = s.sync()
if errors.Is(err, ErrAlreadyClosed) ||
errors.Is(err, multiapp.ErrAlreadyClosed) ||
errors.Is(err, singleapp.ErrAlreadyClosed) ||
errors.Is(err, watchers.ErrAlreadyClosed) {
return
}
if err != nil {
s.notify(Error, true, "%s: while syncing transactions", err)
}
}
}

type NotificationType = int
@@ -855,15 +860,11 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error {
}

if s.indexCache == nil {
if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil {
s.indexCache = indexFactoryFunc()
} else {
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
if err != nil {
return err
}
s.indexCache = c
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
if err != nil {
return err
}
s.indexCache = c
}

indexer, err := newIndexer(indexPath, s, s.opts)
@@ -3292,7 +3293,6 @@ func (s *ImmuStore) Sync() error {
if s.closed {
return ErrAlreadyClosed
}

return s.sync()
}

2 changes: 1 addition & 1 deletion embedded/store/indexer.go
@@ -106,7 +106,7 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments)
}

id := atomic.AddUint32(&store.nextIndexerID, 1)
id := atomic.AddUint64(&store.nextIndexerID, 1)
if id-1 > math.MaxUint16 {
return nil, ErrMaxIndexersLimitExceeded
}
8 changes: 0 additions & 8 deletions embedded/store/options.go
@@ -206,9 +206,6 @@ type IndexOptions struct {

// Maximum time waiting for more transactions to be committed and included into the same bulk
BulkPreparationTimeout time.Duration

// CacheFactory function
CacheFactory IndexCacheFactoryFunc
}

type AHTOptions struct {
@@ -710,11 +707,6 @@ func (opts *IndexOptions) WithMaxGlobalBufferedDataSize(size int) *IndexOptions
return opts
}

func (opts *IndexOptions) WithCacheFactoryFunc(indexCacheFactory IndexCacheFactoryFunc) *IndexOptions {
opts.CacheFactory = indexCacheFactory
return opts
}

// AHTOptions

func (opts *AHTOptions) WithWriteBufferSize(writeBufferSize int) *AHTOptions {