Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions benchmarks/datafusion-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ publish = false
[dependencies]
anyhow = { workspace = true }
arrow-ipc.workspace = true
arrow-schema = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true, features = ["derive"] }
datafusion = { workspace = true, features = [
"parquet",
"datetime_expressions",
"nested_expressions",
"unicode_expressions",
] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-adapter = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-pruning = { workspace = true }
futures.workspace = true
itertools.workspace = true
object_store = { workspace = true, features = ["aws", "gcp"] }
Expand All @@ -35,9 +45,12 @@ opentelemetry_sdk.workspace = true
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }
tracing = { workspace = true }
vortex = { workspace = true }
vortex-bench = { workspace = true }
vortex-cuda = { workspace = true, optional = true }
vortex-datafusion = { workspace = true }
vortex-utils = { workspace = true }

[build-dependencies]
get_dir = { workspace = true }
Expand Down
147 changes: 147 additions & 0 deletions benchmarks/datafusion-bench/src/cuda/format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! CUDA-accelerated Vortex file format for DataFusion.

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::DataFusionError;
use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::internal_datafusion_err;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::session::VortexSession;
use vortex_datafusion::VortexFormat;

use super::source::CudaVortexSource;

/// CUDA-accelerated Vortex file format for DataFusion.
///
/// This wraps the standard `VortexFormat` but uses `CudaVortexSource` for execution.
pub struct CudaVortexFormat {
/// The underlying VortexFormat for schema inference and statistics.
inner: VortexFormat,
session: VortexSession,
}

impl Debug for CudaVortexFormat {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CudaVortexFormat").finish()
}
}

impl CudaVortexFormat {
/// Create a new CUDA-accelerated Vortex format.
pub fn new(session: VortexSession) -> Self {
Self {
inner: VortexFormat::new(session.clone()),
session,
}
}
}

#[async_trait]
impl FileFormat for CudaVortexFormat {
fn as_any(&self) -> &dyn Any {
self
}

fn compression_type(&self) -> Option<FileCompressionType> {
None
}

fn get_ext(&self) -> String {
VORTEX_FILE_EXTENSION.to_string()
}

fn get_ext_with_compression(
&self,
file_compression_type: &FileCompressionType,
) -> DFResult<String> {
match file_compression_type.get_variant() {
CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
_ => Err(DataFusionError::Internal(
"Vortex does not support file level compression.".into(),
)),
}
}

async fn infer_schema(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> DFResult<SchemaRef> {
// Delegate to inner VortexFormat
self.inner.infer_schema(state, store, objects).await
}

async fn infer_stats(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> DFResult<Statistics> {
// Delegate to inner VortexFormat
self.inner
.infer_stats(state, store, table_schema, object)
.await
}

async fn create_physical_plan(
&self,
state: &dyn Session,
file_scan_config: FileScanConfig,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Get the source from the config and replace with our CUDA source
let mut source = file_scan_config
.file_source()
.as_any()
.downcast_ref::<CudaVortexSource>()
.cloned()
.ok_or_else(|| internal_datafusion_err!("Expected CudaVortexSource"))?;

source = source
.with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

let conf = FileScanConfigBuilder::from(file_scan_config)
.with_source(Arc::new(source))
.build();

Ok(DataSourceExec::from_data_source(conf))
}

async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
_state: &dyn Session,
_conf: datafusion_datasource::file_sink_config::FileSinkConfig,
_order_requirements: Option<LexRequirement>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
not_impl_err!("CudaVortexFormat does not support writing")
}

fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
Arc::new(CudaVortexSource::new(table_schema, self.session.clone()))
}
}
15 changes: 15 additions & 0 deletions benchmarks/datafusion-bench/src/cuda/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! CUDA-accelerated execution for the DataFusion benchmark.
//!
//! This module provides CUDA-accelerated projection execution for TPC-H benchmarks.
//! It duplicates the opener logic from vortex-datafusion but uses CUDA execution
//! instead of CPU execution.

mod format;
mod opener;
mod source;

pub use format::CudaVortexFormat;
pub use source::CudaVortexSource;
Loading
Loading