Skip to content

Commit

Permalink
fix(eviction): Tune eviction threshold in cache mode
Browse files Browse the repository at this point in the history
fixes #4139

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
  • Loading branch information
BagritsevichStepan committed Nov 21, 2024
1 parent a694bf4 commit 32440fc
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 35 deletions.
30 changes: 15 additions & 15 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);

namespace dfly {
Expand Down Expand Up @@ -456,19 +456,20 @@ TEST_F(DflyEngineTest, OOM) {
/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
max_memory_limit = 300000;

absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
max_memory_limit = 5000000; // 5mb
ResetService();

shard_set->TEST_EnableCacheMode();

ssize_t i = 0;
RespExpr resp;
std::string value(1024, 'b');
for (; i < 10000; ++i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
resp = Run({"setex", StrCat("key", i), "30", value});
// we evict some items because the keys written by this loop do not all fit within max_memory_limit.
if (resp != "OK") {
continue;
}
ASSERT_EQ(resp, "OK");
}

Expand All @@ -483,31 +484,30 @@ TEST_F(DflyEngineTest, Bug207) {
EXPECT_GT(evicted_count(resp.GetString()), 0);

for (; i > 0; --i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
resp = Run({"setex", StrCat("key", i), "30", value});
ASSERT_EQ(resp, "OK");
}
}

TEST_F(DflyEngineTest, StickyEviction) {
max_memory_limit = 300000;
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
max_memory_limit = 5000000; // 5mb
ResetService();

shard_set->TEST_EnableCacheMode();

string tmp_val(100, '.');
std::string value(1024, 'b');

ssize_t failed = -1;
for (ssize_t i = 0; i < 5000; ++i) {
for (ssize_t i = 0; i < 2000; ++i) {
string key = StrCat("volatile", i);
ASSERT_EQ("OK", Run({"set", key, tmp_val}));
ASSERT_EQ("OK", Run({"set", key, value}));
}

bool done = false;
for (ssize_t i = 0; !done && i < 5000; ++i) {
for (ssize_t i = 0; !done && i < 7000; ++i) {
string key = StrCat("key", i);
while (true) {
if (Run({"set", key, tmp_val}) != "OK") {
if (Run({"set", key, value}) != "OK") {
failed = i;
done = true;
break;
Expand Down
25 changes: 21 additions & 4 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ optional<uint32_t> GetPeriodicCycleMs() {
return clock_cycle_ms;
}

// Converts the integer `value` to the integer type `To`, saturating instead of
// wrapping around.
//
// `value` is first capped at `max` (by default no extra cap is applied) and is
// then clamped to the range representable by `To`:
//   * a negative value converted to an unsigned `To` yields 0;
//   * a value below / above the range of `To` yields To's minimum / maximum.
//
// `To` cannot be deduced from the arguments, so both template arguments must be
// spelled out at the call site, e.g. ConvertNumericOrRound<uint64_t, ssize_t>(x).
//
// The default for `max` is From's own maximum. The previous default,
// numeric_limits<To>::max() implicitly converted to From, was truncated whenever
// `To` was wider than `From` (int32_t -> int64_t produced a cap of -1).
template <typename From, typename To>
To ConvertNumericOrRound(From value, From max = std::numeric_limits<From>::max()) {
  using FromLimits = std::numeric_limits<From>;
  using ToLimits = std::numeric_limits<To>;
  static_assert(FromLimits::is_integer && ToLimits::is_integer,
                "ConvertNumericOrRound supports integer types only");

  const From capped = std::min(value, max);

  // Lower bound: only a signed source can fall below the range of `To`.
  if constexpr (FromLimits::is_signed) {
    if constexpr (!ToLimits::is_signed) {
      if (capped < 0) {
        return static_cast<To>(0);
      }
    } else if constexpr (ToLimits::digits < FromLimits::digits) {
      // To is a narrower signed type, so its minimum is representable in From.
      if (capped < static_cast<From>(ToLimits::min())) {
        return ToLimits::min();
      }
    }
  }

  // Upper bound: needed only when `To` has fewer value bits than `From`, in which
  // case To's maximum is representable in From and the comparison is exact.
  if constexpr (ToLimits::digits < FromLimits::digits) {
    if (capped > static_cast<From>(ToLimits::max())) {
      return ToLimits::max();
    }
  }

  return static_cast<To>(capped);
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
Expand Down Expand Up @@ -704,6 +709,7 @@ void EngineShard::RetireExpiredAndEvict() {
{ std::unique_lock lk(db_slice.GetSerializationMutex()); }
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
constexpr double kRedLimitFactorRev = 1.0 - kRedLimitFactor;

uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
Expand All @@ -717,8 +723,17 @@ void EngineShard::RetireExpiredAndEvict() {
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}

ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
const size_t max_rss_memory =
max_memory_limit * std::max(ServerState::tlocal()->rss_oom_deny_ratio, 0.0);
const ssize_t rss_memory = 0; // ConvertNumericOrRound<uint64_t,
// ssize_t>(rss_mem_current.load(memory_order_relaxed));

// For memory we are using threshold for free memory
const ssize_t memory_budget_threshold =
ssize_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
// For rss we are using threshold for used memory
const ssize_t rss_memory_threshold =
ssize_t(max_rss_memory * kRedLimitFactorRev) / shard_set->size();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

Expand All @@ -736,10 +751,12 @@ void EngineShard::RetireExpiredAndEvict() {
}

// if our budget is below the limit
if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
if ((db_slice.memory_budget() < memory_budget_threshold || rss_memory > rss_memory_threshold) &&
GetFlag(FLAGS_enable_heartbeat_eviction)) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
eviction_redline - db_slice.memory_budget());
size_t goal_bytes = std::max(memory_budget_threshold - db_slice.memory_budget(),
rss_memory - rss_memory_threshold);
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, goal_bytes);
}
}

Expand Down
18 changes: 5 additions & 13 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");

ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
ABSL_RETIRED_FLAG(
double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");

ABSL_FLAG(double, rss_oom_deny_ratio, 1.25,
"When the ratio between maxmemory and RSS memory exceeds this value, commands marked as "
Expand Down Expand Up @@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args,
return result;
}

// Stores `ratio` into ServerState::tlocal()->oom_deny_ratio from a callback
// dispatched through shard_set->pool()->AwaitBrief (per the function name, this
// is meant to reach the thread-local ServerState of every pool thread).
void SetOomDenyRatioOnAllThreads(double ratio) {
  shard_set->pool()->AwaitBrief(
      [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; });
}

void SetRssOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
Expand Down Expand Up @@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");

config_registry.RegisterSetter<double>("oom_deny_ratio",
[](double val) { SetOomDenyRatioOnAllThreads(val); });

config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });

Expand Down Expand Up @@ -872,7 +865,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
});
Transaction::Init(shard_num);

SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));

// Requires that shard_set will be initialized before because server_family_.Init might
Expand Down Expand Up @@ -1000,7 +992,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);

if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
Expand Down
1 change: 0 additions & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class ServerState { // public struct - to allow initialization.
uint64_t used_mem = 0;
uint64_t rss_mem = 0;
};

MemoryUsageStats GetMemoryUsage(uint64_t now_ns);

bool AllowInlineScheduling() const;
Expand Down Expand Up @@ -294,7 +295,6 @@ class ServerState { // public struct - to allow initialization.

// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double oom_deny_ratio;
double rss_oom_deny_ratio;

private:
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/generic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory):
@pytest.mark.asyncio
async def test_denyoom_commands(df_factory):
df_server = df_factory.create(
proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7
proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", rss_oom_deny_ratio=0.7
)
df_server.start()
client = df_server.client()
Expand Down
68 changes: 68 additions & 0 deletions tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,71 @@ async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
# creating a new client should not fail after memory usage decreases
client = df_server.client()
await client.execute_command("set x y")


@pytest.mark.asyncio
@dfly_args(
    {
        "proactor_threads": 1,
        "cache_mode": "true",
        "maxmemory": "256mb",
        "rss_oom_deny_ratio": 0.5,
    }
)
async def test_cache_eviction_with_rss_deny_oom(
    df_server: DflyInstance,
):
    """
    Verify that cache eviction is triggered even if used memory is small but RSS memory is
    above the limit.
    """

