Skip to content

Commit

Permalink
Clean Memory: Introduce a TablePinningPolicy to control how and when …
Browse files Browse the repository at this point in the history
…memory is pinned (#459)
  • Loading branch information
mrambacher authored Aug 2, 2023
1 parent 8ed9b68 commit 6e05428
Show file tree
Hide file tree
Showing 48 changed files with 1,135 additions and 175 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ set(SOURCES
table/block_based/partitioned_index_iterator.cc
table/block_based/partitioned_index_reader.cc
table/block_based/reader_common.cc
table/block_based/table_pinning_policy.cc
table/block_based/uncompression_dict_reader.cc
table/block_fetcher.cc
table/cuckoo/cuckoo_table_builder.cc
Expand Down
2 changes: 1 addition & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Speedb Change Log

## Unreleased

* Add a TablePinningPolicy to the BlockBasedTableOptions. This class controls when blocks should be pinned in memory for a block-based table. The default behavior uses the MetadataCacheOptions to control pinning and behaves identically to previous releases.
### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.

Expand Down
2 changes: 2 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"table/block_based/partitioned_index_iterator.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/table_pinning_policy.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
Expand Down Expand Up @@ -550,6 +551,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"table/block_based/partitioned_index_iterator.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/table_pinning_policy.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/compaction_merging_iterator.cc",
Expand Down
2 changes: 1 addition & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
2 changes: 1 addition & 1 deletion db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
5 changes: 3 additions & 2 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ Status TableCache::GetTableReader(
file->Hint(FSRandomAccessFile::kRandom);
}
StopWatch sw(ioptions_.clock, ioptions_.stats, TABLE_OPEN_IO_MICROS);
bool is_bottom = (level == ioptions_.num_levels - 1);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.stats : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter.get(), ioptions_.listeners,
file_temperature, level == ioptions_.num_levels - 1));
file_temperature, is_bottom));
UniqueId64x2 expected_unique_id;
if (ioptions_.verify_sst_unique_id_in_manifest) {
expected_unique_id = file_meta.unique_id;
Expand All @@ -140,7 +141,7 @@ Status TableCache::GetTableReader(
ro,
TableReaderOptions(ioptions_, prefix_extractor, file_options,
internal_comparator, skip_filters, immortal_tables_,
false /* force_direct_prefetch */, level,
false /* force_direct_prefetch */, level, is_bottom,
block_cache_tracer_, max_file_size_for_l0_meta_pin,
db_session_id_, file_meta.fd.GetNumber(),
expected_unique_id, file_meta.fd.largest_seqno),
Expand Down
1 change: 1 addition & 0 deletions db_stress_tool/db_stress_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ DECLARE_bool(compression_use_zstd_dict_trainer);
DECLARE_string(checksum_type);
DECLARE_string(env_uri);
DECLARE_string(fs_uri);
DECLARE_string(pinning_policy);
DECLARE_uint64(ops_per_thread);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_uint64(max_manifest_file_size);
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_gflags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ DEFINE_string(fs_uri, "",
" with --env_uri."
" Creates a default environment with the specified filesystem.");

DEFINE_string(pinning_policy, "", "URI for registry TablePinningPolicy");

DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range);
Expand Down
20 changes: 20 additions & 0 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/filter_policy.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/table_pinning_policy.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/write_batch_with_index.h"
Expand Down Expand Up @@ -506,6 +507,12 @@ std::string StressTest::DebugString(const Slice& value,
void StressTest::PrintStatistics() {
if (dbstats) {
fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str());
const auto bbto =
options_.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto != nullptr && bbto->pinning_policy) {
fprintf(stdout, "PINNING STATISTICS:\n%s\n",
bbto->pinning_policy->ToString().c_str());
}
}
if (dbstats_secondaries) {
fprintf(stdout, "Secondary instances STATISTICS:\n%s\n",
Expand Down Expand Up @@ -3098,6 +3105,19 @@ void InitializeOptionsFromFlags(
block_based_options.max_auto_readahead_size = FLAGS_max_auto_readahead_size;
block_based_options.num_file_reads_for_auto_readahead =
FLAGS_num_file_reads_for_auto_readahead;
if (!FLAGS_pinning_policy.empty()) {
ConfigOptions config_options;
config_options.ignore_unknown_options = false;
config_options.ignore_unsupported_options = false;
Status s = TablePinningPolicy::CreateFromString(
config_options, FLAGS_pinning_policy,
&block_based_options.pinning_policy);
if (!s.ok()) {
fprintf(stderr, "Failed to create PinningPolicy: %s\n",
s.ToString().c_str());
exit(1);
}
}
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));

// Write-Buffer-Manager
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct TableReaderOptions;
struct TableBuilderOptions;
class TableBuilder;
class TableFactory;
class TablePinningPolicy;
class TableReader;
class WritableFileWriter;
struct ConfigOptions;
Expand Down Expand Up @@ -655,6 +656,9 @@ struct BlockBasedTableOptions {
//
// Default: 2
uint64_t num_file_reads_for_auto_readahead = 2;

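// EXPERIMENTAL
// Policy that controls when blocks of a block based table are pinned in
// memory. See rocksdb/table_pinning_policy.h.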
std::shared_ptr<TablePinningPolicy> pinning_policy;
};

// Table Properties that are specific to block-based table properties.
Expand Down
118 changes: 118 additions & 0 deletions include/rocksdb/table_pinning_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#pragma once

#include "rocksdb/customizable.h"
#include "rocksdb/status.h"

namespace ROCKSDB_NAMESPACE {

struct BlockBasedTableOptions;
struct ConfigOptions;

// Describes the table that is being evaluated for pinning.
struct TablePinningOptions {
  // Level of the table; -1 by default.
  int level = -1;
  // Whether the table is flagged as being at the bottom level.
  bool is_bottom = false;
  // Size of the table file; 0 by default.
  size_t file_size = 0;
  // Value of the max_file_size_for_l0_meta_pin setting; 0 by default.
  size_t max_file_size_for_l0_meta_pin = 0;

  // Leaves every field at its default value.
  TablePinningOptions() = default;

  // Initializes every field from the corresponding argument.
  TablePinningOptions(int table_level, bool bottommost, size_t table_file_size,
                      size_t l0_meta_pin_limit)
      : level(table_level),
        is_bottom(bottommost),
        file_size(table_file_size),
        max_file_size_for_l0_meta_pin(l0_meta_pin_limit) {}
};

// Records the level, type and size of an entry that has been pinned.
struct PinnedEntry {
  // Level associated with the pinned entry; -1 by default.
  int level = -1;
  // Type tag of the pinned entry; 0 by default.
  uint8_t type = 0;
  // Size of the pinned entry; 0 by default.
  size_t size = 0;

  // Leaves every field at its default value.
  PinnedEntry() = default;

  // Initializes every field from the corresponding argument.
  PinnedEntry(int entry_level, uint8_t entry_type, size_t entry_size)
      : level(entry_level), type(entry_type), size(entry_size) {}
};

// TablePinningPolicy provides a configurable way to determine when blocks
// should be pinned in memory for the block based tables.
//
// Exceptions MUST NOT propagate out of overridden functions into RocksDB,
// because RocksDB is not exception-safe. This could cause undefined behavior
// including data loss, unreported corruption, deadlocks, and more.
class TablePinningPolicy : public Customizable {
 public:
  // Block type tags.
  // NOTE(review): presumably these are the values passed as the `type`
  // argument of MayPin/PinData and stored in PinnedEntry::type -- confirm
  // against the callers.
  //
  // Declared constexpr (implicitly inline in C++17) so that odr-using them,
  // e.g. binding one to a const reference, does not require an out-of-class
  // definition.
  static constexpr uint8_t kTopLevel = 1;
  static constexpr uint8_t kPartition = 2;
  static constexpr uint8_t kIndex = 3;
  static constexpr uint8_t kFilter = 4;
  static constexpr uint8_t kDictionary = 5;

  // Returns the type name of this class of Customizable objects.
  static const char* Type() { return "TablePinningPolicy"; }

  // Creates/Returns a new TablePinningPolicy based on the input value.
  static Status CreateFromString(const ConfigOptions& config_options,
                                 const std::string& value,
                                 std::shared_ptr<TablePinningPolicy>* policy);
  virtual ~TablePinningPolicy() = default;

  // Returns true if the block defined by type and size is a candidate for
  // pinning, and false otherwise. This method only indicates that pinning
  // might be possible; it does not perform the pinning operation.
  virtual bool MayPin(const TablePinningOptions& tpo, uint8_t type,
                      size_t size) const = 0;

  // Attempts to pin the block in memory.
  // On success, returns true and updates pinned with the pinned block.
  // Returns false if the data cannot be pinned.
  virtual bool PinData(const TablePinningOptions& tpo, uint8_t type,
                       size_t size, std::unique_ptr<PinnedEntry>* pinned) = 0;

  // Releases and clears the pinned entry.
  virtual void UnPinData(std::unique_ptr<PinnedEntry>&& pinned) = 0;

  // Returns the amount of data currently pinned.
  virtual size_t GetPinnedUsage() const = 0;

  // Returns the info (e.g. statistics) associated with this policy.
  virtual std::string ToString() const = 0;
};

class TablePinningPolicyWrapper : public TablePinningPolicy {
public:
explicit TablePinningPolicyWrapper(
const std::shared_ptr<TablePinningPolicy>& t)
: target_(t) {}
bool MayPin(const TablePinningOptions& tpo, uint8_t type,
size_t size) const override {
return target_->MayPin(tpo, type, size);
}

bool PinData(const TablePinningOptions& tpo, uint8_t type, size_t size,
std::unique_ptr<PinnedEntry>* pinned) override {
return target_->PinData(tpo, type, size, pinned);
}

void UnPinData(std::unique_ptr<PinnedEntry>&& pinned) override {
target_->UnPinData(std::move(pinned));
}

size_t GetPinnedUsage() const override { return target_->GetPinnedUsage(); }

protected:
std::shared_ptr<TablePinningPolicy> target_;
};

// Returns a pointer to a TablePinningPolicy built from the given table options.
// NOTE(review): the name suggests this creates the default policy and that the
// caller takes ownership of the returned raw pointer -- confirm against the
// definition before relying on either.
TablePinningPolicy* NewDefaultPinningPolicy(const BlockBasedTableOptions& bbto);
} // namespace ROCKSDB_NAMESPACE
32 changes: 31 additions & 1 deletion options/customizable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table_pinning_policy.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
Expand Down Expand Up @@ -1392,6 +1393,22 @@ class MockFilterPolicy : public FilterPolicy {
}
};

// Stub TablePinningPolicy registered under the name "Mock": it never reports
// a pinning candidate, never pins, and reports no usage or info.
class MockTablePinningPolicy : public TablePinningPolicy {
 public:
  // Name under which this policy is registered and looked up.
  static const char* kClassName() { return "Mock"; }
  const char* Name() const override { return kClassName(); }

  // Always reports zero pinned bytes.
  size_t GetPinnedUsage() const override { return 0; }

  // Always returns an empty string.
  std::string ToString() const override { return ""; }

  // Nothing is ever a candidate for pinning.
  bool MayPin(const TablePinningOptions& /*tpo*/, uint8_t /*type*/,
              size_t /*size*/) const override {
    return false;
  }

  // Pinning always fails; the output entry is left untouched.
  bool PinData(const TablePinningOptions& /*tpo*/, uint8_t /*type*/,
               size_t /*size*/,
               std::unique_ptr<PinnedEntry>* /*pinned*/) override {
    return false;
  }

  // Does nothing with the entry it is given.
  void UnPinData(std::unique_ptr<PinnedEntry>&& /*pinned*/) override {}
};

static int RegisterLocalObjects(ObjectLibrary& library,
const std::string& /*arg*/) {
size_t num_types;
Expand Down Expand Up @@ -1506,14 +1523,20 @@ static int RegisterLocalObjects(ObjectLibrary& library,
guard->reset(new MockTablePropertiesCollectorFactory());
return guard->get();
});

library.AddFactory<const FilterPolicy>(
MockFilterPolicy::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<const FilterPolicy>* guard,
std::string* /* errmsg */) {
guard->reset(new MockFilterPolicy());
return guard->get();
});
library.AddFactory<TablePinningPolicy>(
MockTablePinningPolicy::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<TablePinningPolicy>* guard,
std::string* /* errmsg */) {
guard->reset(new MockTablePinningPolicy());
return guard->get();
});

return static_cast<int>(library.GetFactoryCount(&num_types));
}
Expand Down Expand Up @@ -2108,6 +2131,13 @@ TEST_F(LoadCustomizableTest, LoadFlushBlockPolicyFactoryTest) {
}
}

// Checks the built-in TablePinningPolicy names first and then, if the test
// objects can be registered, that the "Mock" policy can be created.
// NOTE(review): "Pining" in the test name looks like a typo for "Pinning";
// it is left unchanged so existing test filters keep matching.
TEST_F(LoadCustomizableTest, LoadTablePiningPolicyTest) {
  ASSERT_OK(TestSharedBuiltins<TablePinningPolicy>("Mock", ""));
  if (!RegisterTests("Test")) {
    return;
  }
  ExpectCreateShared<TablePinningPolicy>("Mock");
}

} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
Expand Down
3 changes: 3 additions & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "options/db_options.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "rocksdb/table_pinning_policy.h"
#include "test_util/testharness.h"

#ifndef GFLAGS
Expand Down Expand Up @@ -129,6 +130,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
sizeof(CacheUsageOptions)},
{offsetof(struct BlockBasedTableOptions, filter_policy),
sizeof(std::shared_ptr<const FilterPolicy>)},
{offsetof(struct BlockBasedTableOptions, pinning_policy),
sizeof(std::shared_ptr<TablePinningPolicy>)},
};

// In this test, we catch a new option of BlockBasedTableOptions that is not
Expand Down
5 changes: 3 additions & 2 deletions plugin/speedb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
set(speedb_SOURCES
speedb_registry.cc
memtable/hash_spd_rep.cc
paired_filter/speedb_paired_bloom.cc
paired_filter/speedb_paired_bloom_internal.cc)
paired_filter/speedb_paired_bloom.cc
paired_filter/speedb_paired_bloom_internal.cc
pinning_policy/scoped_pinning_policy.cc)

set(speedb_FUNC register_SpeedbPlugins)
Loading

0 comments on commit 6e05428

Please sign in to comment.