Skip to content

Commit

Permalink
Make a static pinning decision based on the last level with data (not
Browse files Browse the repository at this point in the history
bottommost level) (#626)
  • Loading branch information
udi-speedb committed Oct 8, 2023
1 parent 20a6593 commit 0d9342a
Show file tree
Hide file tree
Showing 24 changed files with 249 additions and 51 deletions.
5 changes: 3 additions & 2 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Unreleased

Fix RepeatableThread to work properly with on thread start callback feature (https://github.com/speedb-io/speedb/pull/667).
* Fix RepeatableThread to work properly with on thread start callback feature (https://github.com/speedb-io/speedb/pull/667).

### New Features
* Non-Blocking Manual Compaction (CompactRange()) - Support non-blocking manual compactions by setting a new CompactRangeOptions option (async_completion_cb). When set, the CompactRange() call will return control to the caller immediately. The manual compaction iteslf will be performed in an internally created thread. The manual compaction will ALWAYS call the specified callback upon completion and provide the completion status (#597).
Expand All @@ -11,8 +11,9 @@ Fix RepeatableThread to work properly with on thread start callback feature (htt
* Unit Testing: Expose the disallow_trivial_move flag in the MoveFilesToLevel testing utility (#677).

### Bug Fixes
* db_bench: fix SeekRandomWriteRandom valid check. Use key and value only after checking iterator is valid.
* db_bench: Fix SeekRandomWriteRandom valid check. Use key and value only after checking iterator is valid.
* Fix a JAVA build issue introduced by #597 (#680)
* Static Pinning: Make static pinning decisions based on the table's level relative to the currently known last level with data (rather than bottommost level) at the time a table reader is created and added to the table cache (#662).

### Miscellaneous

Expand Down
27 changes: 24 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include <algorithm>
#include <cinttypes>
#include <functional>
#include <limits>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -598,9 +599,11 @@ ColumnFamilyData::ColumnFamilyData(
if (_dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer, io_tracer,
db_session_id));
auto is_last_level_with_data_func = std::bind(
&ColumnFamilyData::IsLastLevelWithData, this, std::placeholders::_1);
table_cache_.reset(new TableCache(
ioptions_, file_options, _table_cache, block_cache_tracer, io_tracer,
db_session_id, is_last_level_with_data_func));
blob_file_cache_.reset(
new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
internal_stats_->GetBlobFileReadHist(), io_tracer));
Expand Down Expand Up @@ -1672,6 +1675,24 @@ void ColumnFamilyData::RecoverEpochNumbers() {
vstorage->RecoverEpochNumbers(this);
}

VersionStorageInfo* ColumnFamilyData::TEST_GetCurrentStorageInfo() {
return current_->storage_info();
}

int ColumnFamilyData::IsLastLevelWithData(int level) const {
auto* vstorage = current_->storage_info();
assert(vstorage);

int last_level_with_data = vstorage->num_non_empty_levels() - 1;

auto is_last_level_with_data = false;
if ((level > 0) and (level == last_level_with_data)) {
is_last_level_with_data = true;
}

return is_last_level_with_data;
}

ColumnFamilySet::ColumnFamilySet(
const std::string& dbname, const ImmutableDBOptions* db_options,
const FileOptions& file_options, Cache* table_cache,
Expand Down
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ class ColumnFamilyData {
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);

int IsLastLevelWithData(int level) const;

// REQUIREMENT: db mutex must be held
double TEST_CalculateWriteDelayDivider(
uint64_t compaction_needed_bytes,
Expand All @@ -503,6 +505,8 @@ class ColumnFamilyData {

void TEST_ResetWriteControllerToken() { write_controller_token_.reset(); }

VersionStorageInfo* TEST_GetCurrentStorageInfo();

private:
void UpdateCFRate(void* client_id, uint64_t write_rate);
void ResetCFRate(void* client_id);
Expand Down
7 changes: 5 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ CompactionJob::CompactionJob(
output_directory_(output_directory),
stats_(stats),
bottommost_level_(false),
last_level_with_data_(false),
write_hint_(Env::WLTH_NOT_SET),
compaction_job_stats_(compaction_job_stats),
job_id_(job_id),
Expand Down Expand Up @@ -273,6 +274,7 @@ void CompactionJob::Prepare() {

write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
last_level_with_data_ = cfd->IsLastLevelWithData(c->output_level());

if (c->ShouldFormSubcompactions()) {
StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
Expand Down Expand Up @@ -1893,8 +1895,9 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(), cfd->GetID(),
cfd->GetName(), sub_compact->compaction->output_level(),
bottommost_level_, TableFileCreationReason::kCompaction,
0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
bottommost_level_, last_level_with_data_,
TableFileCreationReason::kCompaction, 0 /* oldest_key_time */,
current_time, db_id_, db_session_id_,
sub_compact->compaction->max_output_file_size(), file_number);

outputs.NewBuilder(tboptions);
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ class CompactionJob {
Statistics* stats_;
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;

// Is this compaction creating a file in the last level with data?
bool last_level_with_data_ = false;
Env::WriteLifeTimeHint write_hint_;

IOStatus io_status_;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
0 /* level */, false /* is_bottommost */,
false /* is_last_level_with_data */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, db_id_, db_session_id_,
0 /* target_file_size */, meta.fd.GetNumber());
Expand Down
126 changes: 126 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <set>
#include <thread>
#include <unordered_set>
Expand Down Expand Up @@ -66,6 +67,7 @@
#include "rocksdb/slice_transform.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/table.h"
#include "rocksdb/table_pinning_policy.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/types.h"
Expand Down Expand Up @@ -7426,6 +7428,130 @@ TEST_F(DBTest, ShuttingDownNotBlockStalledWrites) {
thd.join();
}

class MyPinningPolicy : public TablePinningPolicy {
public:
bool MayPin(const TablePinningOptions& /*tpo*/, uint8_t /*type*/,
size_t /*size*/) const override {
return true;
}

~MyPinningPolicy() { ValidateAtDestruction(); }

void ValidateAtDestruction() {
ASSERT_EQ(0U, usage_);
ASSERT_EQ(0U, total_num_pinned_);
ASSERT_EQ(0U, num_pinned_last_level_with_data_);
}

bool PinData(const TablePinningOptions& tpo, uint8_t type, size_t size,
std::unique_ptr<PinnedEntry>* pinned) override {
pinned->reset(
new PinnedEntry(tpo.level, type, size, tpo.is_last_level_with_data));
++total_num_pinned_;
usage_ += size;
if (tpo.is_last_level_with_data) {
++num_pinned_last_level_with_data_;
}

return true;
}

void UnPinData(std::unique_ptr<PinnedEntry>&& pinned) override {
ASSERT_GT(total_num_pinned_, 0U);
--total_num_pinned_;

ASSERT_GE(usage_, pinned->size);
usage_ -= pinned->size;

if (pinned->is_last_level_with_data) {
ASSERT_GT(num_pinned_last_level_with_data_, 0U);
--num_pinned_last_level_with_data_;
}

pinned.reset();
}

size_t GetPinnedUsage() const override { return usage_; }

std::string ToString() const {
std::string result;
result.append("Pinned Memory=")
.append(std::to_string(usage_.load()))
.append("\n");
result.append("Total Num Pinned=")
.append(std::to_string(total_num_pinned_.load()))
.append("\n");
result.append("Total Num Pinned Last Level With Data=")
.append(std::to_string(num_pinned_last_level_with_data_.load()))
.append("\n");
return result;
}

static const char* kClassName() { return "speedb_test_pinning_policy"; }
static const char* kNickName() { return "speedb.TestPinningPolicy"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kNickName(); }

public:
std::atomic<size_t> usage_ = 0U;
std::atomic<uint64_t> total_num_pinned_ = 0U;
std::atomic<uint64_t> num_pinned_last_level_with_data_ = 0U;
};

TEST_F(DBTest, StaticPinningLastLevelWithData) {
Options options = CurrentOptions();
BlockBasedTableOptions block_based_options;
auto pinning_policy = std::make_shared<MyPinningPolicy>();
block_based_options.pinning_policy = pinning_policy;
options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
DestroyAndReopen(options);

ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();

auto AssertNumNonEmptyLevels = [&cfd](size_t expected_num_non_empty_levels) {
ASSERT_EQ(expected_num_non_empty_levels,
cfd->TEST_GetCurrentStorageInfo()->num_non_empty_levels());
};

const std::string key1 = "key1";
const std::string value1 = "value1";
const std::string key2 = "key2";
const std::string value2 = "value2";
const std::string value3 = "value3";

// Create a file and place it in level-1
// However, we still expect all pinning to be with last_level_with_data ==
// false
ASSERT_OK(Put(key1, value1));
ASSERT_OK(Flush());
ASSERT_OK(Put(key2, value2));
ASSERT_OK(Flush());

db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
AssertNumNonEmptyLevels(2);

ASSERT_EQ(2U, pinning_policy->total_num_pinned_);
ASSERT_EQ(0U, pinning_policy->num_pinned_last_level_with_data_);

// Create a file and place it in level-1
// However, we still expect all pinning to be with last_level_with_data ==
// false
ASSERT_OK(Put(key1, value3));
ASSERT_OK(Flush());

// This will create a file at level-1 that is currently known to be the last
// with data
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
ASSERT_EQ(2U, pinning_policy->total_num_pinned_);
ASSERT_EQ(1U, pinning_policy->num_pinned_last_level_with_data_);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
1 change: 1 addition & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*last_level_with_data*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
6 changes: 3 additions & 3 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,9 @@ Status FlushJob::WriteLevel0Table() {
cfd_->int_tbl_prop_collector_factories(), output_compression_,
mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
0 /* level */, false /* is_bottommost */,
TableFileCreationReason::kFlush, oldest_key_time, current_time,
db_id_, db_session_id_, 0 /* target_file_size */,
meta_.fd.GetNumber());
false /* is_last_level_with_data */, TableFileCreationReason::kFlush,
oldest_key_time, current_time, db_id_, db_session_id_,
0 /* target_file_size */, meta_.fd.GetNumber());
const SequenceNumber job_snapshot_seq =
job_context_->GetJobSnapshotSequence();
s = BuildTable(
Expand Down
1 change: 1 addition & 0 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
env_options_, cfd_->internal_comparator(),
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1, /*bottommost*/ false,
/*last_level_with_data*/ false,
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
Expand Down
1 change: 1 addition & 0 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ class Repairer {
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
kNoCompression, default_compression, cfd->GetID(), cfd->GetName(),
-1 /* level */, false /* is_bottommost */,
false /* is_last_level_with_data */,
TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_,
0 /*target_file_size*/, meta.fd.GetNumber());
Expand Down
17 changes: 12 additions & 5 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,17 @@ TableCache::TableCache(const ImmutableOptions& ioptions,
const FileOptions* file_options, Cache* const cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id)
const std::string& db_session_id,
IsLastLevelWithDataFunc is_last_leve_with_data_func)
: ioptions_(ioptions),
file_options_(*file_options),
cache_(cache),
immortal_tables_(false),
block_cache_tracer_(block_cache_tracer),
loader_mutex_(kLoadConcurency, kGetSliceNPHash64UnseededFnPtr),
io_tracer_(io_tracer),
db_session_id_(db_session_id) {
db_session_id_(db_session_id),
is_last_leve_with_data_func_(is_last_leve_with_data_func) {
if (ioptions_.row_cache) {
// If the same cache is shared by multiple instances, we need to
// disambiguate its entries.
Expand Down Expand Up @@ -152,12 +154,17 @@ Status TableCache::GetTableReader(
expected_unique_id = kNullUniqueId64x2; // null ID == no verification
}

auto is_last_level_with_data = is_bottom;
if (is_last_leve_with_data_func_) {
is_last_level_with_data = is_last_leve_with_data_func_(level);
}

TableReaderOptions table_reader_options(
ioptions_, prefix_extractor, file_options, internal_comparator,
skip_filters, immortal_tables_, false /* force_direct_prefetch */,
level, is_bottom, block_cache_tracer_, max_file_size_for_l0_meta_pin,
db_session_id_, file_meta.fd.GetNumber(), expected_unique_id,
file_meta.fd.largest_seqno);
level, is_bottom, is_last_level_with_data, block_cache_tracer_,
max_file_size_for_l0_meta_pin, db_session_id_, file_meta.fd.GetNumber(),
expected_unique_id, file_meta.fd.largest_seqno);
table_reader_options.cache_owner_id = cache_owner_id_;

s = ioptions_.table_factory->NewTableReader(
Expand Down
8 changes: 7 additions & 1 deletion db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#pragma once
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

Expand Down Expand Up @@ -63,12 +64,16 @@ class HistogramImpl;
// cache, lookup is very fast. The row cache is obtained from
// ioptions.row_cache
class TableCache {
public:
using IsLastLevelWithDataFunc = std::function<int(int level)>;

public:
TableCache(const ImmutableOptions& ioptions,
const FileOptions* storage_options, Cache* cache,
BlockCacheTracer* const block_cache_tracer,
const std::shared_ptr<IOTracer>& io_tracer,
const std::string& db_session_id);
const std::string& db_session_id,
IsLastLevelWithDataFunc is_last_leve_with_data_func = nullptr);
~TableCache();

// Cache interface for table cache
Expand Down Expand Up @@ -287,6 +292,7 @@ class TableCache {
std::shared_ptr<IOTracer> io_tracer_;
std::string db_session_id_;
Cache::ItemOwnerId cache_owner_id_ = Cache::kUnknownItemOwnerId;
IsLastLevelWithDataFunc is_last_leve_with_data_func_;
};

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit 0d9342a

Please sign in to comment.