Skip to content

Commit

Permalink
fix wrong result for single null safe eq
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt committed Nov 18, 2024
1 parent c21a390 commit fbe9b32
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 92 deletions.
1 change: 0 additions & 1 deletion be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,6 @@ struct HashJoinSharedState : public JoinSharedState {
size_t build_exprs_size = 0;
std::shared_ptr<vectorized::Block> build_block;
std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
bool probe_ignore_null = false;
};

struct PartitionedHashJoinSharedState
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,13 +560,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*

if (eos) {
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
if (local_state._shared_state->build_block &&
(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) {
local_state._shared_state->probe_ignore_null = true;
}
local_state._dependency->set_ready_to_read();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ struct ProcessHashTableBuild {
hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _batch_size,
*has_null_key);

if (_build_raw_ptrs.size() == 1 && null_map) {
_build_raw_ptrs[0]->assume_mutable()->replace_column_null_data(null_map->data());
}

hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
null_map ? null_map->data() : nullptr, true, true,
hash_table_ctx.hash_table->get_bucket_size());
Expand Down
27 changes: 4 additions & 23 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinProbeOperatorX>();
_shared_state->probe_ignore_null = p._probe_ignore_null;
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < _probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _probe_expr_ctxs[i]));
Expand Down Expand Up @@ -285,12 +284,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
if (local_state._probe_index < local_state._probe_block.rows()) {
DCHECK(local_state._has_set_need_null_map_for_probe);
std::visit(
[&](auto&& arg, auto&& process_hashtable_ctx, auto need_judge_null) {
[&](auto&& arg, auto&& process_hashtable_ctx) {
using HashTableProbeType = std::decay_t<decltype(process_hashtable_ctx)>;
if constexpr (!std::is_same_v<HashTableProbeType, std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) {
st = process_hashtable_ctx.template process<need_judge_null>(
st = process_hashtable_ctx.template process(
arg,
local_state._null_map_column
? &local_state._null_map_column->get_data()
Expand All @@ -306,9 +305,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}
},
local_state._shared_state->hash_table_variants->method_variant,
*local_state._process_hashtable_ctx_variants,
vectorized::make_bool_variant(local_state._need_null_map_for_probe &&
local_state._shared_state->probe_ignore_null));
*local_state._process_hashtable_ctx_variants);
} else if (local_state._probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
Expand Down Expand Up @@ -493,29 +490,13 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
const bool probe_dispose_null =
_match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN || _join_op == TJoinOp::LEFT_ANTI_JOIN ||
_join_op == TJoinOp::LEFT_SEMI_JOIN;
const std::vector<TEqJoinCondition>& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts;
std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
size_t conjuncts_index = 0;
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, ctx));
_probe_expr_ctxs.push_back(ctx);
bool null_aware = eq_join_conjunct.__isset.opcode &&
eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL &&
(eq_join_conjunct.right.nodes[0].is_nullable ||
eq_join_conjunct.left.nodes[0].is_nullable);
probe_not_ignore_null[conjuncts_index] =
null_aware ||
(_probe_expr_ctxs.back()->root()->is_nullable() && probe_dispose_null);
conjuncts_index++;
}
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
_probe_ignore_null |= !probe_not_ignore_null[i];
}

if (tnode.hash_join_node.__isset.other_join_conjuncts &&
!tnode.hash_join_node.other_join_conjuncts.empty()) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca

// probe expr
vectorized::VExprContextSPtrs _probe_expr_ctxs;
bool _probe_ignore_null = false;

vectorized::DataTypes _right_table_data_types;
vectorized::DataTypes _left_table_data_types;
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "vec/columns/column.h"
#include "vec/columns/columns_number.h"
#include "vec/common/arena.h"
#include "vec/common/hash_table/join_hash_table.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -55,7 +56,7 @@ struct ProcessHashTableProbe {
int last_probe_index, bool all_match_one,
bool have_other_join_conjunct);

template <bool need_judge_null, typename HashTableType>
template <typename HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block, vectorized::Block* output_block,
uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct);
Expand All @@ -66,9 +67,9 @@ struct ProcessHashTableProbe {
// TODO: optimize the handling of `visited` here to reduce the size of the hash table
template <bool need_judge_null, typename HashTableType, bool with_other_conjuncts,
bool is_mark_join>
Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
Status do_process(HashTableType& hash_table_ctx, vectorized::ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block, vectorized::Block* output_block,
uint32_t probe_rows);
uint32_t probe_rows, JoinProbeMethod method);
// In the presence of other join conjuncts, the join process becomes more complicated:
// each matching join column needs to be processed by the other join conjuncts, so the structure of the mutable block
// and the output block may be different
Expand Down
52 changes: 26 additions & 26 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "util/simd/bits.h"
#include "vec/columns/column_filter_helper.h"
#include "vec/columns/column_nullable.h"
#include "vec/common/hash_table/join_hash_table.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::pipeline {
Expand Down Expand Up @@ -173,6 +174,10 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType>::_init_probe_sid
if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
hash_table_ctx.reset();
if (_parent->_probe_columns.size() == 1 && null_map) {
_parent->_probe_columns[0]->assume_mutable()->replace_column_null_data(null_map);
}

hash_table_ctx.init_serialized_keys(_parent->_probe_columns, probe_rows, null_map, true,
false, hash_table_ctx.hash_table->get_bucket_size());
hash_table_ctx.hash_table->pre_build_idxs(hash_table_ctx.bucket_nums,
Expand All @@ -193,34 +198,24 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
vectorized::ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
uint32_t probe_rows) {
uint32_t probe_rows, JoinProbeMethod method) {
if (_right_col_len && !_build_block) {
return Status::InternalError("build block is nullptr");
}

auto& probe_index = _parent->_probe_index;
auto& build_index = _parent->_build_index;
auto last_probe_index = probe_index;

{
SCOPED_TIMER(_init_probe_side_timer);
_init_probe_side<HashTableType>(
hash_table_ctx, probe_rows, with_other_conjuncts,
null_map ? null_map->data() : nullptr,
need_judge_null &&
(JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ||
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
(is_mark_join && JoinOpType != doris::TJoinOp::RIGHT_SEMI_JOIN)));
_init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts,
null_map ? null_map->data() : nullptr, need_judge_null);
}

auto& mcol = mutable_block.mutable_columns();
const bool has_mark_join_conjunct = !_parent->_mark_join_conjuncts.empty();

uint32_t current_offset = 0;
if constexpr ((JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
with_other_conjuncts) {
if (method == JoinProbeMethod::FIND_NULL_AWARE_WITH_OTHER_CONJUNCTS) {
SCOPED_TIMER(_search_hashtable_timer);

/// If `_build_index_for_null_probe_key` is not zero, it means we are in the middle of handling a null probe key.
Expand Down Expand Up @@ -256,11 +251,10 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
} else {
SCOPED_TIMER(_search_hashtable_timer);
auto [new_probe_idx, new_build_idx, new_current_offset] =
hash_table_ctx.hash_table->template find_batch<JoinOpType, with_other_conjuncts,
is_mark_join, need_judge_null>(
hash_table_ctx.hash_table->template find_batch<JoinOpType, need_judge_null>(
hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index,
build_index, cast_set<int32_t>(probe_rows), _probe_indexs.data(),
_probe_visited, _build_indexs.data(), has_mark_join_conjunct);
_probe_visited, _build_indexs.data(), method);
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
Expand Down Expand Up @@ -674,22 +668,32 @@ Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
}

template <int JoinOpType>
template <bool need_judge_null, typename HashTableType>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
vectorized::ConstNullMapPtr null_map,
vectorized::MutableBlock& mutable_block,
vectorized::Block* output_block,
uint32_t probe_rows, bool is_mark_join,
bool have_other_join_conjunct) {
JoinProbeMethod method = hash_table_ctx.hash_table->template get_method<JoinOpType>(
!_parent->_mark_join_conjuncts.empty(), is_mark_join, have_other_join_conjunct);
// need_keep_null_bucket_idx being true means hash_table_ctx.bucket_nums may contain the 'bucket_size' value
bool need_keep_null_bucket_idx =
(null_map && (method == JoinProbeMethod::FIND_NULL_AWARE_WITH_OTHER_CONJUNCTS ||
method == JoinProbeMethod::FIND_BATCH_LEFT_SEMI_ANTI ||
method == JoinProbeMethod::FIND_BATCH_CONJUNCT ||
method == JoinProbeMethod::FIND_BATCH_CONJUNCT_MATCH_ONE));

Status res;
std::visit(
[&](auto is_mark_join, auto have_other_join_conjunct) {
[&](auto is_mark_join, auto have_other_join_conjunct, auto need_judge_null) {
res = do_process<need_judge_null, HashTableType, have_other_join_conjunct,
is_mark_join>(hash_table_ctx, null_map, mutable_block,
output_block, probe_rows);
output_block, probe_rows, method);
},
vectorized::make_bool_variant(is_mark_join),
vectorized::make_bool_variant(have_other_join_conjunct));
vectorized::make_bool_variant(have_other_join_conjunct),
vectorized::make_bool_variant(need_keep_null_bucket_idx));
return res;
}

Expand All @@ -702,11 +706,7 @@ struct ExtractType<T(U)> {
};

#define INSTANTIATION(JoinOpType, T) \
template Status ProcessHashTableProbe<JoinOpType>::process<false, ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \
uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \
template Status ProcessHashTableProbe<JoinOpType>::process<true, ExtractType<void(T)>::Type>( \
template Status ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \
vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \
uint32_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \
Expand Down
Loading

0 comments on commit fbe9b32

Please sign in to comment.