102 changes: 101 additions & 1 deletion datafusion/common/src/hash_utils.rs
@@ -16,6 +16,23 @@
// under the License.

//! Functionality used both on logical and physical plans
//!
//! ## About `rehash`
//! Many helpers in this module take a `rehash: bool` argument.
//!
//! Conceptually, `hashes_buffer` is an **accumulator** of per-row hash values. When hashing a
//! *single* column, the hasher should **initialize** each row's hash. When hashing *multiple*
//! columns (e.g. for partitioning or joins), subsequent columns should **mix** their value hash
//! into the existing row hash using [`combine_hashes`].
//!
//! - `rehash = false`: initialize/overwrite the row hash for this column
//! - `rehash = true`: combine this column into an existing row hash
//!
//! [`create_hashes`] sets `rehash` to `false` for the first column and `true` for all following
//! columns, which avoids an unnecessary `combine_hashes` on the first column for performance.
//!
//! Note: some nested-type hashers currently always combine into `hashes_buffer` (see the note in
//! `hash_single_array`). This is intentional to preserve existing behavior.
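//!
//! A minimal usage sketch (illustrative only; `hash_single_array` is private to this
//! module, so the snippet is not compiled):
//!
//! ```ignore
//! let mut hashes_buffer = vec![0u64; num_rows];
//! // First column: rehash = false, so each row's hash is initialized.
//! hash_single_array(&col_a, &random_state, &mut hashes_buffer, false)?;
//! // Second column: rehash = true, so the value hash is mixed into the
//! // existing row hash via `combine_hashes`.
//! hash_single_array(&col_b, &random_state, &mut hashes_buffer, true)?;
//! ```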

use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
@@ -400,6 +417,8 @@ fn update_hash_for_dict_key(
dict_hashes: &[u64],
dict_values: &dyn Array,
idx: usize,
// `multi_col` is the historical name for what is now referred to as `rehash` elsewhere
Contributor

It's better to fix the naming than add a comment trying to explain the discrepancy.

Contributor Author

Made some changes and also did a minor refactor. There is a further opportunity to centralize the common "compute per-row nested hash → apply rehash (init vs. combine) → hash null rows" logic in a shared helper, so that each nested hasher mostly just computes a `row_hash` and delegates the buffer update and null handling. What do you think?
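
A rough sketch of such a shared helper (hypothetical; `apply_row_hash` and its signature are illustrative, not the exact code in this PR):

```rust
// Hypothetical helper: apply one row's computed nested hash to the accumulator,
// honoring the init-vs-combine semantics of `rehash`.
#[inline]
fn apply_row_hash(hashes_buffer: &mut [u64], row: usize, row_hash: u64, rehash: bool) {
    if rehash {
        // Mix into the hash accumulated from previous columns.
        hashes_buffer[row] = combine_hashes(row_hash, hashes_buffer[row]);
    } else {
        // First column: initialize the row hash.
        hashes_buffer[row] = row_hash;
    }
}
```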

Contributor

I'd be careful with this approach, as we'd want to pull as many checks outside the hot loop as we can; checking `rehash` each time we compute a hash is inefficient compared to checking once before the loop, as many other functions here do.
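
As an illustration of that pattern, a minimal sketch that branches on `rehash` once, outside the loop (names are illustrative; only `combine_hashes` is the module's existing function):

```rust
// Hoist the `rehash` check out of the hot loop: branch once, then run a
// branch-free loop over the per-row value hashes.
fn apply_value_hashes(value_hashes: &[u64], hashes_buffer: &mut [u64], rehash: bool) {
    if rehash {
        for (hash, &value_hash) in hashes_buffer.iter_mut().zip(value_hashes) {
            *hash = combine_hashes(value_hash, *hash);
        }
    } else {
        for (hash, &value_hash) in hashes_buffer.iter_mut().zip(value_hashes) {
            *hash = value_hash;
        }
    }
}
```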

Contributor Author

Oh, thanks for the heads-up. Do the changes look good?

Contributor

I don't think we should take the approach of having an `apply_row_hash` function as it currently is, per my reasoning above.

// in this module: if true, combine into an existing per-row hash; if false, initialize.
multi_col: bool,
) {
if dict_values.is_valid(idx) {
@@ -413,6 +432,10 @@
}

/// Hash the values in a dictionary array
///
/// Note: `multi_col` is equivalent to the `rehash` flag used by other hashing helpers:
/// - `multi_col = false`: initialize/overwrite `hashes_buffer` for this column
/// - `multi_col = true`: combine into existing per-row hashes
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
@@ -449,6 +472,14 @@ fn hash_struct_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// This nested-type hasher currently always combines its computed struct-row hash
// into `hashes_buffer` (equivalent to `rehash=true`). This preserves existing
// behavior for single-column hashing of nested types.
//
// If we were to add a `rehash` flag here and make `rehash=false` overwrite the
// buffer, it would change the numeric hash values produced for standalone
// Struct columns.

let nulls = array.nulls();
let row_len = array.len();

@@ -477,6 +508,9 @@ fn hash_map_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// This nested-type hasher currently always combines entry hashes into
// `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior.

let nulls = array.nulls();
let offsets = array.offsets();

@@ -515,6 +549,9 @@ fn hash_list_array<OffsetSize>(
where
OffsetSize: OffsetSizeTrait,
{
// This nested-type hasher currently always combines element hashes into
// `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior.

// In case values is sliced, hash only the bytes used by the offsets of this ListArray
let first_offset = array.value_offsets().first().cloned().unwrap_or_default();
let last_offset = array.value_offsets().last().cloned().unwrap_or_default();
@@ -566,6 +603,9 @@ fn hash_list_view_array<OffsetSize>(
where
OffsetSize: OffsetSizeTrait,
{
// This nested-type hasher currently always combines element hashes into
// `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior.

let values = array.values();
let offsets = array.value_offsets();
let sizes = array.value_sizes();
@@ -602,6 +642,9 @@ fn hash_union_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// This nested-type hasher currently always combines the chosen child hash into
// `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior.

use std::collections::HashMap;

let DataType::Union(union_fields, _mode) = array.data_type() else {
@@ -636,6 +679,9 @@ fn hash_fixed_list_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// This nested-type hasher currently always combines element hashes into
// `hashes_buffer` (equivalent to `rehash=true`) to preserve existing behavior.

let values = array.values();
let value_length = array.value_length() as usize;
let nulls = array.nulls();
@@ -760,6 +806,13 @@ fn hash_single_array(
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
// NOTE: The nested-type hashers below currently always *combine* their computed
// nested-value hash into `hashes_buffer` (i.e. they effectively behave as if
// `rehash=true`). This preserves existing hash values for these types.
//
// In other words, unlike primitive-like arrays, nested types do not currently
// "initialize" the buffer when they are the first/only column.
// Changing that would be a behavioral change for single-column hashing.
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
@@ -872,7 +925,10 @@
T: AsDynArray,
{
for (i, array) in arrays.into_iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
// `hashes_buffer` is a per-row accumulator.
//
// First column: initialize hashes (no need to call `combine_hashes`)
// Subsequent columns: combine with existing per-row hash
let rehash = i >= 1;
hash_single_array(array.as_dyn_array(), random_state, hashes_buffer, rehash)?;
}
@@ -1194,6 +1250,50 @@ mod tests {
assert_eq!(hashes[1], hashes[6]); // null vs empty list
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_list_array() -> Result<()> {
// Validate that nested types participate in multi-column hashing.
let data = vec![
Some(vec![Some(0), Some(1), Some(2)]),
None,
Some(vec![Some(3), None, Some(5)]),
Some(vec![Some(3), None, Some(5)]),
None,
Some(vec![Some(0), Some(1), Some(2)]),
Some(vec![]),
];
let list_array: ArrayRef =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(data));
let extra_col: ArrayRef =
Arc::new(Int32Array::from(vec![10, 11, 12, 12, 11, 10, 13]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut one_col_hashes = vec![0; list_array.len()];
create_hashes(
&[Arc::clone(&list_array)],
&random_state,
&mut one_col_hashes,
)?;

let mut two_col_hashes = vec![0; list_array.len()];
create_hashes(
&[Arc::clone(&list_array), Arc::clone(&extra_col)],
&random_state,
&mut two_col_hashes,
)?;

assert_ne!(one_col_hashes, two_col_hashes);

// Rows that are equal in both columns should still hash to the same value
assert_eq!(two_col_hashes[0], two_col_hashes[5]);
assert_eq!(two_col_hashes[1], two_col_hashes[4]);
assert_eq!(two_col_hashes[2], two_col_hashes[3]);

Ok(())
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_list_arrays() {