Skip to content

Commit 91fdcc5

Browse files
committed
feat[array]: validate array encodings on write
Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>
1 parent a9661c3 commit 91fdcc5

File tree

4 files changed

+100
-1
lines changed

4 files changed

+100
-1
lines changed

vortex-array/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pub mod mask;
5050
mod mask_future;
5151
pub mod matchers;
5252
mod metadata;
53+
pub mod normalize;
5354
pub mod optimizer;
5455
mod partial_ord;
5556
pub mod patches;

vortex-array/src/normalize.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use itertools::Itertools;
5+
use vortex_error::VortexResult;
6+
use vortex_error::vortex_bail;
7+
use vortex_session::registry::Id;
8+
9+
use crate::Array;
10+
use crate::ArrayRef;
11+
use crate::ArrayVisitor;
12+
use crate::session::ArrayRegistry;
13+
14+
/// Options for normalizing an array.
15+
pub struct NormalizeOptions<'a> {
16+
/// The set of allowed array encodings (in addition to the canonical ones) that are permitted
17+
/// in the normalized array.
18+
pub allowed: &'a ArrayRegistry,
19+
/// The operation to perform when a non-allowed encoding is encountered.
20+
pub operation: Operation,
21+
}
22+
23+
/// The operation to perform when a non-allowed encoding is encountered.
24+
pub enum Operation {
25+
Error,
26+
// TODO(joe): add into canonical variant
27+
}
28+
29+
impl dyn Array + '_ {
30+
/// Normalize the array according to given options.
31+
///
32+
/// This operation performs a recursive traversal of the array. Any non-allowed encoding is
33+
/// normalized per the configured operation.
34+
pub fn normalize(self: ArrayRef, options: &mut NormalizeOptions) -> VortexResult<ArrayRef> {
35+
let array_ids = options.allowed.ids().collect_vec();
36+
self.normalize_with_error(&array_ids)?;
37+
// Note this takes ownership so we can at a later date remove non-allowed encodings.
38+
Ok(self)
39+
}
40+
41+
fn normalize_with_error(self: &ArrayRef, allowed: &[Id]) -> VortexResult<()> {
42+
if !allowed.contains(&self.encoding_id()) {
43+
vortex_bail!(AssertionFailed: "normalize forbids encoding ({})", self.encoding_id())
44+
}
45+
46+
for child in ArrayVisitor::children(self) {
47+
let child: ArrayRef = child;
48+
child.normalize_with_error(allowed)?
49+
}
50+
Ok(())
51+
}
52+
}

vortex-file/src/strategy.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl WriteStrategyBuilder {
7777
/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
7878
/// applied.
7979
pub fn build(self) -> Arc<dyn LayoutStrategy> {
80-
let flat = FlatLayoutStrategy::new(self.session);
80+
let mut flat = FlatLayoutStrategy::new(self.session);
8181

8282
// 7. for each chunk create a flat layout
8383
let chunked = ChunkedLayoutStrategy::new(flat.clone());

vortex-layout/src/layouts/flat/writer.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use vortex_array::ArrayContext;
88
use vortex_array::expr::stats::Precision;
99
use vortex_array::expr::stats::Stat;
1010
use vortex_array::expr::stats::StatsProvider;
11+
use vortex_array::normalize::NormalizeOptions;
12+
use vortex_array::normalize::Operation;
1113
use vortex_array::serde::SerializeOptions;
14+
use vortex_array::session::ArraySessionExt;
1215
use vortex_dtype::DType;
1316
use vortex_error::VortexResult;
1417
use vortex_error::vortex_bail;
@@ -140,6 +143,11 @@ impl LayoutStrategy for FlatLayoutStrategy {
140143
_ => {}
141144
}
142145

146+
let chunk = chunk.normalize(&mut NormalizeOptions {
147+
allowed: self.session.arrays().registry(),
148+
operation: Operation::Error,
149+
})?;
150+
143151
let buffers = chunk.serialize(
144152
&ctx,
145153
&SerializeOptions {
@@ -199,8 +207,10 @@ mod tests {
199207
use vortex_dtype::FieldNames;
200208
use vortex_dtype::Nullability;
201209
use vortex_error::VortexExpect;
210+
use vortex_error::VortexResult;
202211
use vortex_io::runtime::single::block_on;
203212
use vortex_mask::AllOr;
213+
use vortex_mask::Mask;
204214

205215
use crate::LayoutStrategy;
206216
use crate::layouts::flat::writer::FlatLayoutStrategy;
@@ -386,4 +396,40 @@ mod tests {
386396
);
387397
})
388398
}
399+
400+
#[test]
401+
fn flat_invalid_array_fails() -> VortexResult<()> {
402+
block_on(|handle| async {
403+
let prim: PrimitiveArray = (0..10).collect();
404+
let filter = prim.filter(Mask::from_indices(10, vec![2, 3]))?;
405+
406+
let ctx = ArrayContext::empty();
407+
408+
// Write the array into a byte buffer.
409+
let (layout, _segments) = {
410+
let segments = Arc::new(TestSegments::default());
411+
let (ptr, eof) = SequenceId::root().split();
412+
let layout = FlatLayoutStrategy::new(SESSION.clone())
413+
.write_stream(
414+
ctx,
415+
segments.clone(),
416+
filter.to_array_stream().sequenced(ptr),
417+
eof,
418+
handle,
419+
)
420+
.await;
421+
422+
(layout, segments)
423+
};
424+
425+
let err = layout.expect_err("expected error");
426+
assert!(
427+
err.to_string()
428+
.contains("normalize forbids encoding (vortex.filter)"),
429+
"unexpected error: {err}"
430+
);
431+
432+
Ok(())
433+
})
434+
}
389435
}

0 commit comments

Comments
 (0)