Skip to content

Commit 332fd06

Browse files
authored
feat[gpu]: cuda runend decoding (#6275)
Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 3c2421f commit 332fd06

File tree

8 files changed

+647
-1
lines changed

8 files changed

+647
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/runend/src/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,8 +409,8 @@ impl RunEndArray {
409409
#[inline]
410410
pub fn into_parts(self) -> RunEndArrayParts {
411411
RunEndArrayParts {
412-
values: self.values,
413412
ends: self.ends,
413+
values: self.values,
414414
}
415415
}
416416
}

vortex-cuda/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ vortex-fastlanes = { workspace = true }
4141
vortex-io = { workspace = true }
4242
vortex-mask = { workspace = true }
4343
vortex-nvcomp = { path = "nvcomp" }
44+
vortex-runend = { workspace = true }
45+
vortex-scalar = { workspace = true }
4446
vortex-sequence = { workspace = true }
4547
vortex-session = { workspace = true }
4648
vortex-utils = { workspace = true }
@@ -75,3 +77,7 @@ harness = false
7577
[[bench]]
7678
name = "filter_cuda"
7779
harness = false
80+
81+
[[bench]]
82+
name = "runend_cuda"
83+
harness = false

vortex-cuda/benches/runend_cuda.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! CUDA benchmarks for run-end decoding.
5+
6+
#![allow(clippy::unwrap_used)]
7+
#![allow(clippy::cast_possible_truncation)]
8+
9+
use std::mem::size_of;
10+
use std::time::Duration;
11+
12+
use criterion::BenchmarkId;
13+
use criterion::Criterion;
14+
use criterion::Throughput;
15+
use cudarc::driver::DeviceRepr;
16+
use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC;
17+
use futures::executor::block_on;
18+
use vortex_array::IntoArray;
19+
use vortex_array::ToCanonical;
20+
use vortex_array::arrays::PrimitiveArray;
21+
use vortex_array::validity::Validity;
22+
use vortex_buffer::Buffer;
23+
use vortex_cuda::CudaBufferExt;
24+
use vortex_cuda::CudaExecutionCtx;
25+
use vortex_cuda::CudaSession;
26+
use vortex_cuda_macros::cuda_available;
27+
use vortex_cuda_macros::cuda_not_available;
28+
use vortex_dtype::NativePType;
29+
use vortex_dtype::PType;
30+
use vortex_error::VortexExpect;
31+
use vortex_runend::RunEndArray;
32+
use vortex_session::VortexSession;
33+
34+
/// Creates a run-end encoded array with the specified output length and average run length.
35+
fn make_runend_array_typed<T>(output_len: usize, avg_run_len: usize) -> RunEndArray
36+
where
37+
T: NativePType + From<u8>,
38+
{
39+
let num_runs = output_len.div_ceil(avg_run_len);
40+
let mut ends: Vec<u64> = Vec::with_capacity(num_runs);
41+
let mut values: Vec<T> = Vec::with_capacity(num_runs);
42+
43+
let mut pos: usize = 0;
44+
for i in 0..num_runs {
45+
pos += avg_run_len;
46+
if pos > output_len {
47+
pos = output_len;
48+
}
49+
ends.push(pos as u64);
50+
values.push(<T as From<u8>>::from((i % 256) as u8));
51+
}
52+
53+
let ends_array = PrimitiveArray::new(Buffer::from(ends), Validity::NonNullable).into_array();
54+
let values_array =
55+
PrimitiveArray::new(Buffer::from(values), Validity::NonNullable).into_array();
56+
RunEndArray::new(ends_array, values_array)
57+
}
58+
59+
/// Launches runend decode kernel and returns elapsed GPU time.
///
/// Copies the run ends, run values, and a zero-initialized output buffer to the
/// device, launches the `runend` kernel for value type `T`, and returns the
/// kernel's elapsed time measured via CUDA events (so host-side copy time is
/// excluded from the reported duration).
fn launch_runend_kernel_timed_typed<T>(
    runend_array: &RunEndArray,
    cuda_ctx: &mut CudaExecutionCtx,
) -> vortex_error::VortexResult<Duration>
where
    T: NativePType + DeviceRepr,
{
    // Canonicalize both children so we can view their buffers as typed slices.
    let ends_prim = runend_array.ends().to_primitive();
    let values_prim = runend_array.values().to_primitive();

    let output_len = runend_array.len();
    let num_runs = ends_prim.len();
    let offset = runend_array.offset();

    // NOTE(review): assumes the ends child is physically u64 — true for arrays
    // built by `make_runend_array_typed`, but `as_slice::<u64>()` would panic
    // for other ends widths. TODO confirm if reused outside this benchmark.
    let ends_device = block_on(
        cuda_ctx
            .copy_to_device(ends_prim.as_slice::<u64>().to_vec())
            .unwrap(),
    )
    .vortex_expect("failed to copy ends to device");

    let values_device = block_on(
        cuda_ctx
            .copy_to_device(values_prim.as_slice::<T>().to_vec())
            .unwrap(),
    )
    .vortex_expect("failed to copy values to device");

    // Output buffer is allocated by copying a host-side default-filled vec;
    // the kernel overwrites every element in [0, output_len).
    let output_device = block_on(
        cuda_ctx
            .copy_to_device(vec![T::default(); output_len])
            .unwrap(),
    )
    .vortex_expect("failed to allocate output buffer");

    let ends_view = ends_device
        .cuda_view::<u64>()
        .vortex_expect("failed to get ends view");
    let values_view = values_device
        .cuda_view::<T>()
        .vortex_expect("failed to get values view");
    let output_view = output_device
        .cuda_view::<T>()
        .vortex_expect("failed to get output view");

    // Kernel name is resolved from the module plus the (value, ends) ptype pair;
    // blocking-sync events let the host sleep until the kernel completes.
    let events = vortex_cuda::launch_cuda_kernel!(
        execution_ctx: cuda_ctx,
        module: "runend",
        ptypes: &[T::PTYPE, PType::U64],
        launch_args: [ends_view, num_runs, values_view, offset, output_len, output_view],
        event_recording: CU_EVENT_BLOCKING_SYNC,
        array_len: output_len
    );

    events.duration()
}
116+
117+
/// Benchmark run-end decoding for a specific type with varying run lengths
///
/// Sweeps output lengths of 1M/10M/100M elements crossed with average run
/// lengths of 10..100k. Throughput is reported in decoded output bytes
/// (`len * size_of::<T>()`), independent of the encoded (compressed) size.
fn benchmark_runend_typed<T>(c: &mut Criterion, type_name: &str)
where
    T: NativePType + DeviceRepr + From<u8>,
{
    let mut group = c.benchmark_group("runend_cuda");
    // GPU launches are slow relative to criterion's default sample count.
    group.sample_size(10);

    for (len, len_str) in [
        (1_000_000usize, "1M"),
        (10_000_000usize, "10M"),
        (100_000_000usize, "100M"),
    ] {
        group.throughput(Throughput::Bytes((len * size_of::<T>()) as u64));

        for run_len in [10, 100, 1000, 10000, 100000] {
            let runend_array = make_runend_array_typed::<T>(len, run_len);

            group.bench_with_input(
                BenchmarkId::new("runend", format!("{len_str}_{type_name}_runlen_{run_len}")),
                &runend_array,
                |b, runend_array| {
                    // iter_custom: accumulate only GPU event time, excluding
                    // context creation and host<->device copies from the measurement.
                    b.iter_custom(|iters| {
                        let mut cuda_ctx =
                            CudaSession::create_execution_ctx(&VortexSession::empty())
                                .vortex_expect("failed to create execution context");

                        let mut total_time = Duration::ZERO;

                        for _ in 0..iters {
                            let kernel_time =
                                launch_runend_kernel_timed_typed::<T>(runend_array, &mut cuda_ctx)
                                    .vortex_expect("kernel launch failed");
                            total_time += kernel_time;
                        }

                        total_time
                    });
                },
            );
        }
    }

    group.finish();
}
162+
163+
/// Benchmark run-end decoding with varying run lengths.
///
/// Currently only exercises `i32` values; extend with further calls to
/// `benchmark_runend_typed` to cover additional value types.
fn benchmark_runend(c: &mut Criterion) {
    benchmark_runend_typed::<i32>(c, "i32");
}

criterion::criterion_group!(benches, benchmark_runend);

// Only run the criterion harness when a CUDA device is available at build time;
// otherwise compile an empty main so the bench target still builds.
#[cuda_available]
criterion::criterion_main!(benches);

#[cuda_not_available]
fn main() {}

vortex-cuda/kernels/src/runend.cu

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#include <cuda.h>
5+
#include <cuda_fp16.h>
6+
#include <cuda_runtime.h>
7+
#include <stdint.h>
8+
#include <thrust/binary_search.h>
9+
#include <thrust/execution_policy.h>
10+
11+
#include "config.cuh"
12+
#include "types.cuh"
13+
14+
// Maximum number of runs a block will cache in shared memory; blocks whose
// position range spans this many runs or more take the global-memory fallback.
constexpr uint32_t MAX_CACHED_RUNS = 512;

/// Binary search for the first element strictly greater than `value`.
///
/// Uses `thrust::upper_bound` with sequential execution policy. `thrust::seq`
/// is chosen as the binary search runs on a single GPU thread. This is
/// preferred over `thrust::device` as this would spawn an additional kernel
/// launch.
/// See: https://nvidia.github.io/cccl/thrust/api/group__binary__search_1gac85cc9ea00f4bdd8f80ad25fff16741d.html#thrust-upper-bound
///
/// Returns the index of the first element that is greater than `value`, or
/// `len` if no such element exists. `data` must be sorted ascending.
template<typename T>
__device__ __forceinline__ uint64_t upper_bound(const T *data, uint64_t len, uint64_t value) {

    auto it = thrust::upper_bound(thrust::seq, data, data + len, value);
    return it - data;
}
32+
33+
34+
// Decodes run-end encoded data on the GPU.
//
// Run-end stores data as pairs of (value, end_position) where each run contains
// repeated values from the previous end position to the current end position.
//
// Steps:
// 1. Each CUDA block processes a contiguous chunk of output elements (elements_per_block).
//
// 2. Block Initialization (Thread 0 only):
//    - Compute the global position range [block_start + offset, block_end + offset) for this block
//    - Use binary search (upper_bound) to find the first and last runs that overlap this range
//    - Store the run range in shared memory (block_first_run, block_num_runs)
//
// 3. Shared Memory Caching:
//    - If the number of runs for this block fits in shared memory (< MAX_CACHED_RUNS),
//      all threads cooperatively load the relevant ends[] and values[] into shared memory
//    - This is to reduce global memory access during decoding
//
// 4. Decoding:
//    a) Cached path: Each thread decodes multiple elements using a forward scan.
//       Since thread positions are strided (idx += blockDim.x), and positions are monotonically
//       increasing across iterations, we maintain a current_run index that only moves forward.
//
//    b) Fallback path: If too many runs span this block (exceeds MAX_CACHED_RUNS),
//       fall back to binary search in global memory for each element.
//
// NOTE(review): assumes a valid run-end encoding — `ends` sorted ascending with
// the final end covering `offset + output_len`; otherwise the shared-memory
// loads below could index past `num_runs`. TODO confirm callers uphold this.
//
// TODO(0ax1): Investigate whether there are faster solutions.
template<typename ValueT, typename EndsT>
__device__ void runend_decode_kernel(
    const EndsT *const __restrict ends,
    uint64_t num_runs,
    const ValueT *const __restrict values,
    uint64_t offset,
    uint64_t output_len,
    ValueT *const __restrict output
) {
    __shared__ EndsT shared_ends[MAX_CACHED_RUNS];
    __shared__ ValueT shared_values[MAX_CACHED_RUNS];
    __shared__ uint64_t block_first_run;
    __shared__ uint32_t block_num_runs;

    const uint32_t elements_per_block = blockDim.x * ELEMENTS_PER_THREAD;
    const uint64_t block_start = static_cast<uint64_t>(blockIdx.x) * elements_per_block;
    const uint64_t block_end = min(block_start + elements_per_block, output_len);

    // Safe before __syncthreads: block_start is uniform across the block, so
    // either every thread of the block returns here or none does.
    if (block_start >= output_len) return;

    // Thread 0 finds the run range for this block.
    if (threadIdx.x == 0) {
        uint64_t first_pos = block_start + offset;
        uint64_t last_pos = (block_end - 1) + offset;

        uint64_t first_run = upper_bound(ends, num_runs, first_pos);
        uint64_t last_run = upper_bound(ends, num_runs, last_pos);

        block_first_run = first_run;
        // Clamped to MAX_CACHED_RUNS: a value == MAX_CACHED_RUNS signals
        // "too many runs" and routes the whole block to the fallback path.
        block_num_runs = static_cast<uint32_t>(min(last_run - first_run + 1, static_cast<uint64_t>(MAX_CACHED_RUNS)));
    }
    __syncthreads();

    // Cooperatively load ends and values into shared memory.
    if (block_num_runs < MAX_CACHED_RUNS) {
        for (uint32_t i = threadIdx.x; i < block_num_runs; i += blockDim.x) {
            shared_ends[i] = ends[block_first_run + i];
            shared_values[i] = values[block_first_run + i];
        }
    }
    __syncthreads();

    if (block_num_runs < MAX_CACHED_RUNS) {
        // current_run only ever advances because idx (and thus pos) is strictly
        // increasing across this thread's iterations.
        uint32_t current_run = 0;
        for (uint64_t idx = block_start + threadIdx.x; idx < block_end; idx += blockDim.x) {
            uint64_t pos = idx + offset;

            // Scan forward to find the run containing this position
            while (current_run < block_num_runs && static_cast<uint64_t>(shared_ends[current_run]) <= pos) {
                current_run++;
            }

            // Defensive clamp to the last cached run if the scan ran off the end.
            output[idx] = shared_values[current_run < block_num_runs ? current_run : block_num_runs - 1];
        }
    } else {
        // Fallback for blocks with very short runs. Search the full `num_runs`
        // array. `block_num_runs` is clamped to `MAX_CACHED_RUNS`.
        for (uint64_t idx = block_start + threadIdx.x; idx < block_end; idx += blockDim.x) {
            uint64_t pos = idx + offset;
            uint64_t run_idx = upper_bound(ends, num_runs, pos);
            if (run_idx >= num_runs) run_idx = num_runs - 1;
            output[idx] = values[run_idx];
        }
    }
}
126+
127+
// Stamps out an extern "C" entry point named `runend_<value>_<ends>` wrapping
// the templated kernel, so the host side can resolve it by ptype pair.
#define GENERATE_RUNEND_KERNEL(value_suffix, ValueType, ends_suffix, EndsType) \
    extern "C" __global__ void runend_##value_suffix##_##ends_suffix( \
        const EndsType *const __restrict ends, \
        uint64_t num_runs, \
        const ValueType *const __restrict values, \
        uint64_t offset, \
        uint64_t output_len, \
        ValueType *const __restrict output \
    ) { \
        runend_decode_kernel<ValueType, EndsType>(ends, num_runs, values, offset, output_len, output); \
    }

