Skip to content

Commit

Permalink
replace db_session_id with opaque names
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuval-Ariel committed Aug 2, 2023
1 parent 5ce5944 commit f2990d0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
32 changes: 16 additions & 16 deletions include/rocksdb/write_buffer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,18 @@ class WriteBufferManager final {
public:
uint16_t get_start_delay_percent() const { return start_delay_percent_; }

// Add this WriteController(WC) and Logger to controllers_to_session_ids_map_
// and loggers_to_session_ids_map_ respectively.
// Add this WriteController(WC) and Logger to controllers_to_client_ids_map_
// and loggers_to_client_ids_map_ respectively.
// The WBM is responsible for updating (when stalling is allowed) these WCs
// and report through the Loggers.
// The connection between the WC and the Loggers can be looked up through
// controllers_to_loggers_map_ which this method also populates.
void RegisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger,
std::string db_session_id);
const std::string& wbm_client_id);
void DeregisterWCAndLogger(std::shared_ptr<WriteController> wc,
std::shared_ptr<Logger> logger,
std::string db_session_id);
const std::string& wbm_client_id);

private:
// The usage + delay factor are coded in a single (atomic) uint64_t value as
Expand Down Expand Up @@ -326,15 +326,15 @@ class WriteBufferManager final {
std::atomic<uint64_t> coded_usage_state_ = kNoneCodedUsageState;

private:
using DbSessionIDs = std::unordered_set<std::string>;
// When Closing the db, remove this WC/Logger - db_session_id from its
using WBMClientIds = std::unordered_set<std::string>;
// When Closing the db, remove this WC/Logger - wbm_client_id from its
// corresponding map. Returns true if the ptr (WC or Logger) is removed from
// the map when it has no more db_session_id. Meaning no db is using this
// WC/Logger with this WBM
// the map when it has no more wbm_client_id. Meaning no db is using this
// WC/Logger with this WBM.
template <typename SharedPtrType>
bool RemoveFromMap(SharedPtrType ptr, std::string db_session_id,
bool RemoveFromMap(SharedPtrType ptr, const std::string& wbm_client_id,
std::mutex& mutex,
std::unordered_map<SharedPtrType, DbSessionIDs>& map);
std::unordered_map<SharedPtrType, WBMClientIds>& map);

void UpdateControllerDelayState();

Expand All @@ -345,15 +345,15 @@ class WriteBufferManager final {
// a map of all write controllers which are associated with this WBM.
// The WBM needs to update them when its delay requirements change.
// The key is the WC to update and the value is an unordered_set of all
// db_session_ids opened with the WC. The DbSessionIDs are used as unique
// wbm_client_ids opened with the WC. The WBMClientIds are used as unique
// identifiers of the connection between the WC and the db.
std::unordered_map<std::shared_ptr<WriteController>, DbSessionIDs>
controllers_to_session_ids_map_;
std::unordered_map<std::shared_ptr<WriteController>, WBMClientIds>
controllers_to_client_ids_map_;
std::mutex controllers_map_mutex_;

// a map of Loggers similar to the above controllers_to_session_ids_map_.
std::unordered_map<std::shared_ptr<Logger>, DbSessionIDs>
loggers_to_session_ids_map_;
// a map of Loggers similar to the above controllers_to_client_ids_map_.
std::unordered_map<std::shared_ptr<Logger>, WBMClientIds>
loggers_to_client_ids_map_;
std::mutex loggers_map_mutex_;

using Loggers = std::unordered_set<Logger*>;
Expand Down
42 changes: 21 additions & 21 deletions memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,29 +323,29 @@ std::string WriteBufferManager::GetPrintableOptions() const {

void WriteBufferManager::RegisterWCAndLogger(
std::shared_ptr<WriteController> wc, std::shared_ptr<Logger> logger,
std::string db_session_id) {
const std::string& wbm_client_id) {
{
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
controllers_to_session_ids_map_[wc].insert(db_session_id);
controllers_to_client_ids_map_[wc].insert(wbm_client_id);
controllers_to_loggers_map_[wc.get()].insert(logger.get());
}
{
std::lock_guard<std::mutex> lock(loggers_map_mutex_);
loggers_to_session_ids_map_[logger].insert(db_session_id);
loggers_to_client_ids_map_[logger].insert(wbm_client_id);
}
}

// UDI - made this member function instead of a free function so that
// DbSessionIDs is recognized.
// WBMClientIds is recognized.
template <typename SharedPtrType>
bool WriteBufferManager::RemoveFromMap(
SharedPtrType ptr, std::string db_session_id, std::mutex& mutex,
std::unordered_map<SharedPtrType, DbSessionIDs>& map) {
SharedPtrType ptr, const std::string& wbm_client_id, std::mutex& mutex,
std::unordered_map<SharedPtrType, WBMClientIds>& map) {
std::lock_guard<std::mutex> lock(mutex);
assert(map.count(ptr));
assert(map[ptr].empty() == false);
assert(map[ptr].count(db_session_id));
map[ptr].erase(db_session_id);
assert(map[ptr].count(wbm_client_id));
map[ptr].erase(wbm_client_id);
if (map[ptr].empty()) {
map.erase(ptr);
return true;
Expand All @@ -356,12 +356,12 @@ bool WriteBufferManager::RemoveFromMap(

void WriteBufferManager::DeregisterWCAndLogger(
std::shared_ptr<WriteController> wc, std::shared_ptr<Logger> logger,
std::string db_session_id) {
bool last_logger = RemoveFromMap(logger, db_session_id, loggers_map_mutex_,
loggers_to_session_ids_map_);
const std::string& wbm_client_id) {
bool last_logger = RemoveFromMap(logger, wbm_client_id, loggers_map_mutex_,
loggers_to_client_ids_map_);
bool last_controller =
RemoveFromMap(wc, db_session_id, controllers_map_mutex_,
controllers_to_session_ids_map_);
RemoveFromMap(wc, wbm_client_id, controllers_map_mutex_,
controllers_to_client_ids_map_);
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
if (last_controller) {
assert(wc.unique() == false);
Expand Down Expand Up @@ -416,13 +416,13 @@ uint64_t CalcDelayFromFactor(uint64_t max_write_rate, uint64_t delay_factor) {

void WriteBufferManager::WBMSetupDelay(uint64_t delay_factor) {
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
for (auto& wc_and_session_id : controllers_to_session_ids_map_) {
// make sure that controllers_to_session_ids_map_ does not hold
for (auto& wc_and_client_id : controllers_to_client_ids_map_) {
// make sure that controllers_to_client_ids_map_ does not hold
// the last ref to the WC.
assert(wc_and_session_id.first.unique() == false);
assert(wc_and_client_id.first.unique() == false);
// the final rate depends on the write controllers max rate so
// each wc can receive a different delay requirement.
WriteController* wc = wc_and_session_id.first.get();
WriteController* wc = wc_and_client_id.first.get();
if (wc->is_dynamic_delay()) {
uint64_t wbm_write_rate =
CalcDelayFromFactor(wc->max_delayed_write_rate(), delay_factor);
Expand All @@ -439,12 +439,12 @@ void WriteBufferManager::WBMSetupDelay(uint64_t delay_factor) {

void WriteBufferManager::ResetDelay(UsageState usage_state) {
std::lock_guard<std::mutex> lock(controllers_map_mutex_);
for (auto& wc_and_session_id : controllers_to_session_ids_map_) {
// make sure that controllers_to_session_ids_map_ does not hold the last ref
for (auto& wc_and_client_id : controllers_to_client_ids_map_) {
// make sure that controllers_to_client_ids_map_ does not hold the last ref
// to the WC since holding the last ref means that the last DB that was
// using this WC has destructed and using this WC is no longer valid.
assert(wc_and_session_id.first.unique() == false);
WriteController* wc = wc_and_session_id.first.get();
assert(wc_and_client_id.first.unique() == false);
WriteController* wc = wc_and_client_id.first.get();
if (wc->is_dynamic_delay()) {
auto usage_state_str = "No Delay";
if (usage_state == UsageState::kStop) {
Expand Down

0 comments on commit f2990d0

Please sign in to comment.