Skip to content

Commit

Permalink
Redo of Index/Filter/Data blocks sizes per CF after rebase on RocksDB 8.1 (#516)
Browse files Browse the repository at this point in the history
  • Loading branch information
udi-speedb committed Aug 3, 2023
1 parent 335c424 commit 51c6266
Show file tree
Hide file tree
Showing 23 changed files with 771 additions and 48 deletions.
50 changes: 50 additions & 0 deletions cache/cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,54 @@ void Cache::SetEvictionCallback(EvictionCallback&& fn) {
eviction_callback_ = std::move(fn);
}

// ==================================================================================================================================
// Hands out an owner id, preferring ids that were previously freed.
//
// Returns kUnknownItemId when the free list is empty and the running
// counter has already handed out kMaxOwnerItemId.
Cache::ItemOwnerId Cache::ItemOwnerIdAllocator::Allocate() {
  // Owner ids are allocated and freed when column families are created and
  // destroyed, which is relatively rare, so the mutex is always taken in
  // order to keep the code simple.
  std::lock_guard<std::mutex> guard(free_ids_mutex_);

  // Recycle a previously freed id whenever one is available.
  if (!free_ids_.empty()) {
    const auto recycled_id = free_ids_.front();
    free_ids_.pop_front();
    return recycled_id;
  }

  // The free list is empty: fall back to the running counter, unless it
  // has already been exhausted.
  if (has_wrapped_around_) {
    return kUnknownItemId;
  }

  const auto fresh_id = next_item_owner_id_++;
  // Handing out the last valid id exhausts the counter. The flag is known
  // to be false at this point, so a plain assignment is sufficient.
  has_wrapped_around_ = (fresh_id == kMaxOwnerItemId);
  return fresh_id;
}

// Gives an owner id back to the allocator and resets the caller's copy to
// kUnknownItemId. An id that is already kUnknownItemId is left untouched.
void Cache::ItemOwnerIdAllocator::Free(ItemOwnerId* id) {
  // Nothing to release for an id that was never allocated.
  if (*id == kUnknownItemId) {
    return;
  }

  std::lock_guard<std::mutex> guard(free_ids_mutex_);
  // The free list is bounded: once it is full the freed id is simply lost.
  // Recycling ids is a luxury feature, so only limited space is spent on it.
  const bool has_room = free_ids_.size() < kMaxFreeItemOwnersIdListSize;
  if (has_room) {
    free_ids_.push_back(*id);
  }
  *id = kUnknownItemId;
}

// Returns a new item owner id by delegating to owner_id_allocator_'s
// Allocate().
// NOTE(review): assuming owner_id_allocator_ is an ItemOwnerIdAllocator, the
// result may be kUnknownItemId when no id can be allocated - confirm.
Cache::ItemOwnerId Cache::GetNextItemOwnerId() {
return owner_id_allocator_.Allocate();
}

// Hands the id pointed to by item_owner_id back to owner_id_allocator_ by
// delegating to its Free().
// NOTE(review): assuming owner_id_allocator_ is an ItemOwnerIdAllocator,
// *item_owner_id is reset to kUnknownItemId by that call - confirm.
void Cache::DiscardItemOwnerId(ItemOwnerId* item_owner_id) {
owner_id_allocator_.Free(item_owner_id);
}

} // namespace ROCKSDB_NAMESPACE
15 changes: 15 additions & 0 deletions cache/cache_entry_roles.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,19 @@ std::string BlockCacheEntryStatsMapKeys::UsedPercent(CacheEntryRole role) {
return GetPrefixedCacheEntryRoleName(kPrefix, role);
}

// Key under which the column family name is stored in the per-CF block
// cache stats map. The string is created once and shared by all callers.
const std::string& BlockCacheCfStatsMapKeys::CfName() {
  static const std::string cf_name_key{"cf_name"};
  return cf_name_key;
}

// Key under which the cache id is stored in the per-CF block cache stats
// map. The string is created once and shared by all callers.
const std::string& BlockCacheCfStatsMapKeys::CacheId() {
  static const std::string cache_id_key{"id"};
  return cache_id_key;
}