// Generates one kernel per supported ends width (u8/u16/u32/u64) for a value type.
#define GENERATE_RUNEND_KERNELS_FOR_VALUE(value_suffix, ValueType) \
    GENERATE_RUNEND_KERNEL(value_suffix, ValueType, u8, uint8_t) \
    GENERATE_RUNEND_KERNEL(value_suffix, ValueType, u16, uint16_t) \
    GENERATE_RUNEND_KERNEL(value_suffix, ValueType, u32, uint32_t) \
    GENERATE_RUNEND_KERNEL(value_suffix, ValueType, u64, uint64_t)

// Instantiate the full value-type x ends-type kernel matrix.
GENERATE_RUNEND_KERNELS_FOR_VALUE(u8, uint8_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(i8, int8_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(u16, uint16_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(i16, int16_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(u32, uint32_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(i32, int32_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(u64, uint64_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(i64, int64_t)
GENERATE_RUNEND_KERNELS_FOR_VALUE(f16, __half)
GENERATE_RUNEND_KERNELS_FOR_VALUE(f32, float)
GENERATE_RUNEND_KERNELS_FOR_VALUE(f64, double)

vortex-cuda/src/kernel/encodings/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ mod alp;
55
mod bitpacked;
66
mod decimal_byte_parts;
77
mod for_;
8+
mod runend;
89
mod sequence;
910
mod zigzag;
1011
mod zstd;
@@ -13,6 +14,7 @@ pub use alp::ALPExecutor;
1314
pub use bitpacked::BitPackedExecutor;
1415
pub use decimal_byte_parts::DecimalBytePartsExecutor;
1516
pub use for_::FoRExecutor;
17+
pub use runend::RunEndExecutor;
1618
pub use sequence::SequenceExecutor;
1719
pub use zigzag::ZigZagExecutor;
1820
pub use zstd::ZstdExecutor;

0 commit comments

Comments
 (0)