Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 50 additions & 68 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,31 @@ void hybrid_scan_reader_impl::reset_column_selection()
_is_payload_columns_selected = false;
}

// Common preamble for the filter-based entry points: checks that a filter expression was
// supplied, selects the filter columns, and returns the converted filter expression together
// with the data types obtained from `_output_buffers_template`.
std::pair<named_to_reference_converter, std::vector<cudf::data_type>>
hybrid_scan_reader_impl::prepare_filter_and_output_types(parquet_reader_options const& options)
{
  CUDF_EXPECTS(options.get_filter().has_value(), "Empty input filter expression encountered");

  // Column selection has to happen first; the expression conversion below depends on it
  select_columns(read_columns_mode::FILTER_COLUMNS, options);

  auto converted_filter   = build_converted_expression(options);
  auto filter_column_types = get_output_types(_output_buffers_template);

  // Hand both results to the caller without copying
  return {std::move(converted_filter), std::move(filter_column_types)};
}

// Shared setup used by the materialize_* and setup_chunking_* entry points. Runs, in order:
// reset_internal_state(), initialize_options() and select_columns() for the requested mode.
void hybrid_scan_reader_impl::prepare_materialization(read_columns_mode read_columns_mode,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open to suggestions for names of these two helpers.

std::size_t num_sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// Start from a clean reader state before applying the new options
reset_internal_state();
// Store the option-derived settings along with the source count, stream and memory resource
initialize_options(options, num_sources, stream, mr);
// Select the columns to read for the requested mode (filter, payload or all columns)
select_columns(read_columns_mode, options);
}

