Skip to content

Commit

Permalink
db_bench and performence changes
Browse files Browse the repository at this point in the history
  • Loading branch information
RoyBenMoshe committed Aug 7, 2023
1 parent 2ac458d commit 1edf41a
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 37 deletions.
Binary file added core
Binary file not shown.
14 changes: 11 additions & 3 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <cinttypes>
#include <limits>

#include "db/write_controller.h"
#include "logging/logging.h"
#include "monitoring/statistics.h"
#include "options/db_options.h"
Expand All @@ -33,6 +32,7 @@
#include "rocksdb/table_properties.h"
#include "rocksdb/wal_filter.h"
#include "rocksdb/write_buffer_manager.h"
#include "rocksdb/write_controller.h"
#include "table/block_based/block_based_table_factory.h"
#include "util/compression.h"

Expand Down Expand Up @@ -550,17 +550,25 @@ SharedOptions::SharedOptions(size_t total_ram_size_bytes, size_t total_threads,
cache = NewLRUCache(total_ram_size_bytes_);
write_controller.reset(
new WriteController(true /*dynamic_delay*/, delayed_write_rate_));
write_buffer_manager.reset(
new WriteBufferManager(initial_write_buffer_size_, cache));
write_buffer_manager.reset(new WriteBufferManager(
initial_write_buffer_size_, cache, true /*allow_stall*/));
}

void SharedOptions::IncreaseWriteBufferSize(size_t increase_by) {
if (write_buffer_manager->buffer_size() == 1 && increase_by > 1) {
write_buffer_manager->SetBufferSize(increase_by);
if (total_ram_size_bytes_ / 4 > increase_by) {
write_buffer_manager->SetBufferSize(increase_by);
} else if (total_ram_size_bytes_ / 4 != increase_by) {
write_buffer_manager->SetBufferSize(total_ram_size_bytes_ / 4);
}
} else if (total_ram_size_bytes_ / 4 >
write_buffer_manager->buffer_size() + increase_by) {
write_buffer_manager->SetBufferSize(write_buffer_manager->buffer_size() +
increase_by);
} else if (total_ram_size_bytes_ / 4 !=
write_buffer_manager->buffer_size() + increase_by) {
write_buffer_manager->SetBufferSize(total_ram_size_bytes_ / 4);
}
}

