Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(eviction): Tune eviction threshold in cache mode #4142

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);

namespace dfly {
Expand Down Expand Up @@ -456,19 +456,20 @@ TEST_F(DflyEngineTest, OOM) {
/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
max_memory_limit = 300000;

absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
max_memory_limit = 5000000; // 5mb
ResetService();

shard_set->TEST_EnableCacheMode();

ssize_t i = 0;
RespExpr resp;
std::string value(1024, 'b');
for (; i < 10000; ++i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
resp = Run({"setex", StrCat("key", i), "30", value});
// Some items get evicted along the way because the inserted data exceeds max_memory_limit.
if (resp != "OK") {
continue;
}
ASSERT_EQ(resp, "OK");
}

Expand All @@ -483,31 +484,30 @@ TEST_F(DflyEngineTest, Bug207) {
EXPECT_GT(evicted_count(resp.GetString()), 0);

for (; i > 0; --i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
resp = Run({"setex", StrCat("key", i), "30", value});
ASSERT_EQ(resp, "OK");
}
}

TEST_F(DflyEngineTest, StickyEviction) {
max_memory_limit = 300000;
absl::FlagSaver fs;
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
max_memory_limit = 5000000; // 5mb
ResetService();

shard_set->TEST_EnableCacheMode();

string tmp_val(100, '.');
std::string value(1024, 'b');

ssize_t failed = -1;
for (ssize_t i = 0; i < 5000; ++i) {
for (ssize_t i = 0; i < 2000; ++i) {
string key = StrCat("volatile", i);
ASSERT_EQ("OK", Run({"set", key, tmp_val}));
ASSERT_EQ("OK", Run({"set", key, value}));
}

bool done = false;
for (ssize_t i = 0; !done && i < 5000; ++i) {
for (ssize_t i = 0; !done && i < 7000; ++i) {
string key = StrCat("key", i);
while (true) {
if (Run({"set", key, tmp_val}) != "OK") {
if (Run({"set", key, value}) != "OK") {
failed = i;
done = true;
break;
Expand Down
25 changes: 21 additions & 4 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ optional<uint32_t> GetPeriodicCycleMs() {
return clock_cycle_ms;
}

// Returns the largest value of type `From` that does not exceed
// std::numeric_limits<To>::max().
//
// For integer pairs the two maxima are compared as uint64_t (both are
// non-negative, so the comparison is exact). If `From` cannot even reach To's
// maximum, From's own maximum is the ceiling; narrowing To's maximum into
// `From` in that case would wrap (e.g. INT64_MAX -> int32_t yields -1).
// For non-integer types this falls back to a plain conversion of To's maximum.
template <typename From, typename To> constexpr From MaxConvertibleValue() {
  using FromLimits = std::numeric_limits<From>;
  using ToLimits = std::numeric_limits<To>;
  if constexpr (FromLimits::is_integer && ToLimits::is_integer) {
    if (static_cast<uint64_t>(FromLimits::max()) <= static_cast<uint64_t>(ToLimits::max())) {
      return FromLimits::max();
    }
  }
  return static_cast<From>(ToLimits::max());
}

// Converts `value` from `From` to `To`, saturating at `max` instead of
// overflowing.
//
// value: the number to convert.
// max:   upper bound applied before the conversion; defaults to the largest
//        `From` value that still fits into `To`.
// Returns min(value, max) converted to `To`. Only the upper bound is clamped;
// negative inputs are converted as-is.
template <typename From, typename To>
To ConvertNumericOrRound(From value, From max = MaxConvertibleValue<From, To>()) {
  return static_cast<To>(std::min(value, max));
}

} // namespace

__thread EngineShard* EngineShard::shard_ = nullptr;
Expand Down Expand Up @@ -704,6 +709,7 @@ void EngineShard::RetireExpiredAndEvict() {
{ std::unique_lock lk(db_slice.GetSerializationMutex()); }
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
constexpr double kRedLimitFactorRev = 1.0 - kRedLimitFactor;

uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
uint32_t deleted = GetMovingSum6(TTL_DELETE);
Expand All @@ -717,8 +723,17 @@ void EngineShard::RetireExpiredAndEvict() {
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}

ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
const size_t max_rss_memory =
max_memory_limit * std::max(ServerState::tlocal()->rss_oom_deny_ratio, 0.0);
const ssize_t rss_memory =
ConvertNumericOrRound<uint64_t, ssize_t>(rss_mem_current.load(memory_order_relaxed));

// For memory we are using threshold for free memory
const ssize_t memory_budget_threshold =
ssize_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
// For rss we are using threshold for used memory
const ssize_t rss_memory_threshold =
ssize_t(max_rss_memory * kRedLimitFactorRev) / shard_set->size();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();

Expand All @@ -736,10 +751,12 @@ void EngineShard::RetireExpiredAndEvict() {
}

// if our budget is below the limit
if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
if ((db_slice.memory_budget() < memory_budget_threshold || rss_memory > rss_memory_threshold) &&
GetFlag(FLAGS_enable_heartbeat_eviction)) {
uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
eviction_redline - db_slice.memory_budget());
size_t goal_bytes = std::max(memory_budget_threshold - db_slice.memory_budget(),
rss_memory - rss_memory_threshold);
db_slice.FreeMemWithEvictionStep(i, starting_segment_id, goal_bytes);
}
}

Expand Down
18 changes: 5 additions & 13 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");

ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
ABSL_RETIRED_FLAG(
double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");

ABSL_FLAG(double, rss_oom_deny_ratio, 1.25,
"When the ratio between maxmemory and RSS memory exceeds this value, commands marked as "
Expand Down Expand Up @@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args,
return result;
}

// Stores `ratio` into the thread-local ServerState::oom_deny_ratio by running
// the assignment through shard_set->pool()->AwaitBrief().
// NOTE(review): presumably AwaitBrief runs the callback on every pool thread
// and blocks until all have finished - confirm against the pool API.
void SetOomDenyRatioOnAllThreads(double ratio) {
  shard_set->pool()->AwaitBrief([ratio](unsigned, auto*) {
    auto* state = ServerState::tlocal();
    state->oom_deny_ratio = ratio;
  });
}

void SetRssOomDenyRatioOnAllThreads(double ratio) {
auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
shard_set->pool()->AwaitBrief(cb);
Expand Down Expand Up @@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");

config_registry.RegisterSetter<double>("oom_deny_ratio",
[](double val) { SetOomDenyRatioOnAllThreads(val); });

config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
[](double val) { SetRssOomDenyRatioOnAllThreads(val); });

Expand Down Expand Up @@ -872,7 +865,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
});
Transaction::Init(shard_num);

SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));

// Requires that shard_set will be initialized before because server_family_.Init might
Expand Down Expand Up @@ -1000,7 +992,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);

if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
Expand Down
1 change: 0 additions & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class ServerState { // public struct - to allow initialization.
uint64_t used_mem = 0;
uint64_t rss_mem = 0;
};

MemoryUsageStats GetMemoryUsage(uint64_t now_ns);

bool AllowInlineScheduling() const;
Expand Down Expand Up @@ -294,7 +295,6 @@ class ServerState { // public struct - to allow initialization.

// Exec descriptor frequency count for this thread.
absl::flat_hash_map<std::string, unsigned> exec_freq_count;
double oom_deny_ratio;
double rss_oom_deny_ratio;

private:
Expand Down
2 changes: 2 additions & 0 deletions src/server/string_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,8 @@ TEST_F(StringFamilyTest, MGetSet) {
}

TEST_F(StringFamilyTest, MGetCachingModeBug2276) {
max_memory_limit = 3000000; // 3mb

absl::FlagSaver fs;
SetTestFlag("cache_mode", "true");
ResetService();
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/generic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory):
@pytest.mark.asyncio
async def test_denyoom_commands(df_factory):
df_server = df_factory.create(
proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7
proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", rss_oom_deny_ratio=0.7
)
df_server.start()
client = df_server.client()
Expand Down
67 changes: 67 additions & 0 deletions tests/dragonfly/memory_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,70 @@ async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
# creating a new client should not fail after memory usage decreases
client = df_server.client()
await client.execute_command("set x y")


@pytest.mark.asyncio
@dfly_args(
    {
        "proactor_threads": 1,
        "cache_mode": "true",
        "maxmemory": "256mb",
        "rss_oom_deny_ratio": 0.5,
    }
)
async def test_cache_eviction_with_rss_deny_oom(
    async_client: aioredis.Redis,
    df_server: DflyInstance,
):
    """
    Verify that cache eviction is triggered when RSS memory is above its limit,
    even if used memory is still small.

    Steps: populate 25% of maxmemory, inflate RSS by opening many client
    connections, populate another 30% and check that keys were evicted.
    """

    max_memory = 256 * 1024 * 1024  # 256 MB, matches the "maxmemory" argument above
    first_fill_size = int(0.25 * max_memory)  # 25% of max memory
    second_fill_size = int(0.3 * max_memory)  # Another 30% of max memory
    rss_increase_size = int(0.3 * max_memory)  # 30% of max memory

    key_size = 1024  # 1 KB, passed as the size argument of DEBUG POPULATE
    num_keys_first_fill = first_fill_size // key_size
    num_keys_second_fill = second_fill_size // key_size

    # Fill 25% of max memory using DEBUG POPULATE
    await async_client.execute_command("DEBUG", "POPULATE", num_keys_first_fill, "key", key_size)

    await asyncio.sleep(1)  # Wait for RSS heartbeat

    # Get RSS memory before creating new connections
    info_before_connections = await async_client.info("memory")
    rss_before_connections = info_before_connections["used_memory_rss"]

    # Increase RSS memory by 30% of max memory.
    # We simulate the RSS increase by creating new connections.
    estimated_connection_memory = 15 * 1024  # Estimate: 15 KB per connection
    num_connections = rss_increase_size // estimated_connection_memory
    connections = []
    try:
        for _ in range(num_connections):
            conn = aioredis.Redis(port=df_server.port)
            # Register the connection before the first command so that it is
            # closed in the finally block even if the ping fails.
            connections.append(conn)
            await conn.ping()

        await asyncio.sleep(1)  # Wait for RSS heartbeat update

        # Get RSS memory after creating new connections
        info_after_connections = await async_client.info("memory")
        rss_after_connections = info_after_connections["used_memory_rss"]

        assert rss_after_connections > rss_before_connections, "RSS memory should have increased."

        # Attempt to insert another 30% of data
        await async_client.execute_command(
            "DEBUG", "POPULATE", num_keys_second_fill, "key2", key_size
        )

        await asyncio.sleep(1)  # Wait for RSS heartbeat

        # Check that eviction has occurred
        info = await async_client.info("stats")
        assert info["evicted_keys"] > 0, "Eviction should have occurred due to rss memory pressure."
    finally:
        # Always release the extra connections, including when an assertion
        # above fails, so they do not leak into subsequent tests.
        for conn in connections:
            await conn.close()
Loading