From 7e9f28a65647e17d674f6e89c8714af615fafb41 Mon Sep 17 00:00:00 2001 From: Jeronimo Irazabal Date: Wed, 6 Sep 2023 19:31:56 -0300 Subject: [PATCH] chore(pkg/database): context propagation Signed-off-by: Jeronimo Irazabal --- embedded/document/engine.go | 13 +++++++++- embedded/store/immustore.go | 5 ---- embedded/store/indexer.go | 49 ++++++++++++++++++++----------------- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/embedded/document/engine.go b/embedded/document/engine.go index c2a8239f76..da23ff6bb9 100644 --- a/embedded/document/engine.go +++ b/embedded/document/engine.go @@ -1189,7 +1189,18 @@ func (e *Engine) getDocument(key []byte, valRef store.ValueRef) (docAtRevision * } func (e *Engine) getEncodedDocument(ctx context.Context, key []byte, atTx uint64) (encDoc *EncodedDocument, err error) { - valRef, err := e.sqlEngine.GetStore().GetBetween(ctx, key, atTx, atTx) + err = e.sqlEngine.GetStore().WaitForIndexingUpto(ctx, atTx) + if err != nil { + return nil, err + } + + var valRef store.ValueRef + + if atTx == 0 { + valRef, err = e.sqlEngine.GetStore().Get(ctx, key) + } else { + valRef, err = e.sqlEngine.GetStore().GetBetween(ctx, key, atTx, atTx) + } if errors.Is(err, store.ErrKeyNotFound) { return nil, ErrDocumentNotFound } diff --git a/embedded/store/immustore.go b/embedded/store/immustore.go index 269c86dc7d..4f71127488 100644 --- a/embedded/store/immustore.go +++ b/embedded/store/immustore.go @@ -915,11 +915,6 @@ func (s *ImmuStore) GetBetween(ctx context.Context, key []byte, initialTxID uint return nil, err } - err = indexer.WaitForIndexingUpto(ctx, finalTxID) - if err != nil { - return nil, err - } - indexedVal, tx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID) if err != nil { return nil, err diff --git a/embedded/store/indexer.go b/embedded/store/indexer.go index feccd51cb8..e1d89f6df1 100644 --- a/embedded/store/indexer.go +++ b/embedded/store/indexer.go @@ -62,7 +62,7 @@ type indexer struct { closed bool compactionMutex sync.Mutex - mutex sync.Mutex + rwmutex sync.RWMutex metricsLastCommittedTrx prometheus.Gauge metricsLastIndexedTrx prometheus.Gauge @@ -166,8 +166,8 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error) } func (idx *indexer) init(spec *IndexSpec) { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.Lock() + defer idx.rwmutex.Unlock() idx.spec = spec @@ -179,19 +179,22 @@ func (idx *indexer) Prefix() []byte { } func (idx *indexer) Ts() uint64 { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() return idx.index.Ts() } func (idx *indexer) SyncSnapshot() (*tbtree.Snapshot, error) { + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() + return idx.index.SyncSnapshot() } func (idx *indexer) Get(key []byte) (value []byte, tx uint64, hc uint64, err error) { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, 0, 0, ErrAlreadyClosed @@ -201,8 +204,8 @@ func (idx *indexer) Get(key []byte) (value []byte, tx uint64, hc uint64, err err } func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64) (value []byte, tx uint64, hc uint64, err error) { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, 0, 0, ErrAlreadyClosed @@ -212,8 +215,8 @@ func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64) } func (idx *indexer) History(key []byte, offset uint64, descOrder bool, limit int) (timedValues []tbtree.TimedValue, hCount uint64, err error) { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, 0, ErrAlreadyClosed @@ -226,8 +229,8 @@ func (idx *indexer) Snapshot() (*tbtree.Snapshot, error) { idx.compactionMutex.Lock() defer idx.compactionMutex.Unlock() - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, ErrAlreadyClosed @@ -240,8 +243,8 @@ func (idx *indexer) SnapshotMustIncludeTxIDWithRenewalPeriod(ctx context.Context idx.compactionMutex.Lock() defer idx.compactionMutex.Unlock() - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, ErrAlreadyClosed @@ -256,8 +259,8 @@ func (idx *indexer) SnapshotMustIncludeTxIDWithRenewalPeriod(ctx context.Context } func (idx *indexer) GetWithPrefix(prefix []byte, neq []byte) (key []byte, value []byte, tx uint64, hc uint64, err error) { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return nil, nil, 0, 0, ErrAlreadyClosed @@ -267,8 +270,8 @@ func (idx *indexer) GetWithPrefix(prefix []byte, neq []byte) (key []byte, value } func (idx *indexer) Sync() error { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return ErrAlreadyClosed @@ -281,8 +284,8 @@ func (idx *indexer) Close() error { idx.compactionMutex.Lock() defer idx.compactionMutex.Unlock() - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.RLock() + defer idx.rwmutex.RUnlock() if idx.closed { return ErrAlreadyClosed @@ -371,8 +374,8 @@ func (idx *indexer) resume() { } func (idx *indexer) restartIndex() error { - idx.mutex.Lock() - defer idx.mutex.Unlock() + idx.rwmutex.Lock() + defer idx.rwmutex.Unlock() if idx.closed { return ErrAlreadyClosed