Skip to content

Commit 029f199

Browse files
feat[array]: validate array encodings on write (#6241)
based on #6213 Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent 226874e commit 029f199

File tree

11 files changed

+293
-38
lines changed

11 files changed

+293
-38
lines changed

fuzz/fuzz_targets/file_io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ fuzz_target!(|fuzz: FuzzFileAction| -> Corpus {
6161
let write_options = match compressor_strategy {
6262
CompressorStrategy::Default => SESSION.write_options(),
6363
CompressorStrategy::Compact => {
64-
let strategy = WriteStrategyBuilder::new()
64+
let strategy = WriteStrategyBuilder::default()
6565
.with_compressor(CompactCompressor::default())
6666
.build();
6767
SESSION.write_options().with_strategy(strategy)

vortex-array/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub mod mask;
5050
mod mask_future;
5151
pub mod matchers;
5252
mod metadata;
53+
pub mod normalize;
5354
pub mod optimizer;
5455
mod partial_ord;
5556
pub mod patches;

vortex-array/src/normalize.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use itertools::Itertools;
5+
use vortex_error::VortexResult;
6+
use vortex_error::vortex_bail;
7+
use vortex_session::registry::Id;
8+
9+
use crate::Array;
10+
use crate::ArrayRef;
11+
use crate::ArrayVisitor;
12+
use crate::session::ArrayRegistry;
13+
14+
/// Options for normalizing an array.
15+
pub struct NormalizeOptions<'a> {
16+
/// The set of array encodings permitted in the normalized array. Canonical encodings are not
17+
/// implicitly allowed: every permitted encoding must be present in this registry.
18+
pub allowed: &'a ArrayRegistry,
19+
/// The operation to perform when a non-allowed encoding is encountered.
20+
pub operation: Operation,
21+
}
22+
23+
/// The operation to perform when a non-allowed encoding is encountered.
24+
pub enum Operation {
25+
Error,
26+
// TODO(joe): add into canonical variant
27+
}
28+
29+
impl dyn Array + '_ {
30+
/// Normalize the array according to given options.
31+
///
32+
/// This operation performs a recursive traversal of the array. Any non-allowed encoding is
33+
/// handled per the configured operation; currently the only operation returns an error.
34+
pub fn normalize(self: ArrayRef, options: &mut NormalizeOptions) -> VortexResult<ArrayRef> {
35+
let array_ids = options.allowed.ids().collect_vec();
36+
self.normalize_with_error(&array_ids)?;
37+
// Note: this takes ownership so that, at a later date, we can remove non-allowed encodings.
38+
Ok(self)
39+
}
40+
41+
fn normalize_with_error(self: &ArrayRef, allowed: &[Id]) -> VortexResult<()> {
42+
if !allowed.contains(&self.encoding_id()) {
43+
vortex_bail!(AssertionFailed: "normalize forbids encoding ({})", self.encoding_id())
44+
}
45+
46+
for child in ArrayVisitor::children(self) {
47+
let child: ArrayRef = child;
48+
child.normalize_with_error(allowed)?
49+
}
50+
Ok(())
51+
}
52+
}

vortex-bench/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ impl CompactionStrategy {
212212
pub fn apply_options(&self, options: VortexWriteOptions) -> VortexWriteOptions {
213213
match self {
214214
CompactionStrategy::Compact => options.with_strategy(
215-
WriteStrategyBuilder::new()
215+
WriteStrategyBuilder::default()
216216
.with_compressor(CompactCompressor::default())
217217
.build(),
218218
),

vortex-file/src/strategy.rs

Lines changed: 105 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,37 @@
44
//! This module defines the default layout strategy for a Vortex file.
55
66
use std::sync::Arc;
7-
7+
use std::sync::LazyLock;
8+
9+
// Compressed encodings from encoding crates
10+
use vortex_alp::ALPRDVTable;
11+
use vortex_alp::ALPVTable;
12+
// Canonical array encodings from vortex-array
13+
use vortex_array::arrays::BoolVTable;
14+
use vortex_array::arrays::ChunkedVTable;
15+
use vortex_array::arrays::ConstantVTable;
16+
use vortex_array::arrays::DecimalVTable;
17+
use vortex_array::arrays::DictVTable;
18+
use vortex_array::arrays::ExtensionVTable;
19+
use vortex_array::arrays::FixedSizeListVTable;
20+
use vortex_array::arrays::ListVTable;
21+
use vortex_array::arrays::ListViewVTable;
22+
use vortex_array::arrays::MaskedVTable;
23+
use vortex_array::arrays::NullVTable;
24+
use vortex_array::arrays::PrimitiveVTable;
25+
use vortex_array::arrays::StructVTable;
26+
use vortex_array::arrays::VarBinVTable;
27+
use vortex_array::arrays::VarBinViewVTable;
28+
use vortex_array::session::ArrayRegistry;
29+
use vortex_bytebool::ByteBoolVTable;
30+
use vortex_datetime_parts::DateTimePartsVTable;
31+
use vortex_decimal_byte_parts::DecimalBytePartsVTable;
832
use vortex_dtype::FieldPath;
33+
use vortex_fastlanes::BitPackedVTable;
34+
use vortex_fastlanes::DeltaVTable;
35+
use vortex_fastlanes::FoRVTable;
36+
use vortex_fastlanes::RLEVTable;
37+
use vortex_fsst::FSSTVTable;
938
use vortex_layout::LayoutStrategy;
1039
use vortex_layout::layouts::buffered::BufferedStrategy;
1140
use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy;
@@ -19,10 +48,64 @@ use vortex_layout::layouts::repartition::RepartitionWriterOptions;
1948
use vortex_layout::layouts::table::TableStrategy;
2049
use vortex_layout::layouts::zoned::writer::ZonedLayoutOptions;
2150
use vortex_layout::layouts::zoned::writer::ZonedStrategy;
51+
use vortex_pco::PcoVTable;
52+
use vortex_runend::RunEndVTable;
53+
use vortex_sequence::SequenceVTable;
54+
use vortex_sparse::SparseVTable;
2255
use vortex_utils::aliases::hash_map::HashMap;
56+
use vortex_zigzag::ZigZagVTable;
57+
#[cfg(feature = "zstd")]
58+
use vortex_zstd::ZstdVTable;
2359

2460
const ONE_MEG: u64 = 1 << 20;
2561

62+
/// Static registry of all allowed array encodings for file writing.
63+
///
64+
/// This includes all canonical encodings from vortex-array plus all compressed
65+
/// encodings from the various encoding crates.
66+
pub static ALLOWED_ENCODINGS: LazyLock<ArrayRegistry> = LazyLock::new(|| {
67+
let registry = ArrayRegistry::default();
68+
69+
// Canonical encodings from vortex-array
70+
registry.register(NullVTable::ID, NullVTable);
71+
registry.register(BoolVTable::ID, BoolVTable);
72+
registry.register(PrimitiveVTable::ID, PrimitiveVTable);
73+
registry.register(DecimalVTable::ID, DecimalVTable);
74+
registry.register(VarBinVTable::ID, VarBinVTable);
75+
registry.register(VarBinViewVTable::ID, VarBinViewVTable);
76+
registry.register(ListVTable::ID, ListVTable);
77+
registry.register(ListViewVTable::ID, ListViewVTable);
78+
registry.register(FixedSizeListVTable::ID, FixedSizeListVTable);
79+
registry.register(StructVTable::ID, StructVTable);
80+
registry.register(ExtensionVTable::ID, ExtensionVTable);
81+
registry.register(ChunkedVTable::ID, ChunkedVTable);
82+
registry.register(ConstantVTable::ID, ConstantVTable);
83+
registry.register(MaskedVTable::ID, MaskedVTable);
84+
registry.register(DictVTable::ID, DictVTable);
85+
86+
// Compressed encodings from encoding crates
87+
registry.register(ALPVTable::ID, ALPVTable);
88+
registry.register(ALPRDVTable::ID, ALPRDVTable);
89+
registry.register(BitPackedVTable::ID, BitPackedVTable);
90+
registry.register(ByteBoolVTable::ID, ByteBoolVTable);
91+
registry.register(DateTimePartsVTable::ID, DateTimePartsVTable);
92+
registry.register(DecimalBytePartsVTable::ID, DecimalBytePartsVTable);
93+
registry.register(DeltaVTable::ID, DeltaVTable);
94+
registry.register(FoRVTable::ID, FoRVTable);
95+
registry.register(FSSTVTable::ID, FSSTVTable);
96+
registry.register(PcoVTable::ID, PcoVTable);
97+
registry.register(RLEVTable::ID, RLEVTable);
98+
registry.register(RunEndVTable::ID, RunEndVTable);
99+
registry.register(SequenceVTable::ID, SequenceVTable);
100+
registry.register(SparseVTable::ID, SparseVTable);
101+
registry.register(ZigZagVTable::ID, ZigZagVTable);
102+
103+
#[cfg(feature = "zstd")]
104+
registry.register(ZstdVTable::ID, ZstdVTable);
105+
106+
registry
107+
});
108+
26109
/// Build a new [writer strategy][LayoutStrategy] to compress and reorganize chunks of a Vortex file.
27110
///
28111
/// Vortex provides an out-of-the-box file writer that optimizes the layout of chunks on-disk,
@@ -32,25 +115,23 @@ pub struct WriteStrategyBuilder {
32115
compressor: Option<Arc<dyn CompressorPlugin>>,
33116
row_block_size: usize,
34117
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
118+
allow_encodings: Option<ArrayRegistry>,
35119
}
36120

37121
impl Default for WriteStrategyBuilder {
122+
/// Create a new empty builder. It can be further configured,
123+
/// and then finally built yielding the [`LayoutStrategy`].
38124
fn default() -> Self {
39-
Self::new()
40-
}
41-
}
42-
43-
impl WriteStrategyBuilder {
44-
/// Create a new empty builder. It can be further configured, and then finally built
45-
/// yielding the [`LayoutStrategy`].
46-
pub fn new() -> Self {
47125
Self {
48126
compressor: None,
49127
row_block_size: 8192,
50128
field_writers: HashMap::new(),
129+
allow_encodings: None,
51130
}
52131
}
132+
}
53133

134+
impl WriteStrategyBuilder {
54135
/// Override the [compressor][CompressorPlugin] used for compressing chunks in the file.
55136
///
56137
/// If not provided, this will use a BtrBlocks-style cascading compressor that tries to balance
@@ -77,11 +158,23 @@ impl WriteStrategyBuilder {
77158
self
78159
}
79160

161+
/// Override the allowed array encodings for normalization.
162+
pub fn with_allow_encodings(mut self, allow_encodings: ArrayRegistry) -> Self {
163+
self.allow_encodings = Some(allow_encodings);
164+
self
165+
}
166+
80167
/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
81168
/// applied.
82169
pub fn build(self) -> Arc<dyn LayoutStrategy> {
170+
let flat = if let Some(allow_encodings) = self.allow_encodings {
171+
FlatLayoutStrategy::default().with_allow_encodings(allow_encodings)
172+
} else {
173+
FlatLayoutStrategy::default()
174+
};
175+
83176
// 7. for each chunk create a flat layout
84-
let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default());
177+
let chunked = ChunkedLayoutStrategy::new(flat.clone());
85178
// 6. buffer chunks so they end up with closer segment ids physically
86179
let buffered = BufferedStrategy::new(chunked, 2 * ONE_MEG); // 2MB
87180
// 5. compress each chunk
@@ -110,9 +203,9 @@ impl WriteStrategyBuilder {
110203

111204
// 2.1. | 3.1. compress stats tables and dict values.
112205
let compress_then_flat = if let Some(ref compressor) = self.compressor {
113-
CompressingStrategy::new_opaque(FlatLayoutStrategy::default(), compressor.clone())
206+
CompressingStrategy::new_opaque(flat, compressor.clone())
114207
} else {
115-
CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), false)
208+
CompressingStrategy::new_btrblocks(flat, false)
116209
};
117210

118211
// 3. apply dict encoding or fallback

vortex-file/src/writer.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ pub struct VortexWriteOptions {
6969
pub trait WriteOptionsSessionExt: SessionExt {
7070
/// Create [`VortexWriteOptions`] for writing to a Vortex file.
7171
fn write_options(&self) -> VortexWriteOptions {
72+
let session = self.session();
7273
VortexWriteOptions {
73-
session: self.session(),
74-
strategy: WriteStrategyBuilder::new().build(),
74+
strategy: WriteStrategyBuilder::default().build(),
75+
session,
7576
exclude_dtype: false,
7677
file_statistics: PRUNING_STATS.to_vec(),
7778
max_variable_length_statistics_size: 64,
@@ -84,8 +85,8 @@ impl VortexWriteOptions {
8485
/// Create a new [`VortexWriteOptions`] with the given session.
8586
pub fn new(session: VortexSession) -> Self {
8687
VortexWriteOptions {
88+
strategy: WriteStrategyBuilder::default().build(),
8789
session,
88-
strategy: WriteStrategyBuilder::new().build(),
8990
exclude_dtype: false,
9091
file_statistics: PRUNING_STATS.to_vec(),
9192
max_variable_length_statistics_size: 64,

0 commit comments

Comments
 (0)