Skip to content

Commit

Permalink
rocksdb: sync writes and additional checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Dec 13, 2023
1 parent 3e0d377 commit a8bd21a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
4 changes: 2 additions & 2 deletions go/storage/mkvs/db/rocksdb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ func (ba *rocksdbBatch) Commit(root node.Root) error {

// Flush node updates.
if ba.multipartNodes != nil {
if err = ba.db.db.Write(defaultWriteOptions, ba.multipartNodes); err != nil {
if err = ba.db.db.Write(ba.db.defaultWriteOptions, ba.multipartNodes); err != nil {
return fmt.Errorf("mkvs/rocksdb: failed to flush node log batch: %w", err)
}
}
if err = ba.db.db.Write(defaultWriteOptions, ba.bat); err != nil {
if err = ba.db.db.Write(ba.db.defaultWriteOptions, ba.bat); err != nil {
return fmt.Errorf("mkvs/rocksdb: failed to flush batch: %w", err)
}

Expand Down
8 changes: 4 additions & 4 deletions go/storage/mkvs/db/rocksdb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,16 @@ func (m *metadata) getMultipartVersion() uint64 {
return m.value.MultipartVersion
}

func (m *metadata) setMultipartVersion(db *grocksdb.DB, version uint64) error {
func (m *metadata) setMultipartVersion(db *grocksdb.DB, wo *grocksdb.WriteOptions, version uint64) error {
m.Lock()
defer m.Unlock()

m.value.MultipartVersion = version
return m.save(db)
return m.save(db, wo)
}

func (m *metadata) save(db *grocksdb.DB) error {
return db.Put(defaultWriteOptions, metadataKeyFmt.Encode(), cbor.Marshal(m.value))
func (m *metadata) save(db *grocksdb.DB, wo *grocksdb.WriteOptions) error {
return db.Put(wo, metadataKeyFmt.Encode(), cbor.Marshal(m.value))
}

// TODO: Collaps with save.
Expand Down
29 changes: 18 additions & 11 deletions go/storage/mkvs/db/rocksdb/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ var (
)

var (
defaultWriteOptions = grocksdb.NewDefaultWriteOptions()
defaultReadOptions = grocksdb.NewDefaultReadOptions()
defaultFlushOptions = grocksdb.NewDefaultFlushOptions()
)
Expand All @@ -109,6 +108,7 @@ func newOptions(versioned bool, maxCacheSize int64) *grocksdb.Options {

// TODO: Consider separate options for state vs. io.
opts := grocksdb.NewDefaultOptions()
opts.SetParanoidChecks(true)
opts.SetCreateIfMissing(true)
if versioned {
opts.SetComparator(createTimestampComparator())
Expand Down Expand Up @@ -171,11 +171,14 @@ func newOptions(versioned bool, maxCacheSize int64) *grocksdb.Options {
// New creates a new RocksDB-backed node database.
func New(cfg *api.Config) (api.NodeDB, error) {
db := &rocksdbNodeDB{
logger: logging.GetLogger("mkvs/db/rocksdb"),
namespace: cfg.Namespace,
discardWriteLogs: cfg.DiscardWriteLogs,
readOnly: cfg.ReadOnly,
logger: logging.GetLogger("mkvs/db/rocksdb"),
namespace: cfg.Namespace,
discardWriteLogs: cfg.DiscardWriteLogs,
readOnly: cfg.ReadOnly,
defaultWriteOptions: grocksdb.NewDefaultWriteOptions(),
}
// Configure fsync.
db.defaultWriteOptions.SetSync(!cfg.NoFsync)

// Create options for the metadata column family.
// TODO: Consider also tuning some options of the metadata CF (although this is small compared to nodes CFs).
Expand Down Expand Up @@ -270,6 +273,8 @@ type rocksdbNodeDB struct {
cfStateTree *grocksdb.ColumnFamilyHandle
cfIOTree *grocksdb.ColumnFamilyHandle

defaultWriteOptions *grocksdb.WriteOptions

closeOnce sync.Once
}

Expand Down Expand Up @@ -328,7 +333,7 @@ func (d *rocksdbNodeDB) load() error {
// No metadata exists, create some.
d.meta.value.Version = dbVersion
d.meta.value.Namespace = d.namespace
if err = d.meta.save(d.db); err != nil {
if err = d.meta.save(d.db, d.defaultWriteOptions); err != nil {
return err
}

Expand Down Expand Up @@ -763,7 +768,7 @@ func (d *rocksdbNodeDB) Finalize(roots []node.Root) error { // nolint: gocyclo
d.meta.setLastFinalizedVersion(batch, version)

// Commit batch.
if err := d.db.Write(defaultWriteOptions, batch); err != nil {
if err := d.db.Write(d.defaultWriteOptions, batch); err != nil {
return fmt.Errorf("mkvs/rocksdb: failed to commit finalized roots: %w", err)
}

Expand Down Expand Up @@ -875,7 +880,7 @@ func (d *rocksdbNodeDB) Prune(ctx context.Context, version uint64) error {
// Update metadata.
d.meta.setEarliestVersion(batch, version+1)

if err := d.db.Write(defaultWriteOptions, batch); err != nil {
if err := d.db.Write(d.defaultWriteOptions, batch); err != nil {
return fmt.Errorf("mkvs/rocksdb: failed to prune version %d: %w", version, err)
}

Expand Down Expand Up @@ -905,7 +910,7 @@ func (d *rocksdbNodeDB) StartMultipartInsert(version uint64) error {
return nil
}

if err := d.meta.setMultipartVersion(d.db, version); err != nil {
if err := d.meta.setMultipartVersion(d.db, d.defaultWriteOptions, version); err != nil {
return err
}
d.multipartVersion = version
Expand Down Expand Up @@ -976,11 +981,11 @@ func (d *rocksdbNodeDB) cleanMultipartLocked(removeNodes bool) error {

// Apply the batch first. If anything fails, having corrupt
// multipart info in d.meta shouldn't hurt us next run.
if err := d.db.Write(defaultWriteOptions, batch); err != nil {
if err := d.db.Write(d.defaultWriteOptions, batch); err != nil {
return err
}

if err := d.meta.setMultipartVersion(d.db, multipartVersionNone); err != nil {
if err := d.meta.setMultipartVersion(d.db, d.defaultWriteOptions, multipartVersionNone); err != nil {
return err
}

Expand Down Expand Up @@ -1037,11 +1042,13 @@ func (d *rocksdbNodeDB) Sync() error {

func (d *rocksdbNodeDB) Close() {
d.closeOnce.Do(func() {
d.defaultWriteOptions.Destroy()
d.db.Close()
d.cfMetadata = nil
d.cfRoots = nil
d.cfIOTree = nil
d.cfStateTree = nil
d.defaultWriteOptions = nil
d.db = nil
})
}
Expand Down

0 comments on commit a8bd21a

Please sign in to comment.