Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(embedded/database): implement database manager #2023

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/immudb/command/immudbcmdtest/immuServerMock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestImmuServerMock(t *testing.T) {
mock.WithStreamServiceFactory(ssf)
require.Same(t, ssf, mock.Ssf)

list := database.NewDatabaseList()
list := database.NewDatabaseList(nil)
mock.WithDbList(list)
require.Same(t, list, mock.DbList)

Expand Down
3 changes: 2 additions & 1 deletion cmd/immudb/command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")
cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication")
cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication")
cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache")
cmd.Flags().Int("max-active-databases", options.MaxActiveDatabases, "the maximum number of databases that can be active simultaneously")

cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)")
cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid")
Expand Down Expand Up @@ -162,6 +162,7 @@ func setupDefaults(options *server.Options) {
viper.SetDefault("max-sessions", 100)
viper.SetDefault("max-session-inactivity-time", 3*time.Minute)
viper.SetDefault("max-session-age-time", 0)
viper.SetDefault("max-active-databases", options.MaxActiveDatabases)
viper.SetDefault("session-timeout", 2*time.Minute)
viper.SetDefault("sessions-guard-check-interval", 1*time.Minute)
viper.SetDefault("logformat", logger.LogFormatText)
Expand Down
5 changes: 4 additions & 1 deletion cmd/immudb/command/parse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ func parseOptions() (options *server.Options, err error) {
swaggerUIEnabled := viper.GetBool("swaggerui")
logRequestMetadata := viper.GetBool("log-request-metadata")

maxActiveDatabases := viper.GetInt("max-active-databases")

s3Storage := viper.GetBool("s3-storage")
s3RoleEnabled := viper.GetBool("s3-role-enabled")
s3Role := viper.GetString("s3-role")
Expand Down Expand Up @@ -165,7 +167,8 @@ func parseOptions() (options *server.Options, err error) {
WithLogFormat(logFormat).
WithSwaggerUIEnabled(swaggerUIEnabled).
WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled).
WithLogRequestMetadata(logRequestMetadata)
WithLogRequestMetadata(logRequestMetadata).
WithMaxActiveDatabases(maxActiveDatabases)

return options, nil
}
40 changes: 37 additions & 3 deletions embedded/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ var (
ErrIllegalArguments = errors.New("illegal arguments")
ErrKeyNotFound = errors.New("key not found")
ErrIllegalState = errors.New("illegal state")
ErrCannotEvictItem = errors.New("cannot find an item to evict")
)

type EvictFilterFunc func(key interface{}, value interface{}) bool
type EvictCallbackFunc func(key, value interface{})

// Cache implements the SIEVE cache replacement policy.
type Cache struct {
data map[interface{}]*entry
Expand All @@ -40,6 +44,9 @@ type Cache struct {
maxWeight int

mutex sync.RWMutex

canEvict EvictFilterFunc
onEvict EvictCallbackFunc
}

type entry struct {
Expand All @@ -59,15 +66,34 @@ func NewCache(maxWeight int) (*Cache, error) {
list: list.New(),
weight: 0,
maxWeight: maxWeight,
onEvict: nil,
canEvict: nil,
}, nil
}

func (c *Cache) SetCanEvict(canEvict EvictFilterFunc) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.canEvict = canEvict
}

func (c *Cache) SetOnEvict(onEvict EvictCallbackFunc) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.onEvict = onEvict
}

func (c *Cache) Resize(newWeight int) {
c.mutex.Lock()
defer c.mutex.Unlock()

for c.weight > newWeight {
_, entry, _ := c.evict()
key, entry, _ := c.evict()
if c.onEvict != nil {
c.onEvict(key, entry.value)
}
c.weight -= entry.weight
}

Expand Down Expand Up @@ -133,6 +159,9 @@ func (c *Cache) evictWhileFull(weight int) (interface{}, interface{}, error) {
rkey = evictedKey
rvalue = entry.value

if c.onEvict != nil {
c.onEvict(rkey, rvalue)
}
c.weight -= entry.weight
}
return rkey, rvalue, nil
Expand All @@ -144,15 +173,15 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
}

