Skip to content

Commit

Permalink
KIKIMR-19216: improve merging
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov committed Oct 14, 2023
1 parent b8a327d commit 306a5ee
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 107 deletions.
11 changes: 4 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), portionRecordIndexField, column->BuildArray(batch->num_rows())));
}
Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
mergeStream.AddPoolSource({}, batch, nullptr);
mergeStream.AddSource(batch, nullptr);
}
batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true);
batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields);
}
Y_ABORT_UNLESS(batchResults.size());

Expand Down Expand Up @@ -208,11 +208,8 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL;
}

void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) {
if (CheckPoints.size()) {
AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less);
}
CheckPoints.emplace_back(position);
void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include) {
AFL_VERIFY(CheckPoints.emplace(position, include).second);
}

}
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
private:
using TBase = TCompactColumnEngineChanges;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints;
std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
protected:
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
virtual TPortionMeta::EProduced GetResultProducedClass() const override {
Expand All @@ -19,7 +19,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
public:
using TBase::TBase;

void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position);
void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position, const bool include = true);

virtual TString TypeString() const override {
return StaticTypeName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void TFetchingInterval::ConstructResult() {
if (i->GetStart().Compare(Start) == std::partial_ordering::equivalent && !i->IsMergingStarted()) {
auto rb = i->GetBatch();
if (rb) {
Merger->AddPoolSource({}, rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
Merger->AddSource(rb, i->GetFilterStageData().GetNotAppliedEarlyFilter());
}
i->StartMerging();
}
Expand Down
70 changes: 25 additions & 45 deletions ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,37 @@ namespace NKikimr::NOlap::NIndexedReader {

void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
Y_ABORT_UNLESS(point);
Y_ABORT_UNLESS(point->IsSameSortingSchema(SortSchema));
AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point->IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);

SortHeap.emplace_back(TBatchIterator(*point));
std::push_heap(SortHeap.begin(), SortHeap.end());
SortHeap.Push(TBatchIterator(*point));
}

void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
if (!batch || !batch->num_rows()) {
return;
}
Y_VERIFY_DEBUG(NArrow::IsSorted(batch, SortSchema));
if (!poolId) {
AddNewToHeap(poolId, batch, filter, true);
} else {
auto it = BatchPools.find(*poolId);
if (it == BatchPools.end()) {
it = BatchPools.emplace(*poolId, std::deque<TIteratorData>()).first;
}
it->second.emplace_back(batch, filter);
if (it->second.size() == 1) {
AddNewToHeap(poolId, batch, filter, true);
}
}
AddNewToHeap(batch, filter);
}

void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
if (!filter || filter->IsTotalAllowFilter()) {
SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse));
} else if (filter->IsTotalDenyFilter()) {
return;
} else {
SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
}
if (restoreHeap) {
std::push_heap(SortHeap.begin(), SortHeap.end());
SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse));
}
}

void TMergePartialStream::RemoveControlPoint() {
Y_ABORT_UNLESS(ControlPoints == 1);
Y_ABORT_UNLESS(ControlPointEnriched());
Y_ABORT_UNLESS(-- ControlPoints == 0);
std::pop_heap(SortHeap.begin(), SortHeap.end());
SortHeap.pop_back();
Y_ABORT_UNLESS(SortHeap.Current().IsControlPoint());
SortHeap.RemoveTop();
}

void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
Expand All @@ -73,11 +58,11 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
bool cpReachedFlag = false;
while (SortHeap.size() && !cpReachedFlag) {
if (SortHeap.front().IsControlPoint()) {
while (SortHeap.Size() && !cpReachedFlag) {
if (SortHeap.Current().IsControlPoint()) {
RemoveControlPoint();
cpReachedFlag = true;
if (SortHeap.empty() || !includeFinish || SortHeap.front().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) {
if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(readTo) == std::partial_ordering::greater) {
return true;
}
}
Expand All @@ -92,7 +77,7 @@ bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSo

bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
while (SortHeap.size()) {
while (SortHeap.Size()) {
if (auto currentPosition = DrainCurrentPosition()) {
CheckSequenceInDebug(*currentPosition);
builder.AddRecord(*currentPosition);
Expand All @@ -102,19 +87,19 @@ bool TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
}

std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
Y_ABORT_UNLESS(SortHeap.size());
Y_ABORT_UNLESS(!SortHeap.front().IsControlPoint());
TSortableBatchPosition result = SortHeap.front().GetKeyColumns();
TSortableBatchPosition resultVersion = SortHeap.front().GetVersionColumns();
Y_ABORT_UNLESS(SortHeap.Size());
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
bool isFirst = true;
const bool deletedFlag = SortHeap.front().IsDeleted();
while (SortHeap.size() && (isFirst || result.Compare(SortHeap.front().GetKeyColumns()) == std::partial_ordering::equivalent)) {
auto& anotherIterator = SortHeap.front();
const bool deletedFlag = SortHeap.Current().IsDeleted();
while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) {
auto& anotherIterator = SortHeap.Current();
if (!isFirst) {
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", result.DebugJson());
}
NextInHeap(true);
SortHeap.Next();
isFirst = false;
}
if (deletedFlag) {
Expand All @@ -123,13 +108,13 @@ std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition(
return result;
}

std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::vector<TSortableBatchPosition>& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields, const bool includePositions)
std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields)
{
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
for (auto&& i : positions) {
NIndexedReader::TRecordBatchBuilder indexesBuilder(resultFields);
DrainCurrentTo(indexesBuilder, i, includePositions);
DrainCurrentTo(indexesBuilder, i.first, i.second);
result.emplace_back(indexesBuilder.Finalize());
if (result.back()->num_rows() == 0) {
result.pop_back();
Expand All @@ -147,11 +132,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
NJson::TJsonValue result;
result["is_cp"] = IsControlPoint();
if (PoolId) {
result["pool_id"] = *PoolId;
} else {
result["pool_id"] = "absent";
}
result["key"] = KeyColumns.DebugJson();
return result;
}
Expand Down
Loading

0 comments on commit 306a5ee

Please sign in to comment.