Expand Down
2 changes: 1 addition & 1 deletion options/options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include "cache/lru_cache.h"
#include "cache/sharded_cache.h"
#include "db/write_controller.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
Expand All @@ -25,6 +24,7 @@
#include "rocksdb/utilities/leveldb_options.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/write_controller.h"
#include "table/block_based/filter_policy_internal.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
Expand Down
91 changes: 58 additions & 33 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ namespace {
// The benchmark needs to be created before running the first group, retained
// between groups, and destroyed after running the last group
std::unique_ptr<ROCKSDB_NAMESPACE::Benchmark> benchmark;
// // The shared options needs to be created before running the first group,
// retained
// // between groups, and destroyed after running the last group
// std::unique_ptr<ROCKSDB_NAMESPACE::SharedOptions> shared_options;

int ErrorExit(const char* format, ...) {
std::string extended_format = std::string("\nERROR: ") + format + "\n";
Expand Down Expand Up @@ -299,8 +303,6 @@ DEFINE_string(
"\twaitforcompaction - pause until compaction is (probably) done\n"
"\tflush - flush the memtable\n"
"\tstats -- Print DB stats\n"
"\ttable-readers-mem -- Print table readers memory. excluding memory "
"used in block cache\n"
"\tresetstats -- Reset DB stats\n"
"\tlevelstats -- Print the number of files and bytes per level\n"
"\tmemstats -- Print memtable stats\n"
Expand Down Expand Up @@ -1860,11 +1862,9 @@ DEFINE_int64(multiread_stride, 0,
DEFINE_bool(multiread_batched, false, "Use the new MultiGet API");

DEFINE_string(memtablerep, "speedb.HashSpdRepFactory", "");

DEFINE_int64(hash_bucket_count, 1000000, "hash bucket count");
DEFINE_bool(use_seek_parallel_threshold, true,
"if use seek parallel threshold .");

DEFINE_bool(use_seek_parralel_threshold, true,
"if use seek parralel threshold .");
DEFINE_bool(use_plain_table, false,
"if use plain table instead of block-based table format");
DEFINE_bool(use_cuckoo_table, false, "if use cuckoo table format");
Expand Down Expand Up @@ -1949,6 +1949,14 @@ namespace {
std::vector<uint64_t> db_idxs_to_use;
} // namespace

DEFINE_bool(enable_speedb_features, false,
"If true, Speedb features will be enabled "
"You must provide total_ram_size in bytes ,"
" and max_background_jobs. "
"delayed_write_rate is recommended. ");

DEFINE_uint64(total_ram_size, 512 * 1024 * 1024ul,
"SharedOptions total ram size bytes. ");
namespace ROCKSDB_NAMESPACE {
namespace {
static Status CreateMemTableRepFactory(
Expand Down Expand Up @@ -3659,15 +3667,9 @@ class Benchmark {
std::unique_ptr<ExpiredTimeFilter> filter;
while (std::getline(benchmark_stream, name, ',')) {
if (open_options_.write_buffer_manager) {
fprintf(stderr,
"\nWBM's Usage Info [BEFORE Benchmark (%s)]: %s OF %s\n\n",
name.c_str(),
BytesToHumanString(
open_options_.write_buffer_manager->memory_usage())
.c_str(),
BytesToHumanString(
open_options_.write_buffer_manager->buffer_size())
.c_str());
fprintf(stderr, "\nBEFORE Benchmark (%s): %lu OF %lu\n\n", name.c_str(),
open_options_.write_buffer_manager->memory_usage(),
open_options_.write_buffer_manager->buffer_size());
}

// Sanitize parameters
Expand Down Expand Up @@ -3970,9 +3972,6 @@ class Benchmark {
PrintStats("rocksdb.block-cache-entry-stats");
} else if (name == "stats") {
PrintStats("rocksdb.stats");
} else if (name == "table-readers-mem") {
fprintf(stdout, "table-readers-mem");
PrintStats("rocksdb.estimate-table-readers-mem");
} else if (name == "resetstats") {
ResetStats();
} else if (name == "verify") {
Expand Down Expand Up @@ -4129,15 +4128,9 @@ class Benchmark {
}

if (open_options_.write_buffer_manager) {
fprintf(stderr,
"\nWBM's Usage Info [AFTER Benchmark (%s)]: %s OF %s\n\n",
name.c_str(),
BytesToHumanString(
open_options_.write_buffer_manager->memory_usage())
.c_str(),
BytesToHumanString(
open_options_.write_buffer_manager->buffer_size())
.c_str());
fprintf(stderr, "\nAFTER Benchmark (%s): %lu OF %lu\n", name.c_str(),
open_options_.write_buffer_manager->memory_usage(),
open_options_.write_buffer_manager->buffer_size());
}
}

Expand Down Expand Up @@ -5170,6 +5163,11 @@ class Benchmark {

void OpenDb(Options options, const std::string& db_name,
DBWithColumnFamilies* db) {
SharedOptions so(FLAGS_total_ram_size, options.max_background_jobs,
options.delayed_write_rate);
if (FLAGS_enable_speedb_features) {
options.EnableSpeedbFeatures(so);
}
uint64_t open_start = FLAGS_report_open_timing ? FLAGS_env->NowNanos() : 0;
Status s;
// Open with column families if necessary.
Expand All @@ -5183,8 +5181,14 @@ class Benchmark {
}
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < num_hot; i++) {
column_families.push_back(ColumnFamilyDescriptor(
ColumnFamilyName(i), ColumnFamilyOptions(options)));
if (FLAGS_enable_speedb_features) {
column_families.push_back(ColumnFamilyDescriptor(
ColumnFamilyName(i),
*ColumnFamilyOptions(options).EnableSpeedbFeaturesCF(so)));
} else {
column_families.push_back(ColumnFamilyDescriptor(
ColumnFamilyName(i), ColumnFamilyOptions(options)));
}
}
std::vector<int> cfh_idx_to_prob;
if (!FLAGS_column_family_distribution.empty()) {
Expand Down Expand Up @@ -5331,8 +5335,14 @@ class Benchmark {
}
} else if (FLAGS_ttl > 0) {
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
if (FLAGS_enable_speedb_features) {
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName,
*ColumnFamilyOptions(options).EnableSpeedbFeaturesCF(so)));
} else {
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
}
DBWithTTL* db_with_ttl;
std::vector<int32_t> ttls(column_families.size(), FLAGS_ttl);
s = DBWithTTL::Open(options, db_name, column_families, &db->cfh,
Expand All @@ -5345,8 +5355,14 @@ class Benchmark {
}
} else {
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
if (FLAGS_enable_speedb_features) {
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName,
*ColumnFamilyOptions(options).EnableSpeedbFeaturesCF(so)));
} else {
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
}
s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
db->cfh.resize(1);
db->num_created = 1;
Expand Down Expand Up @@ -9516,6 +9532,15 @@ int db_bench_tool_run_group(int group_num, int num_groups, int argc,
"settable");
}

if (FLAGS_enable_speedb_features) {
if (gflags::GetCommandLineFlagInfoOrDie("max_background_jobs").is_default ||
gflags::GetCommandLineFlagInfoOrDie("total_ram_size").is_default) {
ErrorExit(
"enable_speedb_features - Please provide explicitly total_ram_size "
"in bytes and max_background_jobs ");
}
}

if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NONE"))
FLAGS_compaction_fadvice_e = ROCKSDB_NAMESPACE::Options::NONE;
else if (!strcasecmp(FLAGS_compaction_fadvice.c_str(), "NORMAL"))
Expand Down

0 comments on commit 1edf41a

Please sign in to comment.