Skip to content

Commit

Permalink
ManualCompactionTest.CompactTouchesAllKeys_NonBlocking passes with co…
Browse files Browse the repository at this point in the history
…mpact range threads mngr
  • Loading branch information
udi-speedb committed Aug 14, 2023
1 parent 643ab50 commit 371ba66
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 3 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ set(SOURCES
db/db_impl/db_impl_experimental.cc
db/db_impl/db_impl_readonly.cc
db/db_impl/db_impl_secondary.cc
db/db_impl/compact_range_threads_mngr.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
Expand Down
2 changes: 2 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ Status DBImpl::CloseHelper() {
spdb_write_->Shutdown();
}

compact_range_threads_mngr_.Shutdown();

// Below check is added as recovery_error_ is not checked and it causes crash
// in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
// reached.
Expand Down
9 changes: 8 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "db/column_family.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/compact_range_threads_mngr.h"
#include "db/db_impl/db_spdb_impl_write.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
Expand Down Expand Up @@ -66,7 +67,6 @@
#include "util/repeatable_thread.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"

namespace ROCKSDB_NAMESPACE {

class Arena;
Expand Down Expand Up @@ -1419,6 +1419,11 @@ class DBImpl : public DB {
const Slice* begin, const Slice* end,
const std::string& trim_ts);

void CompactRangeNonBlockingThread(const CompactRangeOptions options,
ColumnFamilyData* cfd, std::string begin,
std::string end,
const std::string trim_ts);

Status CompactRangeInternalBlocking(const CompactRangeOptions& options,
ColumnFamilyData* cfd, const Slice* begin,
const Slice* end,
Expand Down Expand Up @@ -2761,6 +2766,8 @@ class DBImpl : public DB {
// The number of LockWAL called without matching UnlockWAL call.
// See also lock_wal_write_token_
uint32_t lock_wal_count_;

CompactRangeThreadsMngr compact_range_threads_mngr_;
};

class GetWithTimestampReadCallback : public ReadCallback {
Expand Down
43 changes: 41 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>
#include <deque>
#include <functional>
#include <limits>
#include <memory>
#include <thread>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
Expand Down Expand Up @@ -987,6 +990,32 @@ Status DBImpl::IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
return Status::OK();
}

void DBImpl::CompactRangeNonBlockingThread(const CompactRangeOptions options,
ColumnFamilyData* cfd,
std::string begin_str,
std::string end_str,
const std::string trim_ts) {
assert(options.async_completion_cb);

fprintf(stderr,
"In CompactRangeNonBlockingThread, calling "
"CompactRangeInternalBlocking\n");

Slice begin{begin_str};
Slice* begin_to_use = begin.empty() ? nullptr : &begin;
Slice end{end_str};
Slice* end_to_use = end.empty() ? nullptr : &end;

auto status = CompactRangeInternalBlocking(options, cfd, begin_to_use,
end_to_use, trim_ts);
fprintf(stderr,
"CompactRangeInternalBlocking Done, calling "
"options.async_completion_cb(%s)\n",
status.ToString().c_str());
options.async_completion_cb(status);
fprintf(stderr, "In CompactRangeNonBlockingThread, Thread Exiting\n");
}

Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
Expand Down Expand Up @@ -1014,8 +1043,18 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
}

if (options.async_completion_cb) {
// Do Nothing
assert(0);
std::string begin_str;
if (begin != nullptr) {
begin_str.assign(begin->data(), begin->size());
}
std::string end_str;
if (end != nullptr) {
end_str.assign(end->data(), end->size());
}
std::thread compact_range_thread(&DBImpl::CompactRangeNonBlockingThread,
this, options, cfd, begin_str, end_str,
trim_ts);
compact_range_threads_mngr_.AddThread(std::move(compact_range_thread));
return Status::OK();
} else {
return CompactRangeInternalBlocking(options, cfd, begin, end, trim_ts);
Expand Down
60 changes: 60 additions & 0 deletions db/manual_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/write_batch.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"

using ROCKSDB_NAMESPACE::CompactionFilter;
Expand Down Expand Up @@ -132,6 +133,65 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
}
}

void CompactRangeCompleteCb(ROCKSDB_NAMESPACE::Status completion_stats) {
std::cout << "CompactRangeCompleteCb: completion_stats = "
<< completion_stats.ToString() << '\n';
TEST_SYNC_POINT("TestCompactRangeComplete");
}

TEST_F(ManualCompactionTest, CompactTouchesAllKeys_NonBlocking) {
for (int iter = 0; iter < 2; ++iter) {
std::cout << "\nITERATION #" << (iter + 1) << '\n';
DB* db;
Options options;
if (iter == 0) { // level compaction
options.num_levels = 3;
options.compaction_style = CompactionStyle::kCompactionStyleLevel;
} else { // universal compaction
options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
}
options.create_if_missing = true;
options.compression = CompressionType::kNoCompression;
options.compaction_filter = new DestroyAllCompactionFilter();
ASSERT_OK(DB::Open(options, dbname_, &db));

ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("destroy")));
ASSERT_OK(db->Put(WriteOptions(), Slice("key2"), Slice("destroy")));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
ASSERT_OK(db->Put(WriteOptions(), Slice("key4"), Slice("destroy")));

Slice key4("key4");

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"TestCompactRangeComplete", "WaitForCompactRangeComplete"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

CompactRangeOptions cr_options;
cr_options.async_completion_cb = CompactRangeCompleteCb;
std::cout << "Compact Range Non-Blocking" << '\n';
ASSERT_OK(db->CompactRange(cr_options, nullptr, &key4));
std::cout << "Waiting for Compact Range Non-Blocking to complete" << '\n';

TEST_SYNC_POINT("WaitForCompactRangeComplete");

std::cout << "Compact Range Non-Blocking complete !!!!!" << '\n';

Iterator* itr = db->NewIterator(ReadOptions());
itr->SeekToFirst();
ASSERT_TRUE(itr->Valid());
ASSERT_EQ("key3", itr->key().ToString());
itr->Next();
ASSERT_TRUE(!itr->Valid());
delete itr;

delete options.compaction_filter;
delete db;
ASSERT_OK(DestroyDB(dbname_, options));
}

ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(ManualCompactionTest, Test) {
// Open database. Disable compression since it affects the creation
// of layers and the code below is trying to test against a very
Expand Down
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ LIB_SOURCES = \
db/db_impl/db_impl_secondary.cc \
db/db_impl/db_impl_write.cc \
db/db_impl/db_spdb_impl_write.cc \
db/db_impl/compact_range_threads_mngr.cc \
db/db_info_dumper.cc \
db/db_iter.cc \
db/dbformat.cc \
Expand Down

0 comments on commit 371ba66

Please sign in to comment.