Skip to content

Commit

Permalink
feat(rdb_load): add support for loading huge lists (#3850)
Browse files Browse the repository at this point in the history
* feat(rdb_load): add support for loading huge lists

* feat(rdb_load): add EnsureObjEncoding
  • Loading branch information
andydunstall authored Oct 2, 2024
1 parent a01dfcb commit a066579
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 56 deletions.
135 changes: 79 additions & 56 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ class RdbLoaderBase::OpaqueObjLoader {
sds ToSds(const RdbVariant& obj);
string_view ToSV(const RdbVariant& obj);

// Returns whether pv_ has the given object type and encoding. If not ec_
// is set to the error.
bool EnsureObjEncoding(CompactObjType type, unsigned encoding);

template <typename F> static void Iterate(const LoadTrace& ltrace, F&& f) {
unsigned cnt = 0;
for (const auto& seg : ltrace.arr) {
Expand Down Expand Up @@ -549,17 +553,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
// Note we only use append_ when the set size exceeds kMaxBlobLen,
// which is greater than SetFamily::MaxIntsetEntries so we'll always use
// a string set not an int set.
if (pv_->ObjType() != OBJ_SET) {
LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
ec_ = RdbError(errc::invalid_rdb_type);
return;
}
if (pv_->Encoding() != kEncodingStrMap2) {
LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
ec_ = RdbError(errc::invalid_encoding);
if (!EnsureObjEncoding(OBJ_SET, kEncodingStrMap2)) {
return;
}

set = static_cast<StringSet*>(pv_->RObjPtr());
} else {
set = CompactObj::AllocateMR<StringSet>();
Expand Down Expand Up @@ -667,14 +663,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
// Note we only use append_ when the map size exceeds kMaxBlobLen,
// which is greater than 64 so we'll always use a StringMap set not
// listpack.
if (pv_->ObjType() != OBJ_HASH) {
LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
ec_ = RdbError(errc::invalid_rdb_type);
return;
}
if (pv_->Encoding() != kEncodingStrMap2) {
LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
ec_ = RdbError(errc::invalid_encoding);
if (!EnsureObjEncoding(OBJ_HASH, kEncodingStrMap2)) {
return;
}

Expand Down Expand Up @@ -738,9 +727,22 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
}

void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
quicklist* ql =
quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
auto cleanup = absl::Cleanup([&] { quicklistRelease(ql); });
quicklist* ql;
if (config_.append) {
if (!EnsureObjEncoding(OBJ_LIST, OBJ_ENCODING_QUICKLIST)) {
return;
}

ql = static_cast<quicklist*>(pv_->RObjPtr());
} else {
ql = quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
}

auto cleanup = absl::Cleanup([&] {
if (!config_.append) {
quicklistRelease(ql);
}
});

Iterate(*ltrace, [&](const LoadBlob& blob) {
unsigned container = blob.encoding;
Expand Down Expand Up @@ -802,7 +804,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {

std::move(cleanup).Cancel();

pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
if (!config_.append) {
pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
}
}

void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
Expand All @@ -814,14 +818,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
// Note we only use append_ when the set size exceeds kMaxBlobLen,
// which is greater than server.zset_max_listpack_entries so we'll always
// use a SortedMap set not listpack.
if (pv_->ObjType() != OBJ_ZSET) {
LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType();
ec_ = RdbError(errc::invalid_rdb_type);
return;
}
if (pv_->Encoding() != OBJ_ENCODING_SKIPLIST) {
LOG(DFATAL) << "Invalid encoding " << pv_->Encoding();
ec_ = RdbError(errc::invalid_encoding);
if (!EnsureObjEncoding(OBJ_ZSET, OBJ_ENCODING_SKIPLIST)) {
return;
}

Expand Down Expand Up @@ -1206,6 +1203,21 @@ string_view RdbLoaderBase::OpaqueObjLoader::ToSV(const RdbVariant& obj) {
return string_view{};
}

bool RdbLoaderBase::OpaqueObjLoader::EnsureObjEncoding(CompactObjType type, unsigned encoding) {
if (pv_->ObjType() != type) {
LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType() << "; expected " << type;
ec_ = RdbError(errc::invalid_rdb_type);
return false;
}
if (pv_->Encoding() != encoding) {
LOG(DFATAL) << "Invalid encoding " << pv_->Encoding() << "; expected " << encoding;
ec_ = RdbError(errc::invalid_encoding);
return false;
}

return true;
}

std::error_code RdbLoaderBase::FetchBuf(size_t size, void* dest) {
if (size == 0)
return kOk;
Expand Down Expand Up @@ -1738,42 +1750,53 @@ auto RdbLoaderBase::ReadZSetZL() -> io::Result<OpaqueObj> {
}

auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
uint64_t len;
SET_OR_UNEXPECT(LoadLen(nullptr), len);
size_t len;
if (pending_read_.remaining > 0) {
len = pending_read_.remaining;
} else {
SET_OR_UNEXPECT(LoadLen(NULL), len);
pending_read_.reserve = len;
}

if (len == 0)
return Unexpected(errc::empty_key);

unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);

for (size_t i = 0; i < load_trace->arr.size(); ++i) {
size_t n = std::min<size_t>(len, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; ++j) {
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), container);

if (container != QUICKLIST_NODE_CONTAINER_PACKED &&
container != QUICKLIST_NODE_CONTAINER_PLAIN) {
LOG(ERROR) << "Quicklist integrity check failed.";
return Unexpected(errc::rdb_file_corrupted);
}
}

RdbVariant var;
error_code ec = ReadStringObj(&var);
if (ec)
return make_unexpected(ec);
load_trace->arr.resize(1);
// Lists pack multiple entries into each list node (8Kb by default),
// therefore using a smaller segment length than kMaxBlobLen.
size_t n = std::min<size_t>(len, 512);
load_trace->arr[0].resize(n);
for (size_t i = 0; i < n; ++i) {
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), container);

if (StrLen(var) == 0) {
if (container != QUICKLIST_NODE_CONTAINER_PACKED &&
container != QUICKLIST_NODE_CONTAINER_PLAIN) {
LOG(ERROR) << "Quicklist integrity check failed.";
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[i][j].rdb_var = std::move(var);
load_trace->arr[i][j].encoding = container;
}
len -= n;

RdbVariant var;
error_code ec = ReadStringObj(&var);
if (ec)
return make_unexpected(ec);

if (StrLen(var) == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[0][i].rdb_var = std::move(var);
load_trace->arr[0][i].encoding = container;
}

// If there are still unread elements, cache the number of remaining
// elements, or clear if the full object has been read.
if (len > n) {
pending_read_.remaining = len - n;
} else if (pending_read_.remaining > 0) {
pending_read_.remaining = 0;
}

return OpaqueObj{std::move(load_trace), rdbtype};
Expand Down
20 changes: 20 additions & 0 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -624,4 +624,24 @@ TEST_F(RdbTest, LoadHugeZSet) {
ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
}

// Tests loading a huge list, where the list is loaded in multiple partial
// reads.
TEST_F(RdbTest, LoadHugeList) {
// Add 2 lists with 100k elements each (note must have more than 512*8Kb
// elements to test partial reads).
Run({"debug", "populate", "2", "test", "100", "rand", "type", "list", "elements", "100000"});
ASSERT_EQ(100000, CheckedInt({"llen", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"llen", "test:1"}));

RespExpr resp = Run({"save", "df"});
ASSERT_EQ(resp, "OK");

auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"dfly", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");

ASSERT_EQ(100000, CheckedInt({"llen", "test:0"}));
ASSERT_EQ(100000, CheckedInt({"llen", "test:1"}));
}

} // namespace dfly

0 comments on commit a066579

Please sign in to comment.