curr := c.hand
for {
for i := 0; i < 2*c.list.Len(); i++ {
if curr == nil {
curr = c.list.Back()
}

key := curr.Value

e := c.data[key]
if e.visited == 0 {
if e.visited == 0 && c.shouldEvict(key, e.value) {
c.hand = curr.Prev()

c.list.Remove(curr)
Expand All @@ -164,6 +193,11 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
e.visited = 0
curr = curr.Prev()
}
return nil, nil, ErrCannotEvictItem
}

func (c *Cache) shouldEvict(key, value interface{}) bool {
return c.canEvict == nil || c.canEvict(key, value)
}

func (c *Cache) Get(key interface{}) (interface{}, error) {
Expand Down
56 changes: 56 additions & 0 deletions embedded/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,5 +394,61 @@ func TestPutWeighted(t *testing.T) {
require.Equal(t, 4, cache.Weight())
require.Equal(t, 1, cache.EntriesCount())
})
}

func TestOnEvict(t *testing.T) {
cache, err := NewCache(5)
require.NoError(t, err)

var onEvictCalled int
cache.SetOnEvict(func(_, value interface{}) {
onEvictCalled++
})

for i := 0; i < 5; i++ {
cache.Put(i, i)
}
require.Zero(t, onEvictCalled)

_, _, err = cache.PutWeighted(6, 6, 3)
require.NoError(t, err)

require.Equal(t, onEvictCalled, 3)

_, _, err = cache.PutWeighted(7, 7, 2)
require.NoError(t, err)
require.Equal(t, onEvictCalled, 5)

cache.Resize(0)
require.Equal(t, onEvictCalled, 7)
}

func TestCanEvict(t *testing.T) {
cache, err := NewCache(5)
require.NoError(t, err)

for i := 0; i < 5; i++ {
_, _, err := cache.Put(i, i)
require.NoError(t, err)
}

t.Run("cannot evict any item", func(t *testing.T) {
cache.SetCanEvict(func(_, _ interface{}) bool {
return false
})

_, _, err := cache.Put(6, 6)
require.ErrorIs(t, err, ErrCannotEvictItem)
})

t.Run("cannot evict any item", func(t *testing.T) {
keyToEvict := rand.Intn(5)
cache.SetCanEvict(func(key, _ interface{}) bool {
return key == keyToEvict
})

evictedKey, _, err := cache.Put(6, 6)
require.NoError(t, err)
require.Equal(t, evictedKey, keyToEvict)
})
}
84 changes: 42 additions & 42 deletions embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ type ImmuStore struct {
commitWHub *watchers.WatchersHub

indexers map[[sha256.Size]byte]*indexer
nextIndexerID uint32
nextIndexerID uint64
indexCache *cache.Cache

memSemaphore *semaphore.Semaphore // used by indexers to control amount acquired of memory
Expand Down Expand Up @@ -720,47 +720,52 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
}

if store.synced {
go func() {
for {
committedTxID := store.LastCommittedTxID()
go store.syncer()
}

// passive wait for one new transaction at least
store.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
return store, nil
}

// TODO: waiting on earlier stages of transaction processing may also be possible
prevLatestPrecommitedTx := committedTxID + 1
func (s *ImmuStore) syncer() {
for {
committedTxID := s.LastCommittedTxID()

// TODO: parametrize concurrency evaluation
for i := 0; i < 4; i++ {
// give some time for more transactions to be precommitted
time.Sleep(store.syncFrequency / 4)
// passive wait for one new transaction at least
err := s.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
if errors.Is(err, watchers.ErrAlreadyClosed) {
return
}

latestPrecommitedTx := store.LastPrecommittedTxID()
// TODO: waiting on earlier stages of transaction processing may also be possible
prevLatestPrecommitedTx := committedTxID + 1

if prevLatestPrecommitedTx == latestPrecommitedTx {
// avoid waiting if there are no new transactions
break
}
// TODO: parametrize concurrency evaluation
for i := 0; i < 4; i++ {
// give some time for more transactions to be precommitted
time.Sleep(s.syncFrequency / 4)

prevLatestPrecommitedTx = latestPrecommitedTx
}
latestPrecommitedTx := s.LastPrecommittedTxID()

// ensure durability
err := store.sync()
if errors.Is(err, ErrAlreadyClosed) ||
errors.Is(err, multiapp.ErrAlreadyClosed) ||
errors.Is(err, singleapp.ErrAlreadyClosed) ||
errors.Is(err, watchers.ErrAlreadyClosed) {
return
}
if err != nil {
store.notify(Error, true, "%s: while syncing transactions", err)
}
if prevLatestPrecommitedTx == latestPrecommitedTx {
// avoid waiting if there are no new transactions
break
}
}()
}

return store, nil
prevLatestPrecommitedTx = latestPrecommitedTx
}

// ensure durability
err = s.sync()
if errors.Is(err, ErrAlreadyClosed) ||
errors.Is(err, multiapp.ErrAlreadyClosed) ||
errors.Is(err, singleapp.ErrAlreadyClosed) ||
errors.Is(err, watchers.ErrAlreadyClosed) {
return
}
if err != nil {
s.notify(Error, true, "%s: while syncing transactions", err)
}
}
}

type NotificationType = int
Expand Down Expand Up @@ -855,15 +860,11 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error {
}

if s.indexCache == nil {
if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil {
s.indexCache = indexFactoryFunc()
} else {
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
if err != nil {
return err
}
s.indexCache = c
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
if err != nil {
return err
}
s.indexCache = c
}

indexer, err := newIndexer(indexPath, s, s.opts)
Expand Down Expand Up @@ -3292,7 +3293,6 @@ func (s *ImmuStore) Sync() error {
if s.closed {
return ErrAlreadyClosed
}

return s.sync()
}

Expand Down
2 changes: 1 addition & 1 deletion embedded/store/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments)
}

id := atomic.AddUint32(&store.nextIndexerID, 1)
id := atomic.AddUint64(&store.nextIndexerID, 1)
if id-1 > math.MaxUint16 {
return nil, ErrMaxIndexersLimitExceeded
}
Expand Down
8 changes: 0 additions & 8 deletions embedded/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,6 @@ type IndexOptions struct {

// Maximum time waiting for more transactions to be committed and included into the same bulk
BulkPreparationTimeout time.Duration

// CacheFactory function
CacheFactory IndexCacheFactoryFunc
}

type AHTOptions struct {
Expand Down Expand Up @@ -710,11 +707,6 @@ func (opts *IndexOptions) WithMaxGlobalBufferedDataSize(size int) *IndexOptions
return opts
}

func (opts *IndexOptions) WithCacheFactoryFunc(indexCacheFactory IndexCacheFactoryFunc) *IndexOptions {
opts.CacheFactory = indexCacheFactory
return opts
}

// AHTOptions

func (opts *AHTOptions) WithWriteBufferSize(writeBufferSize int) *AHTOptions {
Expand Down
Loading
Loading