62 changes: 62 additions & 0 deletions .claude/skills/ec2-cluster-provision/SKILL.md
@@ -0,0 +1,62 @@
---
name: ec2-cluster-provision
description: uses the code in this repository to provision an EC2 cluster for benchmarking purposes
---

This project uses a remote EC2 benchmarks cluster, built with the AWS CDK code located at `benchmarks/cdk`.

The `benchmarks/cdk/package.json` file contains the relevant deployment commands.

All the commands in this skill need to be prefixed with whatever the user declared in `.claude/settings.local.json`
under the `aws-commands-prefix` key, typically to run the commands with the correct permissions
(e.g., `$aws-commands-prefix npm run cdk deploy` or `$aws-commands-prefix aws ssm ...`).
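
As a concrete illustration (the prefix value and the exact shape of the settings file shown here are assumptions, not something defined by this repository):

```shell
# Illustrative only: assuming .claude/settings.local.json contains
#   { "aws-commands-prefix": "aws-vault exec my-profile --" }
# a deploy would be run as:
aws-vault exec my-profile -- npm run cdk deploy
```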

Running `npm run cdk deploy` will provision the cluster with the resources specified in `benchmarks/cdk/lib/`.
This typically takes a while (~5 min). If the user data of the EC2 machines was changed and you want those changes
to take effect, you will need to prepend the deployment command with `USER_DATA_CAUSES_REPLACEMENT=true`.
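
For example, a deploy that re-applies user-data changes would look like this (remember to also apply the `aws-commands-prefix` described above):

```shell
USER_DATA_CAUSES_REPLACEMENT=true npm run cdk deploy
```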

Once the deployment is complete, the list of instance IDs will be printed to stdout.

It's usually necessary to verify that everything was deployed correctly and is running fine. To do that,
perform the following steps for each engine:

## Distributed DataFusion

1. Port-forward port 9000 in a background terminal:
`aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"`
2. Query the `/info` endpoint to see what was deployed:
`curl http://localhost:9000/info | jq .`
3. If everything's fine, you should see the list of listening workers and the build time.

## Trino

1. Port-forward port 8080 in a background terminal:
`aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=8080,localPortNumber=8080"`
2. Query the `/v1/node` and `/v1/info` endpoints to see which nodes are available:
`curl -s -H "X-Trino-User: admin" http://localhost:8080/v1/node | jq .`
`curl -s -H "X-Trino-User: admin" http://localhost:8080/v1/info | jq .`
3. Make sure the responses above are consistent with what is supposed to be deployed by
`benchmarks/cdk/lib/trino.ts`.

## Spark

1. Port-forward port 9003 in a background terminal (this is a custom Python server, `benchmarks/cdk/bin/spark_http.py`):
`aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9003,localPortNumber=9003"`
2. Issue curl queries to `http://localhost:9003/health` and `http://localhost:9003/query` to double-check that
everything is consistent with what's expected from `benchmarks/cdk/lib/spark.ts`, as sketched below.
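
A minimal sketch of those checks (the `/health` endpoint comes from the step above; the `sql` query parameter for `/query` is an assumption, since the actual interface is defined by `benchmarks/cdk/bin/spark_http.py`):

```shell
curl -s http://localhost:9003/health
# The 'sql' parameter below is assumed; check spark_http.py for the real query interface
curl -s "http://localhost:9003/query?sql=SELECT%201"
```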

## Ballista

1. Port-forward port 9002 in a background terminal:
`aws ssm start-session --target $INSTANCE_ID --document-name AWS-StartPortForwardingSession --parameters "portNumber=9002,localPortNumber=9002"`
2. Verify that the expected workers are in fact connected to the scheduler. For this, all workers need to
have the appropriate instance IP configured in their launch command, which might not have been done by default,
because this happens in a post-cloud-init command that may not run due to a timeout.
3. Verify with systemctl on each remote machine that all workers are running successfully. A very common issue is
that `--external-host` is incorrectly set to localhost rather than the actual scheduler EC2 private IP. A quick
end-to-end smoke test is shown below.
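
Port 9002 is served by the small HTTP query server added in `benchmarks/cdk/ballista/src/ballista_http.rs`, which accepts SQL through the `sql` query parameter, so a trivial query can at least confirm that the scheduler is reachable and can execute work:

```shell
# Runs "SELECT 1" through the Ballista cluster via the forwarded HTTP server
curl -s "http://localhost:9002/?sql=SELECT%201" | jq .
```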

Remember that when running port-forward commands in the background, it takes around 5 seconds until the
"waiting for connections" message appears. Until then, the port is not yet forwarded.

If at some point you need to run a command on all machines and collect its output, you can do it
with `npm run send-command <your custom command>`.
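
For example (the command shown is arbitrary; any shell command works):

```shell
npm run send-command uptime
```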
85 changes: 85 additions & 0 deletions .claude/skills/remote-benchmark/SKILL.md
@@ -0,0 +1,85 @@
---
name: remote-benchmark
description: deploys the code to a remote EC2 cluster with the commands available in the package.json, port-forwards
a machine port, and runs benchmarks against it.
---

This project uses a remote EC2 benchmarks cluster, built with the AWS CDK code located at `benchmarks/cdk`.

The `benchmarks/cdk/package.json` file contains the relevant benchmarking commands.

All the commands in this skill need to be prefixed with whatever the user declared in `.claude/settings.local.json`
under the `aws-commands-prefix` key, typically to run the commands with the correct permissions
(e.g., `$aws-commands-prefix npm run fast-deploy` or `$aws-commands-prefix aws ssm ...`).

You can assume that the cluster is already there, and the only thing necessary is to execute the `npm run fast-deploy`
command to deploy the current code to the EC2 cluster. Remember that all npm commands need to be run from the
`benchmarks/cdk` folder.

The `npm run fast-deploy` command will compile the current code and deploy it to the EC2 machines. If it fails,
prompt the user to fix it. It will output several EC2 instance IDs: pick the first one; that's the one we will
port-forward locally in order to issue queries to it.

Once the deployment is complete, port-forward the first instance ID to local port 9000:

```shell
aws ssm start-session --target i-0000000000000 --document-name AWS-StartPortForwardingSession --parameters "portNumber=9000,localPortNumber=9000"
```

Remember to run that in the background, as it will otherwise block in place.

Once the port is correctly listening locally (you will see a "waiting for connections" message), it's fine to start
the benchmarks.

You can start the benchmarks with `npm run datafusion-bench`; you can learn how this command works by running:

```shell
$ npm run datafusion-bench -- --help

Usage: datafusion-bench [options]

Options:
--dataset <string> Dataset to run queries on
-i, --iterations <number> Number of iterations (default: "3")
--bytes-processed-per-partition <number> How many bytes each partition is expected to process (default: "134217728")
--batch-size <number> Standard Batch coalescing size (number of rows) (default: "32768")
--shuffle-batch-size <number> Shuffle batch coalescing size (number of rows) (default: "32768")
--children-isolator-unions <number> Use children isolator unions (default: "true")
--broadcast-joins <boolean> Use broadcast joins (default: "false")
--collect-metrics <boolean> Propagates metric collection (default: "true")
--compression <string> Compression algo to use within workers (lz4, zstd, none) (default: "lz4")
--queries <string> Specific queries to run
--debug <boolean> Print the generated plans to stdout
-h, --help display help for command
```

The `--dataset` argument is mandatory, and its value can be any of the folder names in `benchmarks/data`, for example:
clickbench_0-100, tpcds_sf1, tpch_sf1, tpch_sf10 or tpch_sf100.

Also, the --queries argument can be used for executing just a partial subset of queries, for example:
```shell
--queries q1,q2,q3
```

When benchmarking a very specific feature, it's convenient to choose a relevant query wisely and execute just that one.
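
For example, a focused run combining both arguments might look like this (dataset and query chosen purely for illustration):

```shell
npm run datafusion-bench -- --dataset tpch_sf10 --queries q5
```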

The user provided the following arguments: $ARGUMENTS

Parse those and make sure you parse them correctly; for example, `tpch_sf100 q1,q2,q4` means
`--dataset tpch_sf100 --queries q1,q2,q4`. Note that the user might also give natural-language instructions in the
arguments, so be smart while parsing those.

### Analyzing results

Results for individual queries will be dumped in the respective dataset folders, for example:

`benchmarks/data/tpch_sf10/.results-remote/datafusion-distributed-main/q1.json`
or
`benchmarks/data/tpch_sf1/.results-remote/datafusion-distributed-new-branch/q2.json`

You can inspect the results and the plan by reading the JSONs. Tip: use jq for printing nice results.
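
For example, to pretty-print one of the result files mentioned above:

```shell
jq . benchmarks/data/tpch_sf10/.results-remote/datafusion-distributed-main/q1.json
```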

As the results of previous branches are already stored on disk, they can usually be analyzed without re-running them.
That can be done by either:
- Just looking at the latencies and plans in the output folders.
- Running `npm run datafusion-bench -- compare --dataset tpch_sfX datafusion-distributed-<base branch> datafusion-distributed-<compare branch>`
5 changes: 4 additions & 1 deletion benchmarks/cdk/README.md
@@ -30,6 +30,9 @@ cargo zigbuild -p datafusion-distributed-benchmarks --release --bin worker --tar

## CDK deploy

This only needs to be done once; after that, `npm run fast-deploy` can be used to deploy new code in
seconds.

```shell
npm run cdk deploy
```
@@ -119,5 +122,5 @@ Several arguments can be passed for running the benchmarks against different sca
for example:

```shell
npm run datafusion-bench -- --datset tpch_sf10 --files-per-task 4 --query 7
npm run datafusion-bench -- --dataset tpch_sf10
```
2 changes: 2 additions & 0 deletions benchmarks/cdk/ballista/.gitignore
@@ -0,0 +1,2 @@
target/
Cargo.lock
35 changes: 35 additions & 0 deletions benchmarks/cdk/ballista/Cargo.toml
@@ -0,0 +1,35 @@
[package]
name = "ballista-benchmarks"
version = "0.1.0"
edition = "2024"

# Empty workspace table to detach from root workspace
[workspace]

[[bin]]
name = "ballista-http"
path = "src/ballista_http.rs"

[[bin]]
name = "ballista-scheduler"
path = "src/ballista_scheduler.rs"

[[bin]]
name = "ballista-executor"
path = "src/ballista_executor.rs"

[dependencies]
ballista = "51.0.0"
ballista-core = "51.0.0"
ballista-executor = "51.0.0"
ballista-scheduler = "51.0.0"
object_store = { version = "0.12", features = ["aws"] }
tokio = { version = "1", features = ["full"] }
url = "2"
clap = { version = "4", features = ["derive"] }
structopt = "0.3"
axum = "0.8"
serde = { version = "1", features = ["derive"] }
futures = "0.3"
log = "0.4"
env_logger = "0.11"
36 changes: 36 additions & 0 deletions benchmarks/cdk/ballista/src/ballista_executor.rs
@@ -0,0 +1,36 @@
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
use ballista::datafusion::prelude::SessionConfig;
use ballista_executor::config::Config;
use ballista_executor::executor_process::{ExecutorProcessConfig, start_executor_process};
use clap::Parser;
use object_store::aws::AmazonS3Builder;
use std::env;
use std::sync::Arc;
use url::Url;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let opt = Config::parse();

let mut config: ExecutorProcessConfig = opt.try_into()?;

let bucket = env::var("BUCKET").unwrap_or("datafusion-distributed-benchmarks".to_string());
let s3_url = Url::parse(&format!("s3://{bucket}"))?;

let s3 = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(s3_url.host().unwrap().to_string())
.build()?,
);
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

config.override_runtime_producer = Some(Arc::new(
move |_: &SessionConfig| -> ballista::datafusion::common::Result<Arc<RuntimeEnv>> {
Ok(runtime_env.clone())
},
));

start_executor_process(Arc::new(config)).await?;
Ok(())
}
124 changes: 124 additions & 0 deletions benchmarks/cdk/ballista/src/ballista_http.rs
@@ -0,0 +1,124 @@
use axum::{Json, Router, extract::Query, http::StatusCode, routing::get};
use ballista::datafusion::common::instant::Instant;
use ballista::datafusion::execution::SessionStateBuilder;
use ballista::datafusion::execution::runtime_env::RuntimeEnv;
use ballista::datafusion::physical_plan::displayable;
use ballista::datafusion::physical_plan::execute_stream;
use ballista::datafusion::prelude::SessionConfig;
use ballista::datafusion::prelude::SessionContext;
use ballista::prelude::*;
use futures::{StreamExt, TryFutureExt};
use log::{error, info};
use object_store::aws::AmazonS3Builder;
use serde::Serialize;
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Display;
use std::sync::Arc;
use structopt::StructOpt;
use url::Url;

#[derive(Serialize)]
struct QueryResult {
plan: String,
count: usize,
}

#[derive(Debug, StructOpt, Clone)]
#[structopt(about = "worker spawn command")]
struct Cmd {
/// The bucket name.
#[structopt(long, default_value = "datafusion-distributed-benchmarks")]
bucket: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.parse_default_env()
.init();

let cmd = Cmd::from_args();

const LISTENER_ADDR: &str = "0.0.0.0:9002";

info!("Starting HTTP listener on {LISTENER_ADDR}...");
let listener = tokio::net::TcpListener::bind(LISTENER_ADDR).await?;

// Register S3 object store
let s3_url = Url::parse(&format!("s3://{}", cmd.bucket))?;

info!("Building shared SessionContext for the whole lifetime of the HTTP listener...");
let s3 = Arc::new(
AmazonS3Builder::from_env()
.with_bucket_name(s3_url.host().unwrap().to_string())
.build()?,
);
let runtime_env = Arc::new(RuntimeEnv::default());
runtime_env.register_object_store(&s3_url, s3);

let config = SessionConfig::new_with_ballista().with_ballista_job_name("Benchmarks");

let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.with_runtime_env(Arc::clone(&runtime_env))
.build();
let ctx = SessionContext::remote_with_state("df://localhost:50050", state).await?;

let http_server = axum::serve(
listener,
Router::new().route(
"/",
get(move |Query(params): Query<HashMap<String, String>>| {
let ctx = ctx.clone();

async move {
let sql = params.get("sql").ok_or(err("Missing 'sql' parameter"))?;

let mut df_opt = None;
for sql in sql.split(";") {
if sql.trim().is_empty() {
continue;
}
let df = ctx.sql(sql).await.map_err(err)?;
df_opt = Some(df);
}
let Some(df) = df_opt else {
return Err(err("Empty 'sql' parameter"));
};

let start = Instant::now();

info!("Executing query...");
let physical = df.create_physical_plan().await.map_err(err)?;
let mut stream =
execute_stream(physical.clone(), ctx.task_ctx()).map_err(err)?;
let mut count = 0;
while let Some(batch) = stream.next().await {
count += batch.map_err(err)?.num_rows();
info!("Gathered {count} rows, query still in progress..")
}
let plan = displayable(physical.as_ref()).indent(true).to_string();
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
info!("Returned {count} rows in {ms} ms");

Ok::<_, (StatusCode, String)>(Json(QueryResult { count, plan }))
}
.inspect_err(|(_, msg)| {
error!("Error executing query: {msg}");
})
}),
),
);

info!("Started listener HTTP server in {LISTENER_ADDR}");
http_server.await?;
Ok(())
}

fn err(s: impl Display) -> (StatusCode, String) {
(StatusCode::INTERNAL_SERVER_ERROR, s.to_string())
}