Skip to content

Commit

Permalink
rocksdb: separate CF for roots
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Dec 5, 2023
1 parent 556022e commit 21f5eff
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 27 deletions.
4 changes: 2 additions & 2 deletions go/storage/mkvs/db/rocksdb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (ba *rocksdbBatch) Commit(root node.Root) error {
return api.ErrAlreadyFinalized
}

rootsMeta, err := loadRootsMetadata(ba.db.db, root.Version)
rootsMeta, err := loadRootsMetadata(ba.db.db, ba.db.cfRoots, root.Version)
if err != nil {
return err
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (ba *rocksdbBatch) Commit(root node.Root) error {
}

var oldRootsMeta *rootsMetadata
oldRootsMeta, err = loadRootsMetadata(ba.db.db, ba.oldRoot.Version)
oldRootsMeta, err = loadRootsMetadata(ba.db.db, ba.db.cfRoots, ba.oldRoot.Version)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions go/storage/mkvs/db/rocksdb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,17 @@ type rootsMetadata struct {
// Roots is the map of a root created in a version to any derived roots (in this or later versions).
Roots map[node.TypedHash][]node.TypedHash

rootsCf *grocksdb.ColumnFamilyHandle

// version is the version this metadata is for.
version uint64
}

// loadRootsMetadata loads the roots metadata for the given version from the database.
func loadRootsMetadata(db *grocksdb.DB, version uint64) (*rootsMetadata, error) {
rootsMeta := &rootsMetadata{version: version}
func loadRootsMetadata(db *grocksdb.DB, cf *grocksdb.ColumnFamilyHandle, version uint64) (*rootsMetadata, error) {
rootsMeta := &rootsMetadata{version: version, rootsCf: cf}

s, err := db.Get(defaultReadOptions, rootsMetadataKeyFmt.Encode(version))
s, err := db.GetCF(defaultReadOptions, cf, rootsMetadataKeyFmt.Encode(version))
if err != nil {
return nil, fmt.Errorf("mkvs/rocksdb: failed to get roots metadata from backing store: %w", err)
}
Expand All @@ -152,5 +154,5 @@ func loadRootsMetadata(db *grocksdb.DB, version uint64) (*rootsMetadata, error)

// save saves the roots metadata to the database.
func (rm *rootsMetadata) save(batch *grocksdb.WriteBatch) {
batch.Put(rootsMetadataKeyFmt.Encode(rm.version), cbor.Marshal(rm))
batch.PutCF(rm.rootsCf, rootsMetadataKeyFmt.Encode(rm.version), cbor.Marshal(rm))
}
53 changes: 32 additions & 21 deletions go/storage/mkvs/db/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,42 +29,43 @@ const (
multipartVersionNone uint64 = 0
)

// Metadata CF keys (not timestamped).
// Metadata (default) CF keys (not timestamped).
var (
// rootsMetadataKeyFmt is the key format for roots metadata. The key format is (version).
//
// Value is CBOR-serialized rootsMetadata.
// TODO: The rootsMetadata is one per version, which means it can also get quite large,
// maybe use same db options as for nodes CFs? (minus the timestamps).
rootsMetadataKeyFmt = keyformat.New(0x00, uint64(0))

// rootUpdatedNodesKeyFmt is the key format for the pending updated nodes for the
// given root that need to be removed only in case the given root is not among
// the finalized roots. They key format is (version, root).
//
// Value is CBOR-serialized []updatedNode.
rootUpdatedNodesKeyFmt = keyformat.New(0x01, uint64(0), &node.TypedHash{})
rootUpdatedNodesKeyFmt = keyformat.New(0x00, uint64(0), &node.TypedHash{})

// metadataKeyFmt is the key format for metadata.
//
// Value is CBOR-serialized metadata.
metadataKeyFmt = keyformat.New(0x02)
metadataKeyFmt = keyformat.New(0x01)

// multipartRestoreNodeLogKeyFmt is the key format for the nodes inserted during a chunk restore.
// Once a set of chunks is fully restored, these entries should be removed. If chunk restoration
// is interrupted for any reason, the nodes associated with these keys should be removed, along
// with these entries.
//
// Value is empty.
multipartRestoreNodeLogKeyFmt = keyformat.New(0x03, &node.TypedHash{})
multipartRestoreNodeLogKeyFmt = keyformat.New(0x02, &node.TypedHash{})

// multipartRestoreNodeLogKeyFmt is the key format for the root nodes inserted during a chunk restore.
// Once a set of chunks is fully restored, these entries should be removed. If chunk restoration
// is interrupted for any reason, the nodes associated with these keys should be removed, along
// with these entries.
//
// Value is empty.
multipartRestoreRootLogKeyFmt = keyformat.New(0x04, &node.TypedHash{})
multipartRestoreRootLogKeyFmt = keyformat.New(0x03, &node.TypedHash{})
)

// Roots CF keys (not timestamped).
var (
// rootsMetadataKeyFmt is the key format for roots metadata. The key format is (version).
//
// Value is CBOR-serialized rootsMetadata.
rootsMetadataKeyFmt = keyformat.New(0x00, uint64(0))
)

// Node CF keys (timestamped and used by state and io tree CFs).
Expand Down Expand Up @@ -94,6 +95,7 @@ var (

const (
cfMetadataName = "default"
cfRootsName = "roots"
cfStateTreeName = "state_tree"
cfIOTreeName = "io_tree"
)
Expand Down Expand Up @@ -134,6 +136,7 @@ func New(cfg *api.Config) (api.NodeDB, error) {
optsNodes.SetTargetFileSizeMultiplier(2)

bbto := grocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetFormatVersion(4) // Latest version format, default uses older backwards compatible one.
bbto.SetBlockSize(32 * 1024)
bbto.SetPinL0FilterAndIndexBlocksInCache(true)
// Configure block cache. Recommendation is 1/3 of memory budget.
Expand Down Expand Up @@ -184,13 +187,15 @@ func New(cfg *api.Config) (api.NodeDB, error) {
cfg.DB,
[]string{
cfMetadataName,
cfRootsName,
cfStateTreeName,
cfIOTreeName,
},
[]*grocksdb.Options{
optsMeta,
optsNodes,
optsNodes,
optsNodes,
},
false)
case false:
Expand All @@ -199,22 +204,25 @@ func New(cfg *api.Config) (api.NodeDB, error) {
cfg.DB,
[]string{
cfMetadataName,
cfRootsName,
cfStateTreeName,
cfIOTreeName,
},
[]*grocksdb.Options{
optsMeta,
optsNodes,
optsNodes,
optsNodes,
},
)
}
if err != nil {
return nil, fmt.Errorf("mkvs/rocksdb: failed to open database: %w", err)
}
db.cfMetadata = cfHandles[0] // Also the default handle.
db.cfStateTree = cfHandles[1]
db.cfIOTree = cfHandles[2]
db.cfRoots = cfHandles[1]
db.cfStateTree = cfHandles[2]
db.cfIOTree = cfHandles[3]

// Load database metadata.
if err = db.load(); err != nil {
Expand Down Expand Up @@ -248,6 +256,7 @@ type rocksdbNodeDB struct {

db *grocksdb.DB
cfMetadata *grocksdb.ColumnFamilyHandle
cfRoots *grocksdb.ColumnFamilyHandle
cfStateTree *grocksdb.ColumnFamilyHandle
cfIOTree *grocksdb.ColumnFamilyHandle

Expand Down Expand Up @@ -549,7 +558,7 @@ func (d *rocksdbNodeDB) GetRootsForVersion(version uint64) ([]node.Root, error)
return nil, nil
}

rootsMeta, err := loadRootsMetadata(d.db, version)
rootsMeta, err := loadRootsMetadata(d.db, d.cfRoots, version)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -581,7 +590,7 @@ func (d *rocksdbNodeDB) HasRoot(root node.Root) bool {
return false
}

rootsMeta, err := loadRootsMetadata(d.db, root.Version)
rootsMeta, err := loadRootsMetadata(d.db, d.cfRoots, root.Version)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -623,7 +632,7 @@ func (d *rocksdbNodeDB) Finalize(roots []node.Root) error { // nolint: gocyclo
finalizedRoots[node.TypedHashFromRoot(root)] = true
}
var rootsChanged bool
rootsMeta, err := loadRootsMetadata(d.db, version)
rootsMeta, err := loadRootsMetadata(d.db, d.cfRoots, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -777,7 +786,7 @@ func (d *rocksdbNodeDB) Prune(ctx context.Context, version uint64) error {
return api.ErrNotEarliest
}

rootsMeta, err := loadRootsMetadata(d.db, version)
rootsMeta, err := loadRootsMetadata(d.db, d.cfRoots, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -835,7 +844,7 @@ func (d *rocksdbNodeDB) Prune(ctx context.Context, version uint64) error {
}

// Prune roots metadata.
batch.Delete(rootsMetadataKeyFmt.Encode(version))
batch.DeleteCF(d.cfRoots, rootsMetadataKeyFmt.Encode(version))

// Prune all write logs in version.
if !d.discardWriteLogs {
Expand Down Expand Up @@ -1005,20 +1014,22 @@ func (d *rocksdbNodeDB) NewBatch(oldRoot node.Root, version uint64, chunk bool)

func (d *rocksdbNodeDB) Size() (uint64, error) {
meta := d.db.GetColumnFamilyMetadataCF(d.cfMetadata)
roots := d.db.GetColumnFamilyMetadataCF(d.cfRoots)
io := d.db.GetColumnFamilyMetadataCF(d.cfIOTree)
state := d.db.GetColumnFamilyMetadataCF(d.cfStateTree)

return meta.Size() + io.Size() + state.Size(), nil
return meta.Size() + roots.Size() + io.Size() + state.Size(), nil
}

func (d *rocksdbNodeDB) Sync() error {
return d.db.FlushCFs([]*grocksdb.ColumnFamilyHandle{d.cfMetadata, d.cfIOTree, d.cfStateTree}, defaultFlushOptions)
return d.db.FlushCFs([]*grocksdb.ColumnFamilyHandle{d.cfMetadata, d.cfRoots, d.cfIOTree, d.cfStateTree}, defaultFlushOptions)
}

func (d *rocksdbNodeDB) Close() {
d.closeOnce.Do(func() {
d.db.Close()
d.cfMetadata = nil
d.cfRoots = nil
d.cfIOTree = nil
d.cfStateTree = nil
d.db = nil
Expand Down

0 comments on commit 21f5eff

Please sign in to comment.