    client = df_server.client()

    max_memory = 256 * 1024 * 1024  # 256 MB, matches the "maxmemory" argument above
    first_fill_size = int(0.15 * max_memory)  # 15% of max memory
    second_fill_size = int(0.3 * max_memory)  # Another 30% of max memory
    rss_increase_size = int(0.3 * max_memory)  # 30% of max memory

    key_size = 1024  # 1 KB, passed as the size argument of DEBUG POPULATE
    num_keys_first_fill = first_fill_size // key_size
    num_keys_second_fill = second_fill_size // key_size

    # Step 1: Fill 15% of max memory using DEBUG POPULATE
    await client.execute_command("DEBUG", "POPULATE", num_keys_first_fill, "key", key_size)

    await asyncio.sleep(1)  # Wait for RSS heartbeat

    # Get RSS memory before creating new connections
    info_before_connections = await client.info("memory")
    rss_before_connections = info_before_connections["used_memory_rss"]

    # Step 2: Increase RSS memory by 30% of max memory
    # We can simulate RSS increase by creating new connections
    # Estimate memory per connection
    estimated_connection_memory = 20 * 1024  # 20 KB per connection
    num_connections = rss_increase_size // estimated_connection_memory
    connections = []
    try:
        for _ in range(num_connections):
            conn = aioredis.Redis(port=df_server.port)
            # Register the connection before the first command so that it is still closed
            # in the `finally` block if the ping fails.
            connections.append(conn)
            await conn.ping()

        await asyncio.sleep(1)  # Wait for RSS heartbeat update

        # Get RSS memory after creating new connections
        info_after_connections = await client.info("memory")
        rss_after_connections = info_after_connections["used_memory_rss"]

        assert rss_after_connections > rss_before_connections, "RSS memory should have increased."

        # Step 3: Attempt to insert another 30% of data
        await client.execute_command("DEBUG", "POPULATE", num_keys_second_fill, "key2", key_size)

        await asyncio.sleep(1)  # Wait for RSS heartbeat

        # Step 4: Check that eviction has occurred
        info = await client.info("stats")
        assert (
            info.get("evicted_keys", 0) > 0
        ), "Eviction should have occurred due to memory pressure."
    finally:
        # Close the extra connections even when an assertion or a command above fails,
        # so a failing run does not leave them open.
        for conn in connections:
            await conn.close()

0 comments on commit 32440fc

Please sign in to comment.