// Builds the stats-map key for the bytes used by entries of the given role:
// the "bytes." prefix combined with the role name by
// GetPrefixedCacheEntryRoleName().
std::string BlockCacheCfStatsMapKeys::UsedBytes(CacheEntryRole role) {
  static const std::string bytes_prefix{"bytes."};
  return GetPrefixedCacheEntryRoleName(bytes_prefix, role);
}

} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion cache/cache_entry_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class CacheEntryStatsCollector {
last_start_time_micros_ = start_time_micros;
working_stats_.BeginCollection(cache_, clock_, start_time_micros);

cache_->ApplyToAllEntries(working_stats_.GetEntryCallback(), {});
cache_->ApplyToAllEntriesWithOwnerId(working_stats_.GetEntryCallback(),
{});
TEST_SYNC_POINT_CALLBACK(
"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries", nullptr);

Expand Down
17 changes: 14 additions & 3 deletions cache/clock_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,10 +1056,11 @@ void ClockCacheShard<Table>::EraseUnRefEntries() {
}

template <class Table>
void ClockCacheShard<Table>::ApplyToSomeEntries(
void ClockCacheShard<Table>::ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state) {
// The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most
Expand All @@ -1086,7 +1087,7 @@ void ClockCacheShard<Table>::ApplyToSomeEntries(
[callback](const HandleImpl& h) {
UniqueId64x2 unhashed;
callback(ReverseHash(h.hashed_key, &unhashed), h.value,
h.GetTotalCharge(), h.helper);
h.GetTotalCharge(), h.helper, Cache::kUnknownItemId);
},
index_begin, index_end, false);
}
Expand Down Expand Up @@ -1134,6 +1135,16 @@ Status ClockCacheShard<Table>::Insert(const Slice& key,
const Cache::CacheItemHelper* helper,
size_t charge, HandleImpl** handle,
Cache::Priority priority) {
return InsertWithOwnerId(key, hashed_key, value, helper, charge,
Cache::kUnknownItemId, handle, priority);
}

