Skip to content

Commit

Permalink
chore(pkg/database): context propagation
Browse files Browse the repository at this point in the history
Signed-off-by: Jeronimo Irazabal <jeronimo.irazabal@gmail.com>
  • Loading branch information
jeroiraz committed Sep 6, 2023
1 parent 17b6ec0 commit 7e9f28a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 29 deletions.
13 changes: 12 additions & 1 deletion embedded/document/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,18 @@ func (e *Engine) getDocument(key []byte, valRef store.ValueRef) (docAtRevision *
}

func (e *Engine) getEncodedDocument(ctx context.Context, key []byte, atTx uint64) (encDoc *EncodedDocument, err error) {
valRef, err := e.sqlEngine.GetStore().GetBetween(ctx, key, atTx, atTx)
err = e.sqlEngine.GetStore().WaitForIndexingUpto(ctx, atTx)
if err != nil {
return nil, err
}

var valRef store.ValueRef

if atTx == 0 {
valRef, err = e.sqlEngine.GetStore().Get(ctx, key)
} else {
valRef, err = e.sqlEngine.GetStore().GetBetween(ctx, key, atTx, atTx)
}
if errors.Is(err, store.ErrKeyNotFound) {
return nil, ErrDocumentNotFound
}
Expand Down
5 changes: 0 additions & 5 deletions embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,11 +915,6 @@ func (s *ImmuStore) GetBetween(ctx context.Context, key []byte, initialTxID uint
return nil, err
}

err = indexer.WaitForIndexingUpto(ctx, finalTxID)
if err != nil {
return nil, err
}

indexedVal, tx, hc, err := indexer.GetBetween(key, initialTxID, finalTxID)
if err != nil {
return nil, err
Expand Down
49 changes: 26 additions & 23 deletions embedded/store/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type indexer struct {
closed bool

compactionMutex sync.Mutex
mutex sync.Mutex
rwmutex sync.RWMutex

metricsLastCommittedTrx prometheus.Gauge
metricsLastIndexedTrx prometheus.Gauge
Expand Down Expand Up @@ -166,8 +166,8 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
}

func (idx *indexer) init(spec *IndexSpec) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.Lock()
defer idx.rwmutex.Unlock()

idx.spec = spec

Expand All @@ -179,19 +179,22 @@ func (idx *indexer) Prefix() []byte {
}

func (idx *indexer) Ts() uint64 {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

return idx.index.Ts()
}

func (idx *indexer) SyncSnapshot() (*tbtree.Snapshot, error) {
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

return idx.index.SyncSnapshot()
}

func (idx *indexer) Get(key []byte) (value []byte, tx uint64, hc uint64, err error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, 0, 0, ErrAlreadyClosed
Expand All @@ -201,8 +204,8 @@ func (idx *indexer) Get(key []byte) (value []byte, tx uint64, hc uint64, err err
}

func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64) (value []byte, tx uint64, hc uint64, err error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, 0, 0, ErrAlreadyClosed
Expand All @@ -212,8 +215,8 @@ func (idx *indexer) GetBetween(key []byte, initialTxID uint64, finalTxID uint64)
}

func (idx *indexer) History(key []byte, offset uint64, descOrder bool, limit int) (timedValues []tbtree.TimedValue, hCount uint64, err error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, 0, ErrAlreadyClosed
Expand All @@ -226,8 +229,8 @@ func (idx *indexer) Snapshot() (*tbtree.Snapshot, error) {
idx.compactionMutex.Lock()
defer idx.compactionMutex.Unlock()

idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, ErrAlreadyClosed
Expand All @@ -240,8 +243,8 @@ func (idx *indexer) SnapshotMustIncludeTxIDWithRenewalPeriod(ctx context.Context
idx.compactionMutex.Lock()
defer idx.compactionMutex.Unlock()

idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, ErrAlreadyClosed
Expand All @@ -256,8 +259,8 @@ func (idx *indexer) SnapshotMustIncludeTxIDWithRenewalPeriod(ctx context.Context
}

func (idx *indexer) GetWithPrefix(prefix []byte, neq []byte) (key []byte, value []byte, tx uint64, hc uint64, err error) {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return nil, nil, 0, 0, ErrAlreadyClosed
Expand All @@ -267,8 +270,8 @@ func (idx *indexer) GetWithPrefix(prefix []byte, neq []byte) (key []byte, value
}

func (idx *indexer) Sync() error {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return ErrAlreadyClosed
Expand All @@ -281,8 +284,8 @@ func (idx *indexer) Close() error {
idx.compactionMutex.Lock()
defer idx.compactionMutex.Unlock()

idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.RLock()
defer idx.rwmutex.RUnlock()

if idx.closed {
return ErrAlreadyClosed
Expand Down Expand Up @@ -371,8 +374,8 @@ func (idx *indexer) resume() {
}

func (idx *indexer) restartIndex() error {
idx.mutex.Lock()
defer idx.mutex.Unlock()
idx.rwmutex.Lock()
defer idx.rwmutex.Unlock()

if idx.closed {
return ErrAlreadyClosed
Expand Down

0 comments on commit 7e9f28a

Please sign in to comment.