std::vector<std::vector<cudf::size_type>>
hybrid_scan_reader_impl::filter_row_groups_with_byte_range(
cudf::host_span<std::vector<size_type> const> row_group_indices,
Expand All @@ -230,13 +255,7 @@ std::vector<std::vector<size_type>> hybrid_scan_reader_impl::filter_row_groups_w
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");

select_columns(read_columns_mode::FILTER_COLUMNS, options);

// Convert the input expression (must be done after column selection)
auto expr_conv = build_converted_expression(options);
auto output_dtypes = get_output_types(_output_buffers_template);
auto [expr_conv, output_dtypes] = prepare_filter_and_output_types(options);

return _extended_metadata->filter_row_groups_with_stats(row_group_indices,
output_dtypes,
Expand All @@ -251,12 +270,7 @@ hybrid_scan_reader_impl::secondary_filters_byte_ranges(
parquet_reader_options const& options)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Filter expression must not be empty");

select_columns(read_columns_mode::FILTER_COLUMNS, options);

auto expr_conv = build_converted_expression(options);
auto output_dtypes = get_output_types(_output_buffers_template);
auto [expr_conv, output_dtypes] = prepare_filter_and_output_types(options);

auto const bloom_filter_bytes =
_extended_metadata->get_bloom_filter_bytes(row_group_indices,
Expand All @@ -280,13 +294,7 @@ hybrid_scan_reader_impl::filter_row_groups_with_dictionary_pages(
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");

select_columns(read_columns_mode::FILTER_COLUMNS, options);

// Convert the input expression (must be done after column selection)
auto expr_conv = build_converted_expression(options);
auto output_dtypes = get_output_types(_output_buffers_template);
auto [expr_conv, output_dtypes] = prepare_filter_and_output_types(options);

// Collect literal and operator pairs for each input column with an (in)equality predicate
auto const [literals, operators] =
Expand Down Expand Up @@ -344,13 +352,7 @@ std::vector<std::vector<size_type>> hybrid_scan_reader_impl::filter_row_groups_w
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");

select_columns(read_columns_mode::FILTER_COLUMNS, options);

// Convert the input expression (must be done after column selection)
auto expr_conv = build_converted_expression(options);
auto output_dtypes = get_output_types(_output_buffers_template);
auto [expr_conv, output_dtypes] = prepare_filter_and_output_types(options);

return _extended_metadata->filter_row_groups_with_bloom_filters(
bloom_filter_data,
Expand Down Expand Up @@ -380,13 +382,7 @@ std::unique_ptr<cudf::column> hybrid_scan_reader_impl::build_row_mask_with_page_
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");

select_columns(read_columns_mode::FILTER_COLUMNS, options);

// Convert the input expression (must be done after column selection)
auto expr_conv = build_converted_expression(options);
auto output_dtypes = get_output_types(_output_buffers_template);
auto [expr_conv, output_dtypes] = prepare_filter_and_output_types(options);

return _extended_metadata->build_row_mask_with_page_index_stats(
row_group_indices,
Expand Down Expand Up @@ -490,17 +486,14 @@ table_with_metadata hybrid_scan_reader_impl::materialize_filter_columns(
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");
CUDF_EXPECTS(options.get_filter().has_value(), "Empty input filter expression encountered");
CUDF_EXPECTS(not row_mask.is_empty(),
"Row mask must be non-empty when materializing filter columns");

reset_internal_state();

initialize_options(row_group_indices, options, stream, mr);

select_columns(read_columns_mode::FILTER_COLUMNS, options);
prepare_materialization(
read_columns_mode::FILTER_COLUMNS, row_group_indices.size(), options, stream, mr);

// Convert the input expression (must be done after column selection)
// Convert the input expression (must be done after prepare_materialization)
_expr_conv = build_converted_expression(options);

auto data_page_mask = thrust::host_vector<bool>{};
Expand All @@ -527,11 +520,8 @@ table_with_metadata hybrid_scan_reader_impl::materialize_payload_columns(
CUDF_EXPECTS(row_mask.null_count() == 0,
"Row mask must not have any nulls when materializing payload column");

reset_internal_state();

initialize_options(row_group_indices, options, stream, mr);

select_columns(read_columns_mode::PAYLOAD_COLUMNS, options);
prepare_materialization(
read_columns_mode::PAYLOAD_COLUMNS, row_group_indices.size(), options, stream, mr);

auto data_page_mask = thrust::host_vector<bool>{};
if (not row_mask.is_empty() and mask_data_pages == use_data_page_mask::YES) {
Expand All @@ -553,13 +543,10 @@ table_with_metadata hybrid_scan_reader_impl::materialize_all_columns(
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");

reset_internal_state();

initialize_options(row_group_indices, options, stream, mr);

select_columns(read_columns_mode::ALL_COLUMNS, options);
prepare_materialization(
read_columns_mode::ALL_COLUMNS, row_group_indices.size(), options, stream, mr);

// Convert the input expression (must be done after column selection)
// Convert the input expression (must be done after prepare_materialization)
_expr_conv = build_converted_expression(options);

prepare_data(read_mode::READ_ALL, row_group_indices, column_chunk_data, {});
Expand All @@ -580,19 +567,17 @@ void hybrid_scan_reader_impl::setup_chunking_for_filter_columns(
rmm::device_async_resource_ref mr)
{
CUDF_EXPECTS(not row_group_indices.empty(), "Empty input row group indices encountered");
CUDF_EXPECTS(options.get_filter().has_value(), "Encountered empty converted filter expression");
CUDF_EXPECTS(options.get_filter().has_value(), "Empty input filter expression encountered");
CUDF_EXPECTS(not row_mask.is_empty(),
"Row mask must be non-empty when setting up chunking for filter columns");

reset_internal_state();
prepare_materialization(
read_columns_mode::FILTER_COLUMNS, row_group_indices.size(), options, stream, mr);

initialize_options(row_group_indices, options, stream, mr);
_input_pass_read_limit = pass_read_limit;
_output_chunk_read_limit = chunk_read_limit;

select_columns(read_columns_mode::FILTER_COLUMNS, options);

// Convert the input expression (must be done after column selection)
// Convert the input expression (must be done after prepare_materialization)
_expr_conv = build_converted_expression(options);

auto data_page_mask = thrust::host_vector<bool>{};
Expand Down Expand Up @@ -638,14 +623,12 @@ void hybrid_scan_reader_impl::setup_chunking_for_payload_columns(
CUDF_EXPECTS(row_mask.null_count() == 0,
"Row mask must not have any nulls when materializing payload column");

reset_internal_state();
prepare_materialization(
read_columns_mode::PAYLOAD_COLUMNS, row_group_indices.size(), options, stream, mr);

initialize_options(row_group_indices, options, stream, mr);
_input_pass_read_limit = pass_read_limit;
_output_chunk_read_limit = chunk_read_limit;

select_columns(read_columns_mode::PAYLOAD_COLUMNS, options);

auto data_page_mask = thrust::host_vector<bool>{};
if (not row_mask.is_empty() and mask_data_pages == use_data_page_mask::YES) {
data_page_mask = _extended_metadata->compute_data_page_mask(
Expand Down Expand Up @@ -706,11 +689,10 @@ void hybrid_scan_reader_impl::reset_internal_state()
_mr = cudf::get_current_device_resource_ref();
}

void hybrid_scan_reader_impl::initialize_options(
cudf::host_span<std::vector<size_type> const> row_group_indices,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
void hybrid_scan_reader_impl::initialize_options(parquet_reader_options const& options,
std::size_t num_sources,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();
Expand All @@ -722,7 +704,7 @@ void hybrid_scan_reader_impl::initialize_options(
// Binary columns can be read as binary or strings
_reader_column_schema = options.get_column_schema();

_num_sources = row_group_indices.size();
_num_sources = num_sources;

// CUDA stream to use for internal operations
_stream = stream;
Expand Down
29 changes: 26 additions & 3 deletions cpp/src/io/parquet/experimental/hybrid_scan_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,12 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
/**
* @brief Initialize the options-related internal variables for later use
*
* @param row_group_indices Row group indices to read
* @param options Reader options
* @param num_sources Number of input sources
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
*/
void initialize_options(cudf::host_span<std::vector<size_type> const> row_group_indices,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only ever need row_group_indices.size() in here so just directly take in num_sources

parquet_reader_options const& options,
void initialize_options(parquet_reader_options const& options,
std::size_t num_sources,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

Expand Down Expand Up @@ -333,6 +332,30 @@ class hybrid_scan_reader_impl : public parquet::detail::reader_impl {
get_input_column_chunk_byte_ranges(
cudf::host_span<std::vector<size_type> const> row_group_indices) const;

/**
* @brief Helper to prepare converted filter expression and output column data types
*
* @param options Parquet reader options
* @return A pair of a converted filter expression and a vector of output column data types
*/
std::pair<named_to_reference_converter, std::vector<cudf::data_type>>
prepare_filter_and_output_types(parquet_reader_options const& options);

/**
* @brief Helper to prepare column materialization
*
* @param read_columns_mode Read mode indicating if we are reading filter, payload, or all columns
* @param num_sources Number of input sources
* @param options Parquet reader options
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the device memory for the output columns
*/
void prepare_materialization(read_columns_mode read_columns_mode,
std::size_t num_sources,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr);

/**
* @brief Perform the necessary data preprocessing for parsing file later on
*
Expand Down
Loading