template <class Table>
Status ClockCacheShard<Table>::InsertWithOwnerId(
const Slice& key, const UniqueId64x2& hashed_key, Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */, HandleImpl** handle,
Cache::Priority priority) {
if (UNLIKELY(key.size() != kCacheKeySize)) {
return Status::NotSupported("ClockCache only supports key size " +
std::to_string(kCacheKeySize) + "B");
Expand Down
13 changes: 10 additions & 3 deletions cache/clock_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,12 @@ class ALIGN_AS(CACHE_LINE_SIZE) ClockCacheShard final : public CacheShardBase {
Cache::ObjectPtr value, const Cache::CacheItemHelper* helper,
size_t charge, HandleImpl** handle, Cache::Priority priority);

// Variant of Insert() that also accepts the id of the item's owner.
// Insert() forwards here with Cache::kUnknownItemId. The owner id parameter
// is intentionally left unnamed: the clock cache does not record it, and
// entries are reported with Cache::kUnknownItemId when iterated.
Status InsertWithOwnerId(const Slice& key, const UniqueId64x2& hashed_key,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */,
HandleImpl** handle, Cache::Priority priority);

HandleImpl* CreateStandalone(const Slice& key, const UniqueId64x2& hashed_key,
Cache::ObjectPtr obj,
const Cache::CacheItemHelper* helper,
Expand Down Expand Up @@ -643,10 +649,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) ClockCacheShard final : public CacheShardBase {

size_t GetTableAddressCount() const;

void ApplyToSomeEntries(
const std::function<void(const Slice& key, Cache::ObjectPtr obj,
void ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state);

void EraseUnRefEntries();
Expand Down
28 changes: 21 additions & 7 deletions cache/lru_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ void LRUCacheShard::EraseUnRefEntries() {
}
}

void LRUCacheShard::ApplyToSomeEntries(
void LRUCacheShard::ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state) {
// The state is essentially going to be the starting hash, which works
// nicely even if we resize between calls because we use upper-most
Expand Down Expand Up @@ -196,7 +197,7 @@ void LRUCacheShard::ApplyToSomeEntries(
[callback,
metadata_charge_policy = metadata_charge_policy_](LRUHandle* h) {
callback(h->key(), h->value, h->GetCharge(metadata_charge_policy),
h->helper);
h->helper, h->item_owner_id);
},
index_begin, index_end);
}
Expand Down Expand Up @@ -518,7 +519,8 @@ bool LRUCacheShard::Release(LRUHandle* e, bool /*useful*/,
LRUHandle* LRUCacheShard::CreateHandle(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
size_t charge) {
size_t charge,
Cache::ItemOwnerId item_owner_id) {
assert(helper);
// value == nullptr is reserved for indicating failure in SecondaryCache
assert(!(helper->IsSecondaryCacheCompatible() && value == nullptr));
Expand All @@ -539,7 +541,7 @@ LRUHandle* LRUCacheShard::CreateHandle(const Slice& key, uint32_t hash,
e->next = e->prev = nullptr;
memcpy(e->key_data, key.data(), key.size());
e->CalcTotalCharge(charge, metadata_charge_policy_);

e->item_owner_id = item_owner_id;
return e;
}

Expand All @@ -548,7 +550,18 @@ Status LRUCacheShard::Insert(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
size_t charge, LRUHandle** handle,
Cache::Priority priority) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge);
return InsertWithOwnerId(key, hash, value, helper, charge,
Cache::kUnknownItemId, handle, priority);
}

Status LRUCacheShard::InsertWithOwnerId(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper,
size_t charge,
Cache::ItemOwnerId item_owner_id,
LRUHandle** handle,
Cache::Priority priority) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge, item_owner_id);
e->SetPriority(priority);
e->SetInCache(true);
return InsertItem(e, handle);
Expand All @@ -559,7 +572,8 @@ LRUHandle* LRUCacheShard::CreateStandalone(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* helper,
size_t charge,
bool allow_uncharged) {
LRUHandle* e = CreateHandle(key, hash, value, helper, charge);
LRUHandle* e =
CreateHandle(key, hash, value, helper, charge, Cache::kUnknownItemId);
e->SetIsStandalone(true);
e->Ref();

Expand Down
15 changes: 12 additions & 3 deletions cache/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct LRUHandle {
uint32_t hash;
// The number of external refs to this entry. The cache itself is not counted.
uint32_t refs;
Cache::ItemOwnerId item_owner_id = Cache::kUnknownItemId;

// Mutable flags - access controlled by mutex
// The m_ and M_ prefixes (and im_ and IM_ later) are to hopefully avoid
Expand Down Expand Up @@ -302,6 +303,12 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShardBase {
const Cache::CacheItemHelper* helper, size_t charge,
LRUHandle** handle, Cache::Priority priority);

// Variant of Insert() that tags the new entry with the given owner id.
// The id is stored in LRUHandle::item_owner_id and is handed to the
// callback of ApplyToSomeEntriesWithOwnerId(). Insert() forwards here with
// Cache::kUnknownItemId.
Status InsertWithOwnerId(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId /* item_owner_id */,
LRUHandle** handle, Cache::Priority priority);

LRUHandle* CreateStandalone(const Slice& key, uint32_t hash,
Cache::ObjectPtr obj,
const Cache::CacheItemHelper* helper,
Expand All @@ -325,10 +332,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShardBase {
size_t GetOccupancyCount() const;
size_t GetTableAddressCount() const;

void ApplyToSomeEntries(
void ApplyToSomeEntriesWithOwnerId(
const std::function<void(const Slice& key, Cache::ObjectPtr value,
size_t charge,
const Cache::CacheItemHelper* helper)>& callback,
const Cache::CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>& callback,
size_t average_entries_per_lock, size_t* state);

void EraseUnRefEntries();
Expand Down Expand Up @@ -373,7 +381,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShardBase {

LRUHandle* CreateHandle(const Slice& key, uint32_t hash,
Cache::ObjectPtr value,
const Cache::CacheItemHelper* helper, size_t charge);
const Cache::CacheItemHelper* helper, size_t charge,
Cache::ItemOwnerId item_owner_id);

// Initialized before use.
size_t capacity_;
Expand Down
31 changes: 28 additions & 3 deletions cache/sharded_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,19 @@ class ShardedCache : public ShardedCacheBase {
// Inserts an entry without attributing it to a specific owner: forwards
// all arguments to InsertWithOwnerId(), passing kUnknownItemId as the
// owner id.
Status Insert(const Slice& key, ObjectPtr obj, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
return InsertWithOwnerId(key, obj, helper, charge, kUnknownItemId, handle,
priority);
}

Status InsertWithOwnerId(const Slice& key, ObjectPtr obj,
const CacheItemHelper* helper, size_t charge,
ItemOwnerId item_owner_id, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
assert(helper);
HashVal hash = CacheShard::ComputeHash(key);
auto h_out = reinterpret_cast<HandleImpl**>(handle);
return GetShard(hash).Insert(key, hash, obj, helper, charge, h_out,
priority);
return GetShard(hash).InsertWithOwnerId(key, hash, obj, helper, charge,
item_owner_id, h_out, priority);
}

Handle* CreateStandalone(const Slice& key, ObjectPtr obj,
Expand Down Expand Up @@ -235,6 +243,22 @@ class ShardedCache : public ShardedCacheBase {
const std::function<void(const Slice& key, ObjectPtr value, size_t charge,
const CacheItemHelper* helper)>& callback,
const ApplyToAllEntriesOptions& opts) override {
auto callback_with_owner_id =
[&callback](const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper,
Cache::ItemOwnerId /* item_owner_id */) {
callback(key, obj, charge, helper);
};

ApplyToAllEntriesWithOwnerId(callback_with_owner_id, opts);
}

void ApplyToAllEntriesWithOwnerId(
const std::function<void(const Slice& key, ObjectPtr obj, size_t charge,
const CacheItemHelper* helper,
Cache::ItemOwnerId item_owner_id)>&
callback_with_owner_id,
const ApplyToAllEntriesOptions& opts) override {
uint32_t num_shards = GetNumShards();
// Iterate over part of each shard, rotating between shards, to
// minimize impact on latency of concurrent operations.
Expand All @@ -248,7 +272,8 @@ class ShardedCache : public ShardedCacheBase {
remaining_work = false;
for (uint32_t i = 0; i < num_shards; i++) {
if (states[i] != SIZE_MAX) {
shards_[i].ApplyToSomeEntries(callback, aepl, &states[i]);
shards_[i].ApplyToSomeEntriesWithOwnerId(callback_with_owner_id, aepl,
&states[i]);
remaining_work |= states[i] != SIZE_MAX;
}
}
Expand Down
11 changes: 8 additions & 3 deletions cache/typed_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,18 @@ class FullTypedCacheInterface
inline Status InsertFull(
const Slice& key, TValuePtr value, size_t charge,
TypedHandle** handle = nullptr, Priority priority = Priority::LOW,
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier) {
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier,
Cache::ItemOwnerId item_owner_id = Cache::kUnknownItemId) {
auto untyped_handle = reinterpret_cast<Handle**>(handle);
auto helper = lowest_used_cache_tier == CacheTier::kNonVolatileBlockTier
? GetFullHelper()
: GetBasicHelper();
return this->cache_->Insert(key, UpCastValue(value), helper, charge,
untyped_handle, priority);

// fprintf(stderr, "InsertFull: owner_id:%d, charge:%d\n",
// (int)item_owner_id, (int)charge);
return this->cache_->InsertWithOwnerId(key, UpCastValue(value), helper,
charge, item_owner_id,
untyped_handle, priority);
}

// Like SecondaryCache::InsertSaved, with SecondaryCache compatibility
Expand Down
11 changes: 11 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,11 @@ ColumnFamilyData::ColumnFamilyData(
CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
bbto->block_cache)));
}

if (bbto->block_cache && table_cache_) {
cache_owner_id_ = bbto->block_cache->GetNextItemOwnerId();
table_cache_->SetBlockCacheOwnerId(cache_owner_id_);
}
}
}

Expand All @@ -662,6 +667,12 @@ ColumnFamilyData::~ColumnFamilyData() {
prev->next_ = next;
next->prev_ = prev;

const BlockBasedTableOptions* bbto =
ioptions_.table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto && bbto->block_cache) {
bbto->block_cache->DiscardItemOwnerId(&cache_owner_id_);
}

if (!dropped_ && column_family_set_ != nullptr) {
// If it's dropped, it's already removed from column family set
// If column_family_set_ == nullptr, this is dummy CFD and not in
Expand Down
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ class ColumnFamilyData {
static constexpr uint64_t kLaggingFlushesThreshold = 10U;
void SetNumTimedQueuedForFlush(uint64_t num) { num_queued_for_flush_ = num; }

// Returns cache_owner_id_, the item owner id held by this column family
// (Cache::kUnknownItemId if none has been assigned).
Cache::ItemOwnerId GetCacheOwnerId() const { return cache_owner_id_; }

// Allocate and return a new epoch number
uint64_t NewEpochNumber() { return next_epoch_number_.fetch_add(1); }

Expand Down Expand Up @@ -688,6 +690,8 @@ class ColumnFamilyData {
uint64_t num_queued_for_flush_ = 0U;

std::atomic<uint64_t> next_epoch_number_;

Cache::ItemOwnerId cache_owner_id_ = Cache::kUnknownItemId;
};

// ColumnFamilySet has interesting thread-safety requirements
Expand Down
Loading

0 comments on commit 51c6266

Please sign in to comment.