Skip to content

Commit 91da41e

Browse files
committed
BE optimize right join
1 parent fde6c3a commit 91da41e

File tree

4 files changed

+216
-132
lines changed

4 files changed

+216
-132
lines changed

be/src/pipeline/dependency.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -659,20 +659,18 @@ struct HashJoinSharedState : public JoinSharedState {
659659
// Cached build ASOF column (computed once by executing expression on build_block)
660660
vectorized::ColumnPtr asof_build_col;
661661

662-
// ASOF RIGHT JOIN: column-based cache for efficient accumulation across probe batches
663-
// Per build row (1-indexed, 0 unused): whether a matching probe was found
662+
// ASOF RIGHT JOIN: per-task local caches submitted during finish_probing, then merged
663+
// Each probe task accumulates matches lock-free in its own AsofRightLocalCache.
664+
// The last task to enter finish_probing merges all caches and populates merged_* for output.
665+
std::vector<AsofRightLocalCache> asof_right_local_caches;
666+
std::mutex asof_right_submit_mutex; // only for submitting local caches (one lock per task)
667+
// Merged result (populated once by last task during finish_probing merge)
664668
std::vector<uint8_t> asof_right_has_match;
665-
// Per build row: index into probe cache for the best matching probe row (-1 = none)
666669
std::vector<int32_t> asof_right_cache_idx;
667-
// Append-only column of best probe ASOF values (for efficient compare_at, no Field)
668-
vectorized::MutableColumnPtr asof_right_best_probe_values;
669-
// Append-only cache of full probe row data (for output in finish_probing)
670670
std::unique_ptr<vectorized::MutableBlock> asof_right_probe_cache;
671671
size_t asof_right_output_idx = 0;
672-
// Barrier: counts down from num_probe_tasks. Only the last task (counter→0) outputs.
672+
// Barrier: counts down from num_probe_tasks. Only the last task (counter→0) merges+outputs.
673673
std::atomic<int32_t> asof_right_remaining_probers {0};
674-
// Mutex protecting concurrent writes to probe_cache/best_probe_values/has_match/cache_idx
675-
std::mutex asof_right_cache_mutex;
676674
};
677675

678676
struct PartitionedHashJoinSharedState

be/src/pipeline/exec/hashjoin_build_sink.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -378,16 +378,10 @@ Status HashJoinBuildSinkLocalState::build_asof_index(vectorized::Block& block) {
378378
}
379379
}
380380

381-
// Pre-initialize ASOF RIGHT JOIN state during build phase so it's ready
382-
// before any probe operator calls finish_probing (avoids race condition)
381+
// Pre-initialize ASOF RIGHT JOIN barrier counter during build phase.
382+
// Per-task local caches and merged result are created during probe/finish_probing.
383383
if (p._join_op == TJoinOp::ASOF_RIGHT_INNER_JOIN ||
384384
p._join_op == TJoinOp::ASOF_RIGHT_OUTER_JOIN) {
385-
// Use build_block rows (includes sentinel at index 0) to match _build_block indexing.
386-
// hash_table->size() returns next.size() which may be larger than build_block rows.
387-
size_t build_block_size =
388-
_shared_state->build_block ? _shared_state->build_block->rows() : 0;
389-
_shared_state->asof_right_has_match.resize(build_block_size, 0);
390-
_shared_state->asof_right_cache_idx.resize(build_block_size, -1);
391385
_shared_state->asof_right_remaining_probers.store(
392386
static_cast<int32_t>(
393387
std::max(_shared_state->hash_table_variant_vector.size(), (size_t)1)),

be/src/pipeline/exec/join/process_hash_table_probe.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ using MutableColumns = std::vector<vectorized::MutableColumnPtr>;
4141
using NullMap = vectorized::ColumnUInt8::Container;
4242
using ConstNullMapPtr = const NullMap*;
4343

44+
// Per-task local cache for ASOF RIGHT JOIN lock-free probe accumulation.
45+
// Each probe task appends qualifying probe entries (one per probe row per bucket).
46+
// Caches are merged by the last task in finish_probing: entries are grouped by bucket,
47+
// sorted by ASOF value, and binary-searched per build row for O(log P) matching.
48+
struct AsofRightLocalCache {
49+
std::vector<uint32_t> bucket_ids; // bucket_id for each appended probe entry
50+
MutableColumnPtr probe_asof_values; // probe ASOF column values (non-nullable) for sort/search
51+
std::unique_ptr<vectorized::MutableBlock> probe_cache; // full probe row data for output
52+
};
53+
4454
template <int JoinOpType>
4555
struct ProcessHashTableProbe {
4656
ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size);
@@ -146,6 +156,9 @@ struct ProcessHashTableProbe {
146156
// in the hash table.
147157
// -1 means null, 0 means false, 1 means true
148158
DorisVector<int8_t> mark_join_flags;
159+
160+
// ASOF RIGHT JOIN: per-task local cache for lock-free probe accumulation
161+
std::unique_ptr<AsofRightLocalCache> _asof_right_local_cache;
149162
};
150163

151164
} // namespace pipeline

0 commit comments

Comments
 (0)