Skip to content

Commit

Permalink
Table store (#774)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley authored Oct 15, 2024
1 parent 413dde7 commit 841e0db
Show file tree
Hide file tree
Showing 15 changed files with 2,292 additions and 124 deletions.
14 changes: 14 additions & 0 deletions common/kvstore/Batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kvstore

// Batch is a collection of key / value pairs that can be written atomically to a database.
type Batch[K any] interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key K, value []byte)
// Delete removes the key from the batch.
Delete(key K)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Size() uint32
}
36 changes: 36 additions & 0 deletions common/kvstore/leveldb/leveldb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func NewStore(logger logging.Logger, path string) (kvstore.Store, error) {

// Put stores a data in the store.
func (store *levelDBStore) Put(key []byte, value []byte) error {
if value == nil {
value = []byte{}
}
return store.db.Put(key, value, nil)
}

Expand Down Expand Up @@ -84,6 +87,39 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error {
return store.db.Write(batch, nil)
}

// NewBatch creates a new batch for the store.
func (store *levelDBStore) NewBatch() kvstore.StoreBatch {
return &levelDBBatch{
store: store,
batch: new(leveldb.Batch),
}
}

type levelDBBatch struct {
store *levelDBStore
batch *leveldb.Batch
}

func (m *levelDBBatch) Put(key []byte, value []byte) {
if value == nil {
value = []byte{}
}
m.batch.Put(key, value)
}

func (m *levelDBBatch) Delete(key []byte) {
m.batch.Delete(key)
}

func (m *levelDBBatch) Apply() error {
return m.store.db.Write(m.batch, nil)
}

// Size returns the number of operations in the batch.
func (m *levelDBBatch) Size() uint32 {
return uint32(m.batch.Len())
}

// Shutdown shuts down the store.
//
// Warning: it is not thread safe to call this method concurrently with other methods on this class,
Expand Down
58 changes: 58 additions & 0 deletions common/kvstore/mapstore/map_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func NewStore() kvstore.Store {

// Put adds a key-value pair to the store.
func (store *mapStore) Put(key []byte, value []byte) error {
if value == nil {
value = []byte{}
}

store.lock.Lock()
defer store.lock.Unlock()

Expand Down Expand Up @@ -74,6 +78,60 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error {
return nil
}

// NewBatch creates a new batch for the store.
func (store *mapStore) NewBatch() kvstore.StoreBatch {
return &batch{
store: store,
keys: make([][]byte, 0),
values: make([][]byte, 0),
}
}

// batch is a collection of operations that can be applied atomically to a mapStore.
type batch struct {
store *mapStore
keys [][]byte
values [][]byte
}

// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
func (m *batch) Put(key []byte, value []byte) {
if value == nil {
value = []byte{}
}
m.keys = append(m.keys, key)
m.values = append(m.values, value)
}

// Delete removes the key from the batch. Does not return an error if the key does not exist.
func (m *batch) Delete(key []byte) {
m.keys = append(m.keys, key)
m.values = append(m.values, nil)
}

// Apply atomically writes & deletes all the key / value pairs in the batch to the database.
func (m *batch) Apply() error {
for i, key := range m.keys {
if m.values[i] == nil {
err := m.store.Delete(key)
if err != nil {
return err
}
} else {
err := m.store.Put(key, m.values[i])
if err != nil {
return err
}
}
}
return nil
}

// Size returns the number of operations in the batch.
func (m *batch) Size() uint32 {
return uint32(len(m.keys))
}

type mapIterator struct {
keys []string
values map[string][]byte
Expand Down
18 changes: 9 additions & 9 deletions common/kvstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ import (
"github.com/syndtr/goleveldb/leveldb/iterator"
)

// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")

// StoreBatch is a collection of operations that can be applied atomically to a Store.
type StoreBatch Batch[[]byte]

// Store implements a key-value store. May be backed by a database like LevelDB.
//
// Implementations of this interface are expected to be thread-safe.
type Store interface {

// Put stores the given key / value pair in the database, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key []byte, value []byte) error

// Get retrieves the value for the given key. Returns a ErrNotFound error if the key does not exist.
Expand All @@ -20,12 +27,8 @@ type Store interface {
// Delete removes the key from the database. Does not return an error if the key does not exist.
Delete(key []byte) error

// DeleteBatch atomically removes a list of keys from the database.
DeleteBatch(keys [][]byte) error

// WriteBatch atomically writes a list of key / value pairs to the database. The key at index i in the keys slice
// corresponds to the value at index i in the values slice.
WriteBatch(keys, values [][]byte) error
// NewBatch creates a new batch that can be used to perform multiple operations atomically.
NewBatch() StoreBatch

// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database.
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done.
Expand All @@ -45,6 +48,3 @@ type Store interface {
// or while there exist unclosed iterators.
Destroy() error
}

// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")
55 changes: 55 additions & 0 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kvstore

import "errors"

// ErrTableLimitExceeded is returned when the maximum number of tables has been reached.
var ErrTableLimitExceeded = errors.New("table limit exceeded")

// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
// Store permits access to the table as if it were a store.
Store

// Name returns the name of the table.
Name() string

// TableKey creates a new key scoped to this table that can be used for batch operations that modify this table.
TableKey(key []byte) TableKey
}

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// TableBatch is a collection of operations that can be applied atomically to a TableStore.
type TableBatch Batch[TableKey]

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
// and keys within a particular table can be iterated over efficiently.
//
// A TableStore is only required to support a maximum of 2^32-X unique, where X is a small integer number of tables
// reserved for internal use by the table store. The exact value of X is implementation dependent.
//
// Implementations of this interface are expected to be thread-safe, except where noted.
type TableStore interface {

// GetTable gets the table with the given name. If the table does not exist, it is first created.
// Returns ErrTableNotFound if the table does not exist and cannot be created.
GetTable(name string) (Table, error)

// GetTables returns a list of all tables in the store in no particular order.
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() TableBatch

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error

// Destroy shuts down and permanently deletes all data in the store.
Destroy() error
}
74 changes: 74 additions & 0 deletions common/kvstore/tablestore/table_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package tablestore

import (
"github.com/Layr-Labs/eigenda/common/kvstore"
"github.com/Layr-Labs/eigensdk-go/logging"
)

var _ kvstore.TableStore = &tableStore{}

// tableStore is an implementation of TableStore that wraps a Store.
type tableStore struct {
logger logging.Logger

// A base store implementation that this TableStore wraps.
base kvstore.Store

// A map from table names to tables.
tableMap map[string]kvstore.Table
}

// wrapper wraps the given Store to create a TableStore.
//
// WARNING: it is not safe to access the wrapped store directly while the TableStore is in use. The TableStore uses
// special key formatting, and direct access to the wrapped store may violate the TableStore's invariants, resulting
// in undefined behavior.
func newTableStore(
logger logging.Logger,
base kvstore.Store,
tables map[string]kvstore.Table) kvstore.TableStore {

return &tableStore{
logger: logger,
base: base,
tableMap: tables,
}
}

// GetTable gets the table with the given name. If the table does not exist, it is first created.
func (t *tableStore) GetTable(name string) (kvstore.Table, error) {
table, ok := t.tableMap[name]
if !ok {
return nil, kvstore.ErrTableNotFound
}

return table, nil
}

// GetTables returns a list of all tables in the store in no particular order.
func (t *tableStore) GetTables() []kvstore.Table {
tables := make([]kvstore.Table, 0, len(t.tableMap))
for _, table := range t.tableMap {
tables = append(tables, table)
}

return tables
}

// NewBatch creates a new batch for writing to the store.
func (t *tableStore) NewBatch() kvstore.TableBatch {
return &tableStoreBatch{
store: t,
batch: t.base.NewBatch(),
}
}

// Shutdown shuts down the store, flushing any remaining cached data to disk.
func (t *tableStore) Shutdown() error {
return t.base.Shutdown()
}

// Destroy shuts down and permanently deletes all data in the store.
func (t *tableStore) Destroy() error {
return t.base.Destroy()
}
32 changes: 32 additions & 0 deletions common/kvstore/tablestore/table_store_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package tablestore

import "github.com/Layr-Labs/eigenda/common/kvstore"

// tableStoreBatch is a batch for writing to a table store.
type tableStoreBatch struct {
store *tableStore
batch kvstore.StoreBatch
}

// Put adds a key-value pair to the batch.
func (t *tableStoreBatch) Put(key kvstore.TableKey, value []byte) {
if value == nil {
value = []byte{}
}
t.batch.Put(key, value)
}

// Delete removes a key-value pair from the batch.
func (t *tableStoreBatch) Delete(key kvstore.TableKey) {
t.batch.Delete(key)
}

// Apply applies the batch to the store.
func (t *tableStoreBatch) Apply() error {
return t.batch.Apply()
}

// Size returns the number of operations in the batch.
func (t *tableStoreBatch) Size() uint32 {
return t.batch.Size()
}
Loading

0 comments on commit 841e0db

Please sign in to comment.