diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc
index d2c97af93897..caf4f0bd094c 100644
--- a/src/server/dragonfly_test.cc
+++ b/src/server/dragonfly_test.cc
@@ -26,7 +26,7 @@ ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
 ABSL_DECLARE_FLAG(float, mem_defrag_waste_threshold);
 ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
 ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
-ABSL_DECLARE_FLAG(double, oom_deny_ratio);
+ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
 ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
 
 namespace dfly {
@@ -456,19 +456,20 @@ TEST_F(DflyEngineTest, OOM) {
 /// Reproduces the case where items with expiry data were evicted,
 /// and then written with the same key.
 TEST_F(DflyEngineTest, Bug207) {
-  max_memory_limit = 300000;
-
-  absl::FlagSaver fs;
-  absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
+  max_memory_limit = 5000000;  // 5mb
 
   ResetService();
 
   shard_set->TEST_EnableCacheMode();
 
   ssize_t i = 0;
   RespExpr resp;
+  std::string value(1024, 'b');
   for (; i < 10000; ++i) {
-    resp = Run({"setex", StrCat("key", i), "30", "bar"});
-    // we evict some items because 5000 is too much when max_memory_limit is 300000.
+    resp = Run({"setex", StrCat("key", i), "30", value});
+    // In cache mode SETEX may be rejected while eviction catches up; tolerate it.
+    if (resp != "OK") {
+      continue;
+    }
     ASSERT_EQ(resp, "OK");
   }
 
@@ -483,31 +484,30 @@ TEST_F(DflyEngineTest, Bug207) {
   EXPECT_GT(evicted_count(resp.GetString()), 0);
 
   for (; i > 0; --i) {
-    resp = Run({"setex", StrCat("key", i), "30", "bar"});
+    resp = Run({"setex", StrCat("key", i), "30", value});
     ASSERT_EQ(resp, "OK");
   }
 }
 
 TEST_F(DflyEngineTest, StickyEviction) {
-  max_memory_limit = 300000;
-  absl::FlagSaver fs;
-  absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
+  max_memory_limit = 5000000;  // 5mb
   ResetService();
+  shard_set->TEST_EnableCacheMode();
 
-  string tmp_val(100, '.');
+  std::string value(1024, 'b');
 
   ssize_t failed = -1;
-  for (ssize_t i = 0; i < 5000; ++i) {
+  for (ssize_t i = 0; i < 2000; ++i) {
     string key = StrCat("volatile", i);
-    ASSERT_EQ("OK", Run({"set", key, tmp_val}));
+    ASSERT_EQ("OK", Run({"set", key, value}));
   }
 
   bool done = false;
-  for (ssize_t i = 0; !done && i < 5000; ++i) {
+  for (ssize_t i = 0; !done && i < 7000; ++i) {
     string key = StrCat("key", i);
     while (true) {
-      if (Run({"set", key, tmp_val}) != "OK") {
+      if (Run({"set", key, value}) != "OK") {
         failed = i;
         done = true;
         break;
       }
diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 29c7957f2eb5..4c22e6592a04 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -198,6 +198,11 @@ optional<uint32_t> GetPeriodicCycleMs() {
   return clock_cycle_ms;
 }
 
+template <typename To, typename From>
+To ConvertNumericOrRound(From value, From max = std::numeric_limits<To>::max()) {
+  return static_cast<To>(std::min(value, max));
+}
+
 }  // namespace
 
 __thread EngineShard* EngineShard::shard_ = nullptr;
@@ -704,6 +709,7 @@ void EngineShard::RetireExpiredAndEvict() {
   { std::unique_lock lk(db_slice.GetSerializationMutex()); }
   constexpr double kTtlDeleteLimit = 200;
   constexpr double kRedLimitFactor = 0.1;
+  constexpr double kRedLimitFactorRev = 1.0 - kRedLimitFactor;
 
   uint32_t traversed = GetMovingSum6(TTL_TRAVERSE);
   uint32_t deleted = GetMovingSum6(TTL_DELETE);
@@ -717,8 +723,17 @@ void EngineShard::RetireExpiredAndEvict() {
     ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
   }
 
-  ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
+  const size_t max_rss_memory =
+      max_memory_limit * std::max(ServerState::tlocal()->rss_oom_deny_ratio, 0.0);
+  const ssize_t rss_memory =
+      ConvertNumericOrRound<ssize_t>(rss_mem_current.load(memory_order_relaxed));
+  // The used-memory threshold is a floor on the shard's free-memory budget.
+  const ssize_t memory_budget_threshold =
+      ssize_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
+  // The RSS threshold is a ceiling on resident memory actually used.
+  const ssize_t rss_memory_threshold =
+      ssize_t(max_rss_memory * kRedLimitFactorRev) / shard_set->size();
 
   DbContext db_cntx;
   db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -736,10 +751,12 @@ void EngineShard::RetireExpiredAndEvict() {
     }
 
-    // if our budget is below the limit
-    if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
+    // Evict if the free-memory budget fell below its floor or RSS rose above its ceiling.
+    if ((db_slice.memory_budget() < memory_budget_threshold || rss_memory > rss_memory_threshold) &&
+        GetFlag(FLAGS_enable_heartbeat_eviction)) {
       uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
-      db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
-                                       eviction_redline - db_slice.memory_budget());
+      size_t goal_bytes = std::max(memory_budget_threshold - db_slice.memory_budget(),
+                                   rss_memory - rss_memory_threshold);
+      db_slice.FreeMemWithEvictionStep(i, starting_segment_id, goal_bytes);
     }
   }
 
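A note on the trigger arithmetic introduced in `RetireExpiredAndEvict()`: eviction now fires on either of two per-shard thresholds — a floor on the free-memory budget (10% of the shard's share of maxmemory) and a ceiling on RSS (90% of the shard's share of `maxmemory * rss_oom_deny_ratio`), with the eviction goal set to the larger of the two deficits. The following standalone sketch mirrors that computation; the shard count, budget, and RSS readings are made-up numbers, and `int64_t` stands in for `ssize_t`:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  constexpr double kRedLimitFactor = 0.1;
  constexpr double kRedLimitFactorRev = 1.0 - kRedLimitFactor;

  const uint64_t max_memory_limit = 256ull << 20;  // 256mb, illustrative
  const double rss_oom_deny_ratio = 0.5;           // as in the new python test
  const int64_t num_shards = 4;                    // illustrative shard count

  // Floor on the free-memory budget: 10% of the shard's share of maxmemory.
  const int64_t memory_budget_threshold =
      int64_t(max_memory_limit * kRedLimitFactor) / num_shards;

  // Ceiling on RSS: 90% of the shard's share of maxmemory * rss_oom_deny_ratio.
  const uint64_t max_rss_memory = uint64_t(max_memory_limit * rss_oom_deny_ratio);
  const int64_t rss_memory_threshold =
      int64_t(max_rss_memory * kRedLimitFactorRev) / num_shards;

  // Example readings: plenty of free budget, but RSS above its ceiling.
  const int64_t memory_budget = 50ll << 20;
  const int64_t rss_memory = 40ll << 20;

  if (memory_budget < memory_budget_threshold || rss_memory > rss_memory_threshold) {
    // The goal is the larger of the two deficits, as in the patch.
    const int64_t goal_bytes = std::max(memory_budget_threshold - memory_budget,
                                        rss_memory - rss_memory_threshold);
    std::printf("evict ~%lld bytes\n", static_cast<long long>(goal_bytes));
  }
  return 0;
}
```

Taking `std::max` of the two deficits lets a single eviction pass serve whichever pressure signal is currently worse, instead of running separate passes per signal.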
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 66614638b9a1..1e7e834f7a33 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -100,9 +100,10 @@ ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
           "0 - means the program will automatically determine its maximum memory usage. "
           "default: 0");
 
-ABSL_FLAG(double, oom_deny_ratio, 1.1,
-          "commands with flag denyoom will return OOM when the ratio between maxmemory and used "
-          "memory is above this value");
+ABSL_RETIRED_FLAG(
+    double, oom_deny_ratio, 1.1,
+    "commands with flag denyoom will return OOM when the ratio between maxmemory and used "
+    "memory is above this value");
 
 ABSL_FLAG(double, rss_oom_deny_ratio, 1.25,
           "When the ratio between maxmemory and RSS memory exceeds this value, commands marked as "
@@ -722,11 +723,6 @@ string FailedCommandToString(std::string_view command, facade::CmdArgList args,
   return result;
 }
 
-void SetOomDenyRatioOnAllThreads(double ratio) {
-  auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->oom_deny_ratio = ratio; };
-  shard_set->pool()->AwaitBrief(cb);
-}
-
 void SetRssOomDenyRatioOnAllThreads(double ratio) {
   auto cb = [ratio](unsigned, auto*) { ServerState::tlocal()->rss_oom_deny_ratio = ratio; };
   shard_set->pool()->AwaitBrief(cb);
@@ -793,9 +789,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
   config_registry.RegisterMutable("max_eviction_per_heartbeat");
   config_registry.RegisterMutable("max_segment_to_consider");
 
-  config_registry.RegisterSetter<double>("oom_deny_ratio",
-                                         [](double val) { SetOomDenyRatioOnAllThreads(val); });
-
   config_registry.RegisterSetter<double>("rss_oom_deny_ratio",
                                          [](double val) { SetRssOomDenyRatioOnAllThreads(val); });
 
@@ -872,7 +865,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
   });
 
   Transaction::Init(shard_num);
-  SetOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_oom_deny_ratio));
   SetRssOomDenyRatioOnAllThreads(absl::GetFlag(FLAGS_rss_oom_deny_ratio));
 
   // Requires that shard_set will be initialized before because server_family_.Init might
@@ -1000,7 +992,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
     uint64_t start_ns = absl::GetCurrentTimeNanos();
     auto memory_stats = etl.GetMemoryUsage(start_ns);
 
-    if (memory_stats.used_mem > (max_memory_limit * etl.oom_deny_ratio) ||
+    if (memory_stats.used_mem > max_memory_limit ||
        (etl.rss_oom_deny_ratio > 0 &&
         memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
       DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
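`oom_deny_ratio` is retired rather than deleted outright so that existing command lines and config files keep parsing. For reference, this is the general shape of Abseil's retirement mechanism (the flag name below is hypothetical, not from this patch): a retired flag is still accepted by the parser, but no `FLAGS_` object is generated, so code can no longer read it.

```cpp
#include "absl/flags/flag.h"

// Still parses as --old_feature_toggle=... on the command line, but the
// program can no longer read the value; the flag is effectively inert.
ABSL_RETIRED_FLAG(bool, old_feature_toggle, false,
                  "retired; kept so existing deployments keep starting up");
```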
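With the ratio gone, `ShouldDenyOnOOM` simplifies accordingly: used memory is now checked against maxmemory directly (the old 1.1 fudge factor disappears), and the RSS check is skipped entirely when `rss_oom_deny_ratio` is 0. A minimal sketch of the resulting predicate, using local stand-in types rather than Dragonfly's:

```cpp
#include <cstdint>

struct MemoryStats {
  uint64_t used_mem;
  uint64_t rss_mem;
};

// Deny a denyoom command when used memory exceeds the hard cap, or when the
// RSS check is enabled (ratio > 0) and RSS exceeds maxmemory * ratio.
bool ShouldDenyOnOom(const MemoryStats& stats, uint64_t max_memory_limit,
                     double rss_oom_deny_ratio) {
  if (stats.used_mem > max_memory_limit)
    return true;
  return rss_oom_deny_ratio > 0 &&
         stats.rss_mem > uint64_t(max_memory_limit * rss_oom_deny_ratio);
}
```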
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index dd01f41e6b06..a8f1ff47fb89 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -134,7 +134,6 @@ ABSL_DECLARE_FLAG(bool, tls);
 ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
 ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
 ABSL_DECLARE_FLAG(int, replica_priority);
-ABSL_DECLARE_FLAG(double, oom_deny_ratio);
 ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
 
 bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
diff --git a/src/server/server_state.h b/src/server/server_state.h
index f8be766a5816..e80bb5c451f8 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -178,6 +178,7 @@ class ServerState {  // public struct - to allow initialization.
     uint64_t used_mem = 0;
     uint64_t rss_mem = 0;
   };
+
   MemoryUsageStats GetMemoryUsage(uint64_t now_ns);
 
   bool AllowInlineScheduling() const;
@@ -294,7 +295,6 @@ class ServerState {  // public struct - to allow initialization.
   // Exec descriptor frequency count for this thread.
   absl::flat_hash_map<std::string, unsigned> exec_freq_count;
 
-  double oom_deny_ratio;
   double rss_oom_deny_ratio;
 
 private:
diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc
index 2e4c5066cdcf..7d6344c9243e 100644
--- a/src/server/string_family_test.cc
+++ b/src/server/string_family_test.cc
@@ -254,6 +254,8 @@ TEST_F(StringFamilyTest, MGetSet) {
 }
 
 TEST_F(StringFamilyTest, MGetCachingModeBug2276) {
+  max_memory_limit = 3000000;  // 3mb
+
   absl::FlagSaver fs;
   SetTestFlag("cache_mode", "true");
   ResetService();
diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py
index c4ceef3d6398..4adeab03f182 100644
--- a/tests/dragonfly/generic_test.py
+++ b/tests/dragonfly/generic_test.py
@@ -149,7 +149,7 @@ async def test_reply_guard_oom(df_factory, df_seeder_factory):
 @pytest.mark.asyncio
 async def test_denyoom_commands(df_factory):
     df_server = df_factory.create(
-        proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", oom_deny_ratio=0.7
+        proactor_threads=1, maxmemory="256mb", oom_deny_commands="get", rss_oom_deny_ratio=0.7
    )
     df_server.start()
     client = df_server.client()
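As a quick sanity check on the updated `test_denyoom_commands` configuration: with `maxmemory="256mb"` and `rss_oom_deny_ratio=0.7`, denyoom-flagged commands should start failing once RSS crosses roughly 179mb (the figures below just restate the test's own parameters):

```cpp
#include <cstdio>

int main() {
  constexpr double kMaxMemory = 256.0 * 1024 * 1024;  // maxmemory from the test
  constexpr double kRssOomDenyRatio = 0.7;            // rss_oom_deny_ratio from the test
  // Deny threshold: maxmemory * rss_oom_deny_ratio ~= 179mb.
  std::printf("deny above %.0f bytes\n", kMaxMemory * kRssOomDenyRatio);
  return 0;
}
```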
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index 843e7c606dc9..94c6bd281ffd 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -107,3 +107,70 @@ async def test_rss_oom_ratio(df_factory: DflyInstanceFactory, admin_port):
     # new client creation should not fail after memory usage decreases
     client = df_server.client()
     await client.execute_command("set x y")
+
+
+@pytest.mark.asyncio
+@dfly_args(
+    {
+        "proactor_threads": 1,
+        "cache_mode": "true",
+        "maxmemory": "256mb",
+        "rss_oom_deny_ratio": 0.5,
+    }
+)
+async def test_cache_eviction_with_rss_deny_oom(
+    async_client: aioredis.Redis,
+    df_server: DflyInstance,
+):
+    """
+    Verify that cache eviction is triggered even when used memory is low,
+    as long as RSS memory is above the limit.
+    """
+
+    max_memory = 256 * 1024 * 1024  # 256mb
+    first_fill_size = int(0.25 * max_memory)  # 25% of max memory
+    second_fill_size = int(0.3 * max_memory)  # another 30% of max memory
+    rss_increase_size = int(0.3 * max_memory)  # 30% of max memory
+
+    key_size = 1024  # 1kb
+    num_keys_first_fill = first_fill_size // key_size
+    num_keys_second_fill = second_fill_size // key_size
+
+    # Fill 25% of max memory using DEBUG POPULATE
+    await async_client.execute_command("DEBUG", "POPULATE", num_keys_first_fill, "key", key_size)
+
+    await asyncio.sleep(1)  # Wait for RSS heartbeat update
+
+    # Get RSS memory before creating new connections
+    info_before_connections = await async_client.info("memory")
+    rss_before_connections = info_before_connections["used_memory_rss"]
+
+    # Increase RSS by ~30% of max memory by opening new connections,
+    # using a rough estimate of the memory overhead per connection.
+    estimated_connection_memory = 15 * 1024  # ~15kb per connection
+    num_connections = rss_increase_size // estimated_connection_memory
+    connections = []
+    for _ in range(num_connections):
+        conn = aioredis.Redis(port=df_server.port)
+        await conn.ping()
+        connections.append(conn)
+
+    await asyncio.sleep(1)  # Wait for RSS heartbeat update
+
+    # Get RSS memory after creating new connections
+    info_after_connections = await async_client.info("memory")
+    rss_after_connections = info_after_connections["used_memory_rss"]
+
+    assert rss_after_connections > rss_before_connections, "RSS memory should have increased."
+
+    # Attempt to insert another 30% of data
+    await async_client.execute_command("DEBUG", "POPULATE", num_keys_second_fill, "key2", key_size)
+
+    await asyncio.sleep(1)  # Wait for RSS heartbeat update
+
+    # Check that eviction has occurred
+    info = await async_client.info("stats")
+    assert info["evicted_keys"] > 0, "Eviction should have occurred due to RSS memory pressure."
+
+    for conn in connections:
+        await conn.close()
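Why these proportions should trigger eviction: with `proactor_threads=1` there is a single shard, so per the heartbeat logic above eviction starts once RSS exceeds `0.9 * 0.5 * 256mb`, roughly 115mb. The first fill contributes about 64mb of data and the connections add about 77mb of estimated overhead, which keeps RSS above that line while used memory stays well under maxmemory. A back-of-the-envelope check (values from the `dfly_args` above; the per-connection overhead is the test's own estimate, not a measured figure):

```cpp
#include <cstdio>

int main() {
  constexpr double kRedLimitFactorRev = 0.9;  // 1.0 - kRedLimitFactor
  constexpr double kRssOomDenyRatio = 0.5;
  constexpr double kMaxMemory = 256.0 * 1024 * 1024;

  // Single shard: heartbeat eviction starts once RSS exceeds ~115mb.
  constexpr double kRssEvictionThreshold =
      kMaxMemory * kRssOomDenyRatio * kRedLimitFactorRev;

  // ~64mb of populated data plus ~77mb of estimated connection overhead.
  constexpr double kExpectedRss = 0.25 * kMaxMemory + 0.3 * kMaxMemory;

  std::printf("threshold=%.0f expected_rss=%.0f\n", kRssEvictionThreshold,
              kExpectedRss);
  return 0;
}
```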