Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/bench-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ jobs:
with:
polarsignals_cloud_token: ${{ secrets.POLAR_SIGNALS_API_KEY }}
labels: "branch=${{ github.ref_name }};gh_run_id=${{ github.run_id }};benchmark=${{ matrix.benchmark.id }}"
parca_agent_version: "0.45.0"
project_uuid: "e5d846e1-b54c-46e7-9174-8bf055a3af56"
extra_args: "--off-cpu-threshold=0.001" # Personally tuned by @brancz
profiling_frequency: 199
extra_args: "--off-cpu-threshold=0.03" # Personally tuned by @brancz

- name: Run ${{ matrix.benchmark.name }} benchmark
shell: bash
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ jobs:
with:
polarsignals_cloud_token: ${{ secrets.POLAR_SIGNALS_API_KEY }}
labels: "branch=${{ github.ref_name }};gh_run_id=${{ github.run_id }};benchmark=${{ matrix.benchmark.id }}"
parca_agent_version: "0.45.0"
project_uuid: "e5d846e1-b54c-46e7-9174-8bf055a3af56"
extra_args: "--off-cpu-threshold=0.001" # Personally tuned by @brancz
profiling_frequency: 199
extra_args: "--off-cpu-threshold=0.03" # Personally tuned by @brancz

- name: Run ${{ matrix.benchmark.name }} benchmark
shell: bash
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/sql-benchmarks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ jobs:
with:
polarsignals_cloud_token: ${{ secrets.POLAR_SIGNALS_API_KEY }}
labels: "branch=${{ github.ref_name }};gh_run_id=${{ github.run_id }};benchmark=${{ matrix.id }}"
parca_agent_version: "0.45.0"
project_uuid: "e5d846e1-b54c-46e7-9174-8bf055a3af56"
extra_args: "--off-cpu-threshold=0.001" # Personally tuned by @brancz
profiling_frequency: 199
extra_args: "--off-cpu-threshold=0.03" # Personally tuned by @brancz

- name: Run ${{ matrix.name }} benchmark
if: matrix.remote_storage == null || github.event.pull_request.head.repo.fork == true
Expand Down
63 changes: 52 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ cudarc = { version = "0.18.2", features = [
# be specified if a later toolkit version is installed on the system.
"cuda-12080",
] }
custom-labels = "0.4"
dashmap = "6.1.0"
datafusion = { version = "52", default-features = false, features = ["sql"] }
datafusion-catalog = { version = "52" }
Expand Down
3 changes: 3 additions & 0 deletions benchmarks/datafusion-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ publish = false
anyhow = { workspace = true }
arrow-ipc.workspace = true
clap = { workspace = true, features = ["derive"] }
custom-labels = { workspace = true }
datafusion = { workspace = true, features = [
"parquet",
"datetime_expressions",
Expand All @@ -38,9 +39,11 @@ url = { workspace = true }
vortex-bench = { workspace = true }
vortex-cuda = { workspace = true, optional = true }
vortex-datafusion = { workspace = true }
vortex-metrics = { workspace = true }

[build-dependencies]
get_dir = { workspace = true }
custom-labels = { workspace = true }

[features]
cuda = ["dep:vortex-cuda"]
Expand Down
7 changes: 7 additions & 0 deletions benchmarks/datafusion-bench/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

fn main() {
#[cfg(not(target_os = "macos"))]
custom_labels::build::emit_build_instructions();
}
1 change: 1 addition & 0 deletions benchmarks/datafusion-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

pub mod metrics;
pub mod tracer;

use std::sync::Arc;

Expand Down
57 changes: 36 additions & 21 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use std::time::Instant;

use clap::Parser;
use clap::value_parser;
use custom_labels::asynchronous::Label;
use datafusion::arrow::array::RecordBatch;
use datafusion::common::runtime::set_join_set_tracer;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::listing::ListingTable;
use datafusion::datasource::listing::ListingTableConfig;
Expand All @@ -16,6 +18,8 @@ use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::prelude::SessionContext;
use datafusion_bench::format_to_df_format;
use datafusion_bench::metrics::MetricsSetExt;
use datafusion_bench::tracer::get_static_tracer;
use datafusion_bench::tracer::set_labels;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::collect;
use futures::StreamExt;
Expand Down Expand Up @@ -103,6 +107,7 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let opts = Opts::from(args.options);

set_join_set_tracer(get_static_tracer())?;
setup_logging_and_tracing(args.verbose, args.tracing)?;

let benchmark = create_benchmark(args.benchmark, &opts)?;
Expand Down Expand Up @@ -137,6 +142,8 @@ async fn main() -> anyhow::Result<()> {
}
}

let benchmark_name = benchmark.dataset().to_string();

let mut runner = SqlBenchmarkRunner::new(
&*benchmark,
Engine::DataFusion,
Expand Down Expand Up @@ -168,26 +175,32 @@ async fn main() -> anyhow::Result<()> {
|query_idx, (session, format), query| {
let plans = Arc::clone(&collected_plans);

Box::pin(async move {
let timer = Instant::now();
let (batches, plan) = execute_query(session, query).await?;
let time = timer.elapsed();
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();

// Store plan for metrics (only store once per query/format combination)
if show_metrics {
let mut plans_mut = plans.lock();
// Only store if we don't already have this query/format combo
if !plans_mut
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
let labelset = set_labels(benchmark_name.clone(), query_idx, *format);

Box::pin(
async move {
let timer = Instant::now();
let (batches, plan) =
execute_query(session, query).with_current_labels().await?;
let time = timer.elapsed();
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();

// Store plan for metrics (only store once per query/format combination)
if show_metrics {
let mut plans_mut = plans.lock();
// Only store if we don't already have this query/format combo
if !plans_mut
.iter()
.any(|(idx, f, _)| *idx == query_idx && *f == *format)
{
plans_mut.push((query_idx, *format, plan.clone()));
}
}
}

anyhow::Ok((row_count, Some(time), plan))
})
anyhow::Ok((row_count, Some(time), plan))
}
.with_labelset(labelset),
)
},
)
.await?;
Expand Down Expand Up @@ -301,11 +314,13 @@ pub async fn execute_query(
ctx: &SessionContext,
query: &str,
) -> anyhow::Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
let df = ctx.sql(query).await?;
let df = ctx.sql(query).with_current_labels().await?;

let task_ctx = Arc::new(df.task_ctx());
let plan = df.create_physical_plan().await?;
let result = collect(plan.clone(), task_ctx).await?;
let plan = df.create_physical_plan().with_current_labels().await?;
let result = collect(plan.clone(), task_ctx)
.with_current_labels()
.await?;

Ok((result, plan))
}
Expand Down
Loading
Loading