Skip to content

Commit 2c198c4

Browse files
authored
Pass limit through Python Vortex (#6239)
And fix up limit to be a u64.

Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 2f9b92c commit 2c198c4

File tree

11 files changed, with 83 additions and 20 deletions.

vortex-cxx/src/read.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl VortexScanBuilder {
9595
}
9696

9797
pub(crate) fn with_limit(&mut self, limit: usize) {
98-
take_mut::take(&mut self.inner, |inner| inner.with_limit(limit));
98+
take_mut::take(&mut self.inner, |inner| inner.with_limit(limit as u64));
9999
}
100100

101101
pub(crate) unsafe fn with_output_schema(&mut self, output_schema: *mut u8) -> Result<()> {

vortex-datafusion/src/persistent/opener.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub(crate) struct VortexOpener {
7474
/// A hint for the desired row count of record batches returned from the scan.
7575
pub batch_size: usize,
7676
/// If provided, the scan will not return more than this many rows.
77-
pub limit: Option<usize>,
77+
pub limit: Option<u64>,
7878
/// A metrics object for tracking performance of the scan.
7979
pub metrics: VortexMetrics,
8080
/// A shared cache of file readers.

vortex-datafusion/src/persistent/source.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl FileSource for VortexSource {
157157
expr_adapter_factory,
158158
table_schema: self.table_schema.clone(),
159159
batch_size,
160-
limit: base_config.limit,
160+
limit: base_config.limit.map(|l| l as u64),
161161
metrics: partition_metrics,
162162
layout_readers: self.layout_readers.clone(),
163163
has_output_ordering: !base_config.output_ordering.is_empty(),

vortex-python/python/vortex/_lib/file.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class VortexFile:
2626
projection: IntoProjection = None,
2727
*,
2828
expr: Expr | None = None,
29+
limit: int | None = None,
2930
indices: Array | None = None,
3031
batch_size: int | None = None,
3132
) -> ArrayIterator: ...
@@ -34,6 +35,7 @@ class VortexFile:
3435
projection: IntoProjection = None,
3536
*,
3637
expr: Expr | None = None,
38+
limit: int | None = None,
3739
indices: Array | None = None,
3840
batch_size: int | None = None,
3941
) -> RepeatedScan: ...
@@ -42,6 +44,7 @@ class VortexFile:
4244
projection: IntoProjection = None,
4345
*,
4446
expr: Expr | None = None,
47+
limit: int | None = None,
4548
batch_size: int | None = None,
4649
) -> pa.RecordBatchReader: ...
4750
def to_dataset(self) -> VortexDataset: ...

vortex-python/python/vortex/file.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def scan(
8383
projection: IntoProjection = None,
8484
*,
8585
expr: Expr | None = None,
86+
limit: int | None = None,
8687
indices: Array | None = None,
8788
batch_size: int | None = None,
8889
) -> ArrayIterator:
@@ -94,6 +95,8 @@ def scan(
9495
The projection expression to read, or else read all columns.
9596
expr : :class:`vortex.Expr` | None
9697
The predicate used to filter rows. The filter columns do not need to be in the projection.
98+
limit : :class:`int` | None
99+
The maximum number of rows to read after filtering. If None, read all rows.
97100
indices : :class:`vortex.Array` | None
98101
The indices of the rows to read. Must be sorted and non-null.
99102
batch_size : :class:`int` | None
@@ -164,13 +167,14 @@ def scan(
164167
"Mikhail"
165168
]
166169
"""
167-
return self._file.scan(projection, expr=expr, indices=indices, batch_size=batch_size)
170+
return self._file.scan(projection, expr=expr, limit=limit, indices=indices, batch_size=batch_size)
168171

169172
def to_repeated_scan(
170173
self,
171174
projection: IntoProjection = None,
172175
*,
173176
expr: Expr | None = None,
177+
limit: int | None = None,
174178
indices: Array | None = None,
175179
batch_size: int | None = None,
176180
) -> RepeatedScan:
@@ -187,12 +191,15 @@ def to_repeated_scan(
187191
batch_size : :class:`int` | None
188192
The number of rows to read per chunk.
189193
"""
190-
return RepeatedScan(self._file.prepare(projection, expr=expr, indices=indices, batch_size=batch_size))
194+
return RepeatedScan(
195+
self._file.prepare(projection, expr=expr, limit=limit, indices=indices, batch_size=batch_size)
196+
)
191197

192198
def to_arrow(
193199
self,
194200
projection: IntoProjection = None,
195201
*,
202+
limit: int | None = None,
196203
expr: Expr | None = None,
197204
batch_size: int | None = None,
198205
) -> RecordBatchReader:
@@ -209,7 +216,7 @@ def to_arrow(
209216
The number of rows to read per chunk.
210217
211218
"""
212-
return self._file.to_arrow(projection, expr=expr, batch_size=batch_size)
219+
return self._file.to_arrow(projection, expr=expr, limit=limit, batch_size=batch_size)
213220

214221
def to_dataset(self) -> VortexDataset:
215222
"""Scan the Vortex file using the :class:`pyarrow.dataset.Dataset` API."""
@@ -227,12 +234,12 @@ def to_polars(self) -> polars.LazyFrame:
227234
def _io_source(
228235
with_columns: list[str] | None,
229236
predicate: pl.Expr | None,
230-
_n_rows: int | None,
237+
n_rows: int | None,
231238
_batch_size: int | None,
232239
) -> Iterator[pl.DataFrame]:
233240
vx_predicate: Expr | None = None if predicate is None else polars_to_vortex(predicate)
234241

235-
reader = self.to_arrow(projection=with_columns, expr=vx_predicate)
242+
reader = self.to_arrow(projection=with_columns, expr=vx_predicate, limit=n_rows)
236243

237244
for batch in reader:
238245
batch = pl.DataFrame._from_arrow(batch, rechunk=False) # pyright: ignore[reportPrivateUsage]

vortex-python/src/file.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,19 @@ impl PyVortexFile {
101101
PyDType::init(slf.py(), slf.get().vxf.dtype().clone())
102102
}
103103

104-
#[pyo3(signature = (projection = None, *, expr = None, indices = None, batch_size = None))]
104+
#[pyo3(signature = (projection = None, *, expr = None, limit = None, indices = None, batch_size = None))]
105105
fn scan(
106106
slf: Bound<Self>,
107107
projection: Option<PyIntoProjection>,
108108
expr: Option<PyExpr>,
109+
limit: Option<u64>,
109110
indices: Option<PyArrayRef>,
110111
batch_size: Option<usize>,
111112
) -> PyVortexResult<PyArrayIterator> {
112113
let builder = slf.get().scan_builder(
113114
projection.map(|p| p.0),
114115
expr.map(|e| e.into_inner()),
116+
limit,
115117
indices.map(|i| i.into_inner()),
116118
batch_size,
117119
)?;
@@ -121,17 +123,19 @@ impl PyVortexFile {
121123
)))
122124
}
123125

124-
#[pyo3(signature = (projection = None, *, expr = None, indices = None, batch_size = None))]
126+
#[pyo3(signature = (projection = None, *, expr = None, limit = None, indices = None, batch_size = None))]
125127
fn prepare(
126128
slf: Bound<Self>,
127129
projection: Option<PyIntoProjection>,
128130
expr: Option<PyExpr>,
131+
limit: Option<u64>,
129132
indices: Option<PyArrayRef>,
130133
batch_size: Option<usize>,
131134
) -> PyVortexResult<PyRepeatedScan> {
132135
let builder = slf.get().scan_builder(
133136
projection.map(|p| p.0),
134137
expr.map(|e| e.into_inner()),
138+
limit,
135139
indices.map(|i| i.into_inner()),
136140
batch_size,
137141
)?;
@@ -144,11 +148,12 @@ impl PyVortexFile {
144148
})
145149
}
146150

147-
#[pyo3(signature = (projection = None, *, expr = None, batch_size = None))]
151+
#[pyo3(signature = (projection = None, *, expr = None, limit = None, batch_size = None))]
148152
fn to_arrow(
149153
slf: Bound<Self>,
150154
projection: Option<PyIntoProjection>,
151155
expr: Option<PyExpr>,
156+
limit: Option<u64>,
152157
batch_size: Option<usize>,
153158
) -> PyVortexResult<Py<PyAny>> {
154159
let vxf = slf.get().vxf.clone();
@@ -159,6 +164,10 @@ impl PyVortexFile {
159164
.with_some_filter(expr.map(|e| e.into_inner()))
160165
.with_projection(projection.map(|p| p.0).unwrap_or_else(root));
161166

167+
if let Some(limit) = limit {
168+
builder = builder.with_limit(limit);
169+
}
170+
162171
if let Some(batch_size) = batch_size {
163172
builder = builder.with_split_by(SplitBy::RowCount(batch_size));
164173
}
@@ -191,6 +200,7 @@ impl PyVortexFile {
191200
&self,
192201
projection: Option<Expression>,
193202
expr: Option<Expression>,
203+
limit: Option<u64>,
194204
indices: Option<ArrayRef>,
195205
batch_size: Option<usize>,
196206
) -> VortexResult<ScanBuilder<ArrayRef>> {
@@ -200,6 +210,10 @@ impl PyVortexFile {
200210
.with_some_filter(expr)
201211
.with_projection(projection.unwrap_or_else(root));
202212

213+
if let Some(limit) = limit {
214+
builder = builder.with_limit(limit);
215+
}
216+
203217
if let Some(indices) = indices {
204218
let indices = cast(indices.as_ref(), &DType::Primitive(PType::U64, NonNullable))?
205219
.to_primitive()

vortex-python/test/test_polars_.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
import math
5+
import os
6+
47
import polars as pl
8+
import pyarrow as pa
59
import pytest
610

11+
import vortex as vx
712
import vortex.expr as ve
813
from vortex.polars_ import polars_to_vortex
914

@@ -34,3 +39,36 @@
3439
def test_exprs(polars: pl.Expr, vortex: ve.Expr):
3540
# Dump the clickbench filters
3641
assert polars_to_vortex(polars) == vortex
42+
43+
44+
@pytest.fixture(scope="module")
45+
def vxf(tmpdir_factory): # pyright: ignore[reportUnknownParameterType, reportMissingParameterType]
46+
fname = tmpdir_factory.mktemp("data") / "polars_test.vortex" # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
47+
48+
if not os.path.exists(fname): # pyright: ignore[reportUnknownArgumentType]
49+
a = pa.array([{"index": x, "value": math.sqrt(x)} for x in range(1_000_000)])
50+
vx.io.write(vx.compress(vx.array(a)), str(fname)) # pyright: ignore[reportUnknownArgumentType]
51+
return vx.open(str(fname), without_segment_cache=True) # pyright: ignore[reportUnknownArgumentType]
52+
53+
54+
def test_to_polars_with_limit(vxf: vx.VortexFile):
55+
df = vxf.to_polars().limit(100).collect()
56+
assert len(df) == 100
57+
58+
59+
def test_to_polars_with_filter(vxf: vx.VortexFile):
60+
df = vxf.to_polars().filter(pl.col("index") < 500).collect()
61+
assert len(df) == 500
62+
assert df["index"].to_list() == list(range(500))
63+
64+
65+
def test_to_polars_with_projection(vxf: vx.VortexFile):
66+
df = vxf.to_polars().select("index").limit(10).collect()
67+
assert df.columns == ["index"]
68+
assert len(df) == 10
69+
70+
71+
def test_to_polars_with_projection_and_filter(vxf: vx.VortexFile):
72+
df = vxf.to_polars().select("index", "value").filter(pl.col("index") < 100).collect()
73+
assert df.columns == ["index", "value"]
74+
assert len(df) == 100

vortex-scan/src/layout.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ impl DataSource for LayoutReaderDataSource {
6262
}
6363

6464
if let Some(limit) = scan_request.limit {
65-
// TODO(ngates): ScanBuilder limit should be u64
66-
let limit = usize::try_from(limit)?;
6765
builder = builder.with_limit(limit);
6866
}
6967

vortex-scan/src/repeated_scan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub struct RepeatedScan<A: 'static + Send> {
5050
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
5151
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
5252
/// Maximal number of rows to read (after filtering)
53-
limit: Option<usize>,
53+
limit: Option<u64>,
5454
/// The dtype of the projected arrays.
5555
dtype: DType,
5656
}
@@ -98,7 +98,7 @@ impl<A: 'static + Send> RepeatedScan<A> {
9898
splits: Splits,
9999
concurrency: usize,
100100
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
101-
limit: Option<usize>,
101+
limit: Option<u64>,
102102
dtype: DType,
103103
) -> Self {
104104
Self {

vortex-scan/src/scan_builder.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct ScanBuilder<A> {
7070
/// Should we try to prune the file (using stats) on open.
7171
file_stats: Option<Arc<[StatsSet]>>,
7272
/// Maximal number of rows to read (after filtering)
73-
limit: Option<usize>,
73+
limit: Option<u64>,
7474
/// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
7575
/// but not by the scan [`Selection`] which remains relative.
7676
row_offset: u64,
@@ -180,7 +180,7 @@ impl<A: 'static + Send> ScanBuilder<A> {
180180
self
181181
}
182182

183-
pub fn with_limit(mut self, limit: usize) -> Self {
183+
pub fn with_limit(mut self, limit: u64) -> Self {
184184
self.limit = Some(limit);
185185
self
186186
}

0 commit comments

Comments
 (0)