
**See also**
- [`query()`](#chdb-state-sqlitelike-connection-query) - For non-streaming query execution
- [`StreamingResult`](#chdb-state-sqlitelike-streamingresult) - Streaming result iterator

---

### **class** `chdb.state.sqlitelike.StreamingResult` {#chdb-state-sqlitelike-streamingresult}

Bases: `object`

Streaming result iterator for processing large query results.

This class provides an iterator interface for streaming query results without
loading the entire result set into memory. It supports various output formats
and provides methods for manual result fetching and PyArrow RecordBatch streaming.

```python
class chdb.state.sqlitelike.StreamingResult
```
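
A minimal sketch of how a `StreamingResult` is typically obtained and consumed, assuming an in-memory connection created with `chdb.connect(":memory:")` and a hypothetical `process_data()` callback (adjust the connection string and format to your setup):

```python
import chdb

# Assumption: chdb.connect() returns a Connection exposing send_query(),
# as used in the examples throughout this page.
conn = chdb.connect(":memory:")

# send_query() returns a StreamingResult instead of materializing all rows.
stream = conn.send_query("SELECT number FROM numbers(1000000)", format="CSV")

try:
    for chunk in stream:      # chunks arrive incrementally
        process_data(chunk)   # hypothetical callback for each CSV chunk
finally:
    stream.close()            # release resources even on early exit
    conn.close()
```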

---

#### `fetch` {#streamingresult-fetch}

Fetch the next chunk of streaming results.

This method retrieves the next available chunk of data from the streaming
query result. The format of the returned data depends on the format specified
when the streaming query was initiated.

**Syntax**

```python
fetch() → Any
```

**Returns**

| Return Type | Description |
|-------------|-------------|
| `str` | For text formats (CSV, JSON) |
| `bytes` | For binary formats (Arrow, Parquet) |
| `None` | When the result stream is exhausted |

**Examples**

```pycon
>>> stream = conn.send_query("SELECT * FROM large_table")
>>> chunk = stream.fetch()
>>> while chunk is not None:
... process_data(chunk)
... chunk = stream.fetch()
```
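
The same fetch loop can be wrapped in a small generator when several parts of a program consume the stream; this is an illustrative pattern, not part of the chdb API, and `process_data()` is a hypothetical callback:

```python
def iter_chunks(stream):
    """Yield chunks from a StreamingResult until fetch() returns None."""
    while True:
        chunk = stream.fetch()
        if chunk is None:      # stream exhausted
            return
        yield chunk

for chunk in iter_chunks(conn.send_query("SELECT * FROM large_table")):
    process_data(chunk)
```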

---

#### `cancel` {#streamingresult-cancel}

Cancel the streaming query and clean up resources.

This method cancels any ongoing streaming query and releases associated
resources. It should be called when you want to stop processing results
before the stream is exhausted.

**Syntax**

```python
cancel() → None
```

**Examples**

```pycon
>>> stream = conn.send_query("SELECT * FROM very_large_table")
>>> for i, chunk in enumerate(stream):
... if i >= 10: # Only process first 10 chunks
... stream.cancel()
... break
... process_data(chunk)
```
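
`cancel()` is also useful for stopping a long-running stream in response to an external event; a sketch (the `KeyboardInterrupt` handling is illustrative, not chdb-specific, and `process_data()` is hypothetical):

```python
stream = conn.send_query("SELECT * FROM very_large_table")
try:
    for chunk in stream:
        process_data(chunk)
except KeyboardInterrupt:
    # Stop the in-flight query and free the iterator's resources.
    stream.cancel()
    raise
```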

---

#### `close` {#streamingresult-close}

Close the streaming result and clean up resources.

Alias for [`cancel()`](#streamingresult-cancel). Closes the streaming result iterator
and releases any associated resources.

**Syntax**

```python
close() → None
```
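
A minimal sketch pairing `close()` with `try`/`finally` when the context manager protocol (described below) is not used; `process_data()` is a hypothetical callback:

```python
stream = conn.send_query("SELECT * FROM data")
try:
    chunk = stream.fetch()
    while chunk is not None:
        process_data(chunk)
        chunk = stream.fetch()
finally:
    stream.close()  # always release resources, even if processing fails
```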

---

#### `record_batch` {#streamingresult-record_batch}

Create a PyArrow RecordBatchReader for efficient batch processing.

This method creates a PyArrow RecordBatchReader that iterates over the query
results in Arrow format. It is the most efficient way to process large result
sets when using PyArrow.

**Syntax**

```python
record_batch(rows_per_batch: int = 1000000) → pa.RecordBatchReader
```

**Parameters**

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `rows_per_batch` | int | `1000000` | Number of rows per batch |

**Returns**

| Return Type | Description |
|-------------|-------------|
| `pa.RecordBatchReader` | PyArrow RecordBatchReader for iterating over batches |

:::note
This method is only available when the streaming query was initiated
with `format="Arrow"`. Using it with other formats will raise an error.
:::

**Examples**

```pycon
>>> stream = conn.send_query("SELECT * FROM data", format="Arrow")
>>> reader = stream.record_batch(rows_per_batch=10000)
>>> for batch in reader:
... print(f"Processing batch: {batch.num_rows} rows")
... df = batch.to_pandas()
... process_dataframe(df)
```
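
If the result turns out to fit in memory, the reader can also be drained into a single table with `read_all()`; this relies on standard PyArrow `RecordBatchReader` behavior rather than anything chdb-specific:

```python
stream = conn.send_query("SELECT * FROM data", format="Arrow")
reader = stream.record_batch(rows_per_batch=10000)

table = reader.read_all()   # pyarrow.Table assembled from all batches
df = table.to_pandas()      # convert once, instead of per batch
```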

---

#### Iterator Protocol {#streamingresult-iterator}

StreamingResult supports the Python iterator protocol, allowing it to be
used directly in for loops:

```pycon
>>> stream = conn.send_query("SELECT number FROM numbers(1000000)")
>>> for chunk in stream:
... print(f"Chunk size: {len(chunk)} bytes")
```

---

#### Context Manager Protocol {#streamingresult-context-manager}

StreamingResult supports the context manager protocol for automatic
resource cleanup:

```pycon
>>> with conn.send_query("SELECT * FROM data") as stream:
... for chunk in stream:
... process(chunk)
>>> # Stream automatically closed
```
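
The two protocols combine naturally; a sketch that streams Arrow batches and lets the context manager handle cleanup, assuming the query was sent with `format="Arrow"` as shown above (`process_dataframe()` is a hypothetical callback):

```python
with conn.send_query("SELECT * FROM data", format="Arrow") as stream:
    for batch in stream.record_batch(rows_per_batch=10000):
        process_dataframe(batch.to_pandas())
# The stream is closed automatically when the with-block exits.
```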

---
