Skip to content

Commit 22b5810

Browse files
committed
feat: add streaming support
1 parent c278a1f commit 22b5810

File tree

6 files changed

+682
-3
lines changed

6 files changed

+682
-3
lines changed

Makefile

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
11
venv:
22
uv venv
33
source .venv/bin/activate && uv pip install -e .
4-
54
run:
6-
source .venv/bin/activate && python example.py
5+
source .venv/bin/activate && python example.py
6+
7+
clean:
8+
rm -rf .venv
9+
rm -rf __pycache__
10+
rm -rf *.pyc
11+
rm -rf *.pyo
12+
rm -rf *.pyd
13+
rm -rf *.egg-info
14+
15+
lab:
16+
source .venv/bin/activate && \
17+
uv pip install jupyterlab ipykernel && \
18+
uv pip install jupyterlab-lsp python-lsp-server[all] && \
19+
uv pip install python-lsp-server[rope] pylsp-mypy pylsp-rope && \
20+
uv pip install -e . && \
21+
python -m ipykernel install --user --name=venv --display-name="Python (hugr)" && \
22+
jupyter lab --no-browser --port=8888

README.md

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,123 @@ hugr.query(
131131
)
132132
```
133133

134+
## Streaming API
135+
136+
In addition to standard HTTP queries, `hugr-client` supports asynchronous streaming of query results over WebSocket. Streaming lets you process large datasets batch by batch or row by row, without waiting for the entire result to be loaded into memory.
137+
138+
### Quick Start
139+
140+
```python
141+
import asyncio
142+
from hugr.stream import connect_stream
143+
144+
async def main():
145+
client = connect_stream("http://localhost:15001/ipc")
146+
147+
# HTTP query for total count
148+
result = client.query("query { devices_aggregation { _rows_count } }")
149+
print("Total devices:", result.record()['_rows_count'])
150+
151+
# Stream data in batches (Arrow RecordBatch)
152+
async with await client.stream(
153+
"""
154+
query {
155+
devices {
156+
id
157+
name
158+
geom
159+
}
160+
}
161+
"""
162+
) as stream:
163+
async for batch in stream.chunks():
164+
df = batch.to_pandas()
165+
print("Batch:", len(df), "rows")
166+
167+
# Stream data row by row
168+
async with await client.stream(
169+
"query { devices { id name status } }"
170+
) as stream:
171+
async for row in stream.rows():
172+
print(row)
173+
174+
asyncio.run(main())
175+
```
176+
177+
### Main Features
178+
179+
- **connect_stream** — create a streaming client (WebSocket).
180+
- **client.stream(query, variables=None)** — asynchronously open a stream of Arrow RecordBatches for a GraphQL query; use it as `async with await client.stream(...) as stream:`.
181+
- **stream.chunks()** — async generator for batches (RecordBatch).
182+
- **stream.rows()** — async generator for rows (dict).
183+
- **stream.to_pandas()** — collect all streamed data into a single `pandas.DataFrame` (awaitable: `df = await stream.to_pandas()`).
184+
- **stream.count()** — count the number of rows in the stream.
185+
- **stream_data_object(data_object, fields, variables=None)** — stream a specific data object and fields.
186+
187+
### Example: Collect DataFrame via Streaming
188+
189+
```python
190+
import asyncio
191+
from hugr.stream import connect_stream
192+
193+
async def main():
194+
client = connect_stream("http://localhost:15001/ipc")
195+
async with await client.stream(
196+
"query { devices { id name geom } }"
197+
) as stream:
198+
df = await stream.to_pandas()
199+
print(df.head())
200+
201+
asyncio.run(main())
202+
```
203+
204+
### Example: Row-by-row Processing
205+
206+
```python
207+
import asyncio
208+
from hugr.stream import connect_stream
209+
210+
async def main():
211+
client = connect_stream()
212+
async with await client.stream(
213+
"query { devices { id name status } }"
214+
) as stream:
215+
async for row in stream.rows():
216+
if row.get("status") == "active":
217+
print("Active device:", row["name"])
218+
219+
asyncio.run(main())
220+
```
221+
222+
### Example: Query Cancellation
223+
224+
```python
225+
import asyncio
226+
from hugr.stream import connect_stream
227+
228+
async def main():
229+
client = connect_stream()
230+
async with await client.stream(
231+
"query { devices { id name } }"
232+
) as stream:
233+
count = 0
234+
async for batch in stream.chunks():
235+
count += batch.num_rows
236+
if count > 1000:
237+
await client.cancel_current_query()
238+
break
239+
240+
asyncio.run(main())
241+
```
242+
243+
### Notes
244+
245+
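- Streaming operations (`client.stream(...)`, `stream.chunks()`, `stream.rows()`, `stream.to_pandas()`) are asynchronous and must be used inside an `async` function with `await` / `async for` / `async with`. `connect_stream(...)` itself is called without `await`, as shown in the examples above.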
246+
- Dependencies: `websockets`, `pyarrow`, `pandas`.
247+
- You can use either a pure streaming client or an enhanced client that supports both HTTP and WebSocket.
248+
249+
See more in [hugr/stream.py](hugr/stream.py) and the code examples in the source files.
250+
134251
## License
135252

136253
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

hugr/__init__.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@
88
explore_map,
99
)
1010

11+
from .stream import (
12+
HugrStreamConnection,
13+
HugrStreamingClient,
14+
HugrStream,
15+
connect_stream,
16+
new_stream_connection,
17+
)
18+
1119
__all__ = [
1220
"HugrClient",
1321
"HugrIPCResponse",
@@ -16,6 +24,11 @@
1624
"connect",
1725
"query",
1826
"explore_map",
27+
"HugrStreamConnection",
28+
"HugrStreamingClient",
29+
"HugrStream",
30+
"connect_stream",
31+
"new_stream_connection",
1932
]
2033

21-
__version__ = "0.1.0"
34+
__version__ = "0.1.1"

hugr/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ def __init__(
4040
for field, fi in geom_fields.items():
4141
encoding = fi.get("format", "wkb").lower()
4242
if len(field.split(".")) == 1:
43+
if encoding == "h3cell":
44+
# H3 cells are stored as strings, no decoding needed
45+
continue
4346
self._df[field] = self._df[field].apply(
4447
lambda x: _decode_geom(x, encoding)
4548
)
@@ -357,6 +360,8 @@ def _decode_geom(val, fmt):
357360
return None
358361
if isinstance(val, BaseGeometry):
359362
return val
363+
if fmt == "h3cell":
364+
return val
360365
if fmt == "wkb":
361366
return wkb.loads(val)
362367
elif fmt == "geojson":
@@ -372,6 +377,8 @@ def _encode_geojson(val, fmt):
372377
return None
373378
if isinstance(val, BaseGeometry):
374379
return mapping(val)
380+
if fmt == "h3cell":
381+
return val
375382
if fmt == "wkb":
376383
return mapping(wkb.load(val))
377384
elif fmt == "geojson":

0 commit comments

Comments
 (0)