Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reserve block in scanner #44185

Open
wants to merge 2 commits into
base: spill_and_reserve
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,6 @@ class DataSinkOperatorXBase : public OperatorBase {

[[nodiscard]] std::string get_name() const override { return _name; }

// Hook for sink operators to reserve memory for `block` before sinking it.
// This default implementation performs no reservation and always reports
// success; subclasses could override it to attempt a real reservation and
// return false on failure.
// NOTE(review): this diff removes the hook entirely (0 additions, 5 deletions)
// — presumably callers now reserve via thread_context()->try_reserve_memory()
// guarded by RuntimeState::enable_reserve_memory(); verify against the
// pipeline_task.cpp hunk.
[[nodiscard]] virtual bool try_reserve_memory(RuntimeState* state, vectorized::Block* block,
bool eos) {
return true;
}

virtual bool should_dry_run(RuntimeState* state) { return false; }

[[nodiscard]] virtual bool count_down_destination() { return true; }
Expand Down
43 changes: 23 additions & 20 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ Status PipelineTask::execute(bool* eos) {
_root->reset_reserve_mem_size(_state);

auto workload_group = _state->get_query_ctx()->workload_group();
if (workload_group && reserve_size > 0) {
if (workload_group && _state->enable_reserve_memory() && reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);

COUNTER_UPDATE(_memory_reserve_times, 1);
Expand Down Expand Up @@ -458,25 +458,28 @@ Status PipelineTask::execute(bool* eos) {
DEFER_RELEASE_RESERVED();
COUNTER_UPDATE(_memory_reserve_times, 1);
const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
<< PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << ", task id: " << _state->task_id()
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
_state->get_query_ctx()->set_low_memory_mode();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size);
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
_eos = *eos;
*eos = false;
continue;
if (_state->enable_reserve_memory()) {
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
<< PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id()
<< ", task id: " << _state->task_id()
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
_state->get_query_ctx()->set_low_memory_mode();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size);
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block = vectorized::Block::create_unique(_pending_block->clone_empty());
_eos = *eos;
*eos = false;
continue;
}
}

// Define a lambda function to catch sink exception, because sink will check
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ class RuntimeState {
std::shared_ptr<IRuntimeFilter>* producer_filter);
bool is_nereids() const;

// Whether this query runs with memory reservation enabled.
// The query option must be both explicitly set and true; an unset
// option is treated as disabled.
bool enable_reserve_memory() const {
    if (!_query_options.__isset.enable_reserve_memory) {
        return false;
    }
    return _query_options.enable_reserve_memory;
}

bool enable_join_spill() const {
return (_query_options.__isset.enable_force_spill && _query_options.enable_force_spill) ||
(_query_options.__isset.enable_join_spill && _query_options.enable_join_spill);
Expand Down
20 changes: 16 additions & 4 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ Status ScannerContext::init() {
return Status::OK();
}

vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
vectorized::BlockUPtr ScannerContext::get_free_block(size_t block_avg_bytes, bool force) {
vectorized::BlockUPtr block = nullptr;
if (_free_blocks.try_dequeue(block)) {
DCHECK(block->mem_reuse());
Expand All @@ -232,9 +232,21 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
// The caller of get_free_block will increase the memory usage
update_peak_memory_usage(-block->allocated_bytes());
} else if (_block_memory_usage < _max_bytes_in_queue || force) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
Status status;
if (!force && _state->enable_reserve_memory()) {
status = thread_context()->try_reserve_memory(block_avg_bytes);
if (!status.ok()) {
jacktengg marked this conversation as resolved.
Show resolved Hide resolved
LOG(INFO) << "query: " << print_id(_query_id) << ", scanner try to reserve: "
<< PrettyPrinter::print(block_avg_bytes, TUnit::BYTES)
<< ", failed: " << status.to_string()
<< ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
}
}
if (status.ok()) {
_newly_create_free_blocks_num->update(1);
block = vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
true /*ignore invalid slots*/);
}
}
return block;
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
}
Status init();

vectorized::BlockUPtr get_free_block(bool force);
vectorized::BlockUPtr get_free_block(size_t block_avg_bytes, bool force);
void return_free_block(vectorized::BlockUPtr block);
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }

Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
bool first_read = true;
// If the first block is full, then it is true. Or the first block + second block > batch_size
bool has_first_full_block = false;
size_t block_avg_bytes = ctx->batch_size();

// During low memory mode, every scan task will return at most 2 block to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
Expand All @@ -291,7 +292,8 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
config::doris_scanner_max_run_time_ms * 1e6) {
break;
}
BlockUPtr free_block = ctx->get_free_block(first_read);
DEFER_RELEASE_RESERVED();
BlockUPtr free_block = ctx->get_free_block(block_avg_bytes, first_read);
if (free_block == nullptr) {
break;
}
Expand Down Expand Up @@ -338,6 +340,12 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (scan_task->cached_blocks.back().first->rows() > 0) {
block_avg_bytes = (scan_task->cached_blocks.back().first->allocated_bytes() +
scan_task->cached_blocks.back().first->rows() - 1) /
scan_task->cached_blocks.back().first->rows() *
ctx->batch_size();
}
} // end for while

if (UNLIKELY(!status.ok())) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/spill/spill_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ Status SpillStream::prepare() {
}

SpillReaderUPtr SpillStream::create_separate_reader() const {
return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
return std::make_unique<SpillReader>(
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
writer_->get_file_path());
}

const TUniqueId& SpillStream::query_id() const {
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold";
public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits";
public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit";
public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
Expand Down Expand Up @@ -2132,6 +2133,14 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
@VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
public int maxFetchRemoteTabletCount = 512;

// Session variable controlling whether BE reserves memory before allocating it.
// Forwarded to BE (needForward = true) via TQueryOptions.enable_reserve_memory.
// Fix: typo "reverve" -> "reserve" in the Chinese description string.
@VariableMgr.VarAttr(
        name = ENABLE_RESERVE_MEMORY,
        description = {"控制是否启用分配内存前先reserve memory的功能。默认为 false。",
                "Controls whether to enable reserve memory before allocating memory. "
                        + "The default value is false."},
        needForward = true, fuzzy = true)
public boolean enableReserveMemory = false;

@VariableMgr.VarAttr(
name = ENABLE_JOIN_SPILL,
description = {"控制是否启用join算子落盘。默认为 false。",
Expand Down Expand Up @@ -3863,6 +3872,7 @@ public TQueryOptions toThrift() {
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setSkipBadTablet(skipBadTablet);
tResult.setDisableFileCache(disableFileCache);
tResult.setEnableReserveMemory(enableReserveMemory);
tResult.setEnableJoinSpill(enableJoinSpill);
tResult.setEnableSortSpill(enableSortSpill);
tResult.setEnableAggSpill(enableAggSpill);
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,9 @@ struct TQueryOptions {
139: optional i32 query_slot_count = 0;

140: optional bool enable_auto_create_when_overwrite = false;

141: optional bool enable_reserve_memory = false

// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down
Loading