From 55514917372960e384ccffa9e34924bc804f9bd7 Mon Sep 17 00:00:00 2001
From: jacktengg
Date: Mon, 18 Nov 2024 17:38:24 +0800
Subject: [PATCH 1/2] reserve block in scanner

---
 be/src/pipeline/pipeline_task.cpp             | 43 ++++++++++---------
 be/src/runtime/runtime_state.h                |  4 ++
 be/src/vec/exec/scan/scanner_context.cpp      | 20 +++++++--
 be/src/vec/exec/scan/scanner_context.h        |  2 +-
 be/src/vec/exec/scan/scanner_scheduler.cpp    |  9 +++-
 be/src/vec/spill/spill_stream.cpp             |  4 +-
 .../org/apache/doris/qe/SessionVariable.java  | 10 +++++
 gensrc/thrift/PaloInternalService.thrift      |  3 ++
 8 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index e6801121358522..69a1c490911196 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,7 +426,7 @@ Status PipelineTask::execute(bool* eos) {
             _root->reset_reserve_mem_size(_state);

             auto workload_group = _state->get_query_ctx()->workload_group();
-            if (workload_group && reserve_size > 0) {
+            if (workload_group && _state->enable_reserve_memory() && reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);

                 COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -458,25 +458,28 @@ Status PipelineTask::execute(bool* eos) {
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
             const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
-            status = thread_context()->try_reserve_memory(sink_reserve_size);
-            if (!status.ok()) {
-                COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
-                          << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
-                          << ", sink name: " << _sink->get_name()
-                          << ", node id: " << _sink->node_id() << ", task id: " << _state->task_id()
-                          << ", failed: " << status.to_string()
-                          << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
-                _state->get_query_ctx()->update_paused_reason(status);
-                _state->get_query_ctx()->set_low_memory_mode();
-                ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                        _state->get_query_ctx()->shared_from_this(), sink_reserve_size);
-                DCHECK_EQ(_pending_block.get(), nullptr);
-                _pending_block = std::move(_block);
-                _block = vectorized::Block::create_unique(_pending_block->clone_empty());
-                _eos = *eos;
-                *eos = false;
-                continue;
+            if (_state->enable_reserve_memory()) {
+                status = thread_context()->try_reserve_memory(sink_reserve_size);
+                if (!status.ok()) {
+                    COUNTER_UPDATE(_memory_reserve_failed_times, 1);
+                    LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
+                              << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
+                              << ", sink name: " << _sink->get_name()
+                              << ", node id: " << _sink->node_id()
+                              << ", task id: " << _state->task_id()
+                              << ", failed: " << status.to_string()
+                              << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+                    _state->get_query_ctx()->update_paused_reason(status);
+                    _state->get_query_ctx()->set_low_memory_mode();
+                    ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            _state->get_query_ctx()->shared_from_this(), sink_reserve_size);
+                    DCHECK_EQ(_pending_block.get(), nullptr);
+                    _pending_block = std::move(_block);
+                    _block = vectorized::Block::create_unique(_pending_block->clone_empty());
+                    _eos = *eos;
+                    *eos = false;
+                    continue;
+                }
             }

             // Define a lambda function to catch sink exception, because sink will check
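Note: patch 1 gates both reservation sites in PipelineTask::execute behind the new enable_reserve_memory query option. On the sink side the shape is reserve-or-park: the task tries to reserve the bytes the sink expects to need, and on failure it records the paused reason, switches the query into low-memory mode, stashes the in-flight block in _pending_block and yields so the workload group manager can resume it once memory frees up. The standalone C++ sketch below only illustrates that control flow against a toy budget; MemoryBudget, Task and sink_or_park are names invented for this sketch, not Doris APIs.

    // Toy model of the reserve-or-park flow seen in PipelineTask::execute above (not Doris code).
    #include <cstddef>
    #include <iostream>
    #include <optional>
    #include <string>

    class MemoryBudget {
    public:
        explicit MemoryBudget(size_t limit) : _limit(limit) {}
        // Succeeds only if the whole request still fits under the limit.
        bool try_reserve(size_t bytes) {
            if (_used + bytes > _limit) return false;
            _used += bytes;
            return true;
        }
        void release(size_t bytes) { _used -= bytes; }

    private:
        size_t _limit;
        size_t _used = 0;
    };

    struct Task {
        std::optional<std::string> pending_block; // parked output, resubmitted after memory frees up
        bool paused = false;
    };

    // Returns true if the block reached the sink, false if the task parked itself.
    bool sink_or_park(MemoryBudget& budget, Task& task, const std::string& block, size_t reserve_bytes) {
        if (!budget.try_reserve(reserve_bytes)) {
            task.pending_block = block; // keep the block for the retry instead of dropping it
            task.paused = true;         // a scheduler would wake the task once memory is released
            return false;
        }
        std::cout << "sink consumed " << block.size() << " bytes\n";
        budget.release(reserve_bytes); // released inline here; the patch appears to hand this to a scope-guard macro
        return true;
    }

    int main() {
        MemoryBudget budget(16);
        Task task;
        sink_or_park(budget, task, std::string(8, 'x'), 8);   // fits, block is sunk
        sink_or_park(budget, task, std::string(32, 'y'), 32); // too large, task parks the block
        std::cout << "paused=" << task.paused << "\n";
    }

Parking instead of failing means the block survives the pause and is pushed unchanged when the task is rescheduled.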
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 31e1d378526fea..b06d1ad033cbe5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -561,6 +561,10 @@ class RuntimeState {
                                        std::shared_ptr* producer_filter);
     bool is_nereids() const;

+    bool enable_reserve_memory() const {
+        return _query_options.__isset.enable_reserve_memory && _query_options.enable_reserve_memory;
+    }
+
     bool enable_join_spill() const {
         return (_query_options.__isset.enable_force_spill && _query_options.enable_force_spill) ||
                (_query_options.__isset.enable_join_spill && _query_options.enable_join_spill);
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index af92080c777c64..997f4ac2efc1d4 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -222,7 +222,7 @@ Status ScannerContext::init() {
     return Status::OK();
 }

-vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
+vectorized::BlockUPtr ScannerContext::get_free_block(size_t block_avg_bytes, bool force) {
     vectorized::BlockUPtr block = nullptr;
     if (_free_blocks.try_dequeue(block)) {
         DCHECK(block->mem_reuse());
@@ -232,9 +232,21 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
         // The caller of get_free_block will increase the memory usage
         update_peak_memory_usage(-block->allocated_bytes());
     } else if (_block_memory_usage < _max_bytes_in_queue || force) {
-        _newly_create_free_blocks_num->update(1);
-        block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
-                                                 true /*ignore invalid slots*/);
+        Status status;
+        if (!force && _state->enable_reserve_memory()) {
+            status = thread_context()->try_reserve_memory(block_avg_bytes);
+            if (!status.ok()) {
+                LOG(INFO) << "query: " << print_id(_query_id) << ", scanner try to reserve: "
+                          << PrettyPrinter::print(block_avg_bytes, TUnit::BYTES)
+                          << ", failed: " << status.to_string()
+                          << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+            }
+        }
+        if (status.ok()) {
+            _newly_create_free_blocks_num->update(1);
+            block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
+                                                     true /*ignore invalid slots*/);
+        }
     }
     return block;
 }
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index b81834fd7b74b0..d63008bd24a59a 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -121,7 +121,7 @@ class ScannerContext : public std::enable_shared_from_this,
     }

     Status init();
-    vectorized::BlockUPtr get_free_block(bool force);
+    vectorized::BlockUPtr get_free_block(size_t block_avg_bytes, bool force);
     void return_free_block(vectorized::BlockUPtr block);

     inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
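Note: ScannerContext::get_free_block now reserves the estimated block size before a brand-new block is allocated, and only when enable_reserve_memory is on; blocks reused from the free list and the force path skip the check, and a failed reservation yields a null block so the scanner stops producing until queued blocks drain. A minimal sketch of that shape follows, with Budget and BlockPool invented for illustration rather than taken from Doris.

    // Sketch of "reserve before allocate" for a block pool (illustrative stand-ins, not Doris types).
    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <memory>
    #include <vector>

    using Block = std::vector<char>;
    using BlockPtr = std::unique_ptr<Block>;

    struct Budget {
        size_t limit;
        size_t used = 0;
        bool try_reserve(size_t bytes) {
            if (used + bytes > limit) return false;
            used += bytes;
            return true;
        }
    };

    class BlockPool {
    public:
        explicit BlockPool(Budget& budget) : _budget(budget) {}

        // Reuse a pooled block when possible; otherwise reserve the estimated size first.
        // Returning nullptr tells the caller to stop producing until memory frees up.
        BlockPtr get_free_block(size_t block_avg_bytes, bool force) {
            if (!_free.empty()) {
                BlockPtr block = std::move(_free.front());
                _free.pop_front();
                return block;
            }
            if (!force && !_budget.try_reserve(block_avg_bytes)) {
                return nullptr;
            }
            return std::make_unique<Block>();
        }

        void return_free_block(BlockPtr block) { _free.push_back(std::move(block)); }

    private:
        Budget& _budget;
        std::deque<BlockPtr> _free;
    };

    int main() {
        Budget budget{4096};
        BlockPool pool(budget);
        BlockPtr a = pool.get_free_block(4096, /*force=*/false); // reserves the whole budget
        BlockPtr b = pool.get_free_block(4096, /*force=*/false); // budget exhausted, returns nullptr
        BlockPtr c = pool.get_free_block(4096, /*force=*/true);  // force path skips the reservation
        std::cout << (a != nullptr) << (b != nullptr) << (c != nullptr) << "\n"; // prints 101
    }

Returning nullptr rather than an error keeps the scan loop's existing "no free block, stop for now" handling unchanged.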
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0dae893b5c602c..2596d3d2cae0f1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -277,6 +277,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
     bool first_read = true;
     // If the first block is full, then it is true. Or the first block + second block > batch_size
     bool has_first_full_block = false;
+    size_t block_avg_bytes = ctx->batch_size();

     // During low memory mode, every scan task will return at most 2 block to reduce memory usage.
     while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -291,7 +292,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 config::doris_scanner_max_run_time_ms * 1e6) {
                 break;
             }
-            BlockUPtr free_block = ctx->get_free_block(first_read);
+            BlockUPtr free_block = ctx->get_free_block(block_avg_bytes, first_read);
             if (free_block == nullptr) {
                 break;
             }
@@ -338,6 +339,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 ctx->inc_block_usage(free_block->allocated_bytes());
                 scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
             }
+            if (scan_task->cached_blocks.back().first->rows() > 0) {
+                block_avg_bytes = (scan_task->cached_blocks.back().first->allocated_bytes() +
+                                   scan_task->cached_blocks.back().first->rows() - 1) /
+                                  scan_task->cached_blocks.back().first->rows() *
+                                  ctx->batch_size();
+            }
     } // end for while

     if (UNLIKELY(!status.ok())) {
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index a27916a87a3c89..fafebb4e62ae41 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -116,7 +116,9 @@ Status SpillStream::prepare() {
 }

 SpillReaderUPtr SpillStream::create_separate_reader() const {
-    return std::make_unique(stream_id_, writer_->get_file_path());
+    return std::make_unique(
+            state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
+            writer_->get_file_path());
 }

 const TUniqueId& SpillStream::query_id() const {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 52e78618c05ee9..8489d16f866352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -552,6 +552,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold";
     public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits";
     public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit";
+    public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
     public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
     public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
     public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
@@ -2132,6 +2133,14 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
     public int maxFetchRemoteTabletCount = 512;

+    @VariableMgr.VarAttr(
+            name = ENABLE_RESERVE_MEMORY,
+            description = {"控制是否启用分配内存前先reserve memory的功能。默认为 false。",
+                    "Controls whether to enable reserve memory before allocating memory. "
+                            + "The default value is false."},
+            needForward = true, fuzzy = true)
+    public boolean enableReserveMemory = false;
+
     @VariableMgr.VarAttr(
             name = ENABLE_JOIN_SPILL,
             description = {"控制是否启用join算子落盘。默认为 false。",
@@ -3863,6 +3872,7 @@ public TQueryOptions toThrift() {
         tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setSkipBadTablet(skipBadTablet);
         tResult.setDisableFileCache(disableFileCache);
+        tResult.setEnableReserveMemory(enableReserveMemory);
         tResult.setEnableJoinSpill(enableJoinSpill);
         tResult.setEnableSortSpill(enableSortSpill);
         tResult.setEnableAggSpill(enableAggSpill);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 37c7da3d702cca..92066532879019 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -356,6 +356,9 @@ struct TQueryOptions {

   139: optional i32 query_slot_count = 0;
   140: optional bool enable_auto_create_when_overwrite = false;
+
+  141: optional bool enable_reserve_memory = false
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.

From b908a224e54de661165f65f159f0c6a94f36e103 Mon Sep 17 00:00:00 2001
From: jacktengg
Date: Mon, 18 Nov 2024 23:12:09 +0800
Subject: [PATCH 2/2] fix

---
 be/src/pipeline/exec/operator.h            | 5 -----
 be/src/vec/exec/scan/scanner_scheduler.cpp | 1 +
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index e7c668d2cdbc59..4c431470435469 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -645,11 +645,6 @@ class DataSinkOperatorXBase : public OperatorBase {

     [[nodiscard]] std::string get_name() const override { return _name; }

-    [[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, vectorized::Block* block,
-                                                  bool eos) {
-        return true;
-    }
-
     virtual bool should_dry_run(RuntimeState* state) { return false; }

     [[nodiscard]] virtual bool count_down_destination() { return true; }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 2596d3d2cae0f1..3bd3ccb4c0eee4 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -292,6 +292,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx,
                 config::doris_scanner_max_run_time_ms * 1e6) {
                 break;
             }
+            DEFER_RELEASE_RESERVED();
             BlockUPtr free_block = ctx->get_free_block(block_avg_bytes, first_read);
             if (free_block == nullptr) {
                 break;