64 changes: 64 additions & 0 deletions docs/examples/streaming/README.md
@@ -0,0 +1,64 @@
# Streaming Examples

This directory contains examples demonstrating Mellea's streaming capabilities.

## Examples

### basic_streaming.py
Basic example showing how to stream model output incrementally as it is generated. This is the simplest way to get started with streaming in Mellea.

**Run:**
```bash
uv run --with mellea docs/examples/streaming/basic_streaming.py
```

### interactive_chat.py
Interactive chat application with streaming responses. Shows how to build a conversational interface where the AI's responses appear incrementally.

**Run:**
```bash
uv run --with mellea docs/examples/streaming/interactive_chat.py
```

### advanced_streaming.py
Advanced example showing error handling, buffering, and other best practices for production streaming applications.

**Run:**
```bash
uv run --with mellea docs/examples/streaming/advanced_streaming.py
```

## Key Concepts

### Streaming Requires Async
Streaming is only available with async functions (`ainstruct`, `aact`) using `await_result=False` and `strategy=None`:

```python
# This works - async with await_result=False and strategy=None
thunk = await m.ainstruct("Hello", await_result=False, strategy=None)
last_length = 0
while not thunk.is_computed():
current_value = await thunk.astream()
new_content = current_value[last_length:]
print(new_content, end="")
last_length = len(current_value)

# This doesn't work - sync functions always await
result = m.instruct("Hello") # Already computed, cannot stream
```

**Note**: `astream()` returns the accumulated output so far, not individual chunks. You need to track what you've already displayed to show only new content.
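
If you prefer working with incremental chunks rather than accumulated text, you can wrap that pattern in a small helper. This is a sketch built only on the `astream()` and `is_computed()` calls shown above; `stream_deltas` is a hypothetical name, not part of Mellea's API:

```python
from collections.abc import AsyncIterator


async def stream_deltas(thunk) -> AsyncIterator[str]:
    """Yield only the text added since the previous astream() call."""
    seen = 0
    while not thunk.is_computed():
        accumulated = await thunk.astream()
        if len(accumulated) > seen:
            yield accumulated[seen:]
            seen = len(accumulated)
    # One final fetch, in case the last chunk arrived between the last
    # astream() call and is_computed() becoming true.
    accumulated = await thunk.astream()
    if len(accumulated) > seen:
        yield accumulated[seen:]


# Usage:
#   async for chunk in stream_deltas(thunk):
#       print(chunk, end="", flush=True)
```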

### ModelOutputThunk Types
- **`ModelOutputThunk`**: Uncomputed, can be streamed
- **`ComputedModelOutputThunk`**: Already computed, cannot be streamed
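
If you aren't sure which kind of thunk you have, `is_computed()` lets you decide whether streaming is possible. A minimal sketch (the helper name `show_streaming_or_final` is hypothetical; it uses only `ainstruct`, `is_computed()`, `astream()`, and `avalue()` as seen in the examples in this directory):

```python
async def show_streaming_or_final(m, prompt: str) -> None:
    """Stream the response when possible; otherwise just finish the computation."""
    thunk = await m.ainstruct(prompt, await_result=False, strategy=None)

    if thunk.is_computed():
        # Nothing left to stream; await the final value so the session stays consistent.
        await thunk.avalue()
        return

    last_length = 0
    while not thunk.is_computed():
        current = await thunk.astream()
        print(current[last_length:], end="", flush=True)
        last_length = len(current)
    print()
```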

### Limitations
- Cannot stream when a `SamplingStrategy` is active (validation requires the complete output); the default `strategy=RejectionSamplingStrategy(loop_budget=2)` must be overridden with `strategy=None`
- Cannot stream from synchronous functions (doing so would cause a deadlock)
- Streaming requires an async context

## See Also
- [Tutorial Chapter 13: Streaming Model Outputs](../../tutorial.md#chapter-13-streaming-model-outputs)
- [Tutorial Chapter 12: Asynchronicity](../../tutorial.md#chapter-12-asynchronicity)
121 changes: 121 additions & 0 deletions docs/examples/streaming/advanced_streaming.py
@@ -0,0 +1,121 @@
"""Advanced streaming example with error handling and buffering.

This example demonstrates production-ready streaming patterns including:
- Error handling for network issues
- Buffering for smoother UI updates
- Timeout handling
- Graceful degradation
"""

# pytest: ollama, llm

import asyncio

import mellea


async def stream_with_error_handling():
"""Stream with comprehensive error handling."""
m = mellea.start_session()

print("Streaming with error handling...\n")

try:
thunk = await m.ainstruct(
"Write a technical explanation of how neural networks learn.",
            await_result=False,
            strategy=None,  # Must disable strategy for streaming
        )

last_length = 0
while not thunk.is_computed():
            # Guard each poll with a timeout (30s here is an illustrative value)
            # so that a stalled stream actually raises asyncio.TimeoutError below.
            current_value = await asyncio.wait_for(thunk.astream(), timeout=30.0)
new_content = current_value[last_length:]
print(new_content, end="", flush=True)
last_length = len(current_value)
print()

except asyncio.TimeoutError:
print("\n[Error: Request timed out]")
except Exception as e:
print(f"\n[Error: {e}]")


async def stream_with_buffering():
"""Stream with buffering for smoother output."""
m = mellea.start_session()

print("\nStreaming with buffering...\n")

thunk = await m.ainstruct(
"Explain quantum computing in simple terms.", await_result=False
)

last_length = 0
buffer = []
buffer_size = 50 # Buffer 50 characters before displaying

while not thunk.is_computed():
current_value = await thunk.astream()
new_content = current_value[last_length:]
buffer.append(new_content)
last_length = len(current_value)

# Flush buffer when it reaches the size threshold
if sum(len(s) for s in buffer) >= buffer_size:
print("".join(buffer), end="", flush=True)
buffer = []

# Flush remaining buffer
if buffer:
print("".join(buffer), end="", flush=True)
print()


async def compare_streaming_vs_blocking():
"""Compare streaming vs blocking behavior."""
m = mellea.start_session()

print("\n" + "=" * 60)
print("COMPARISON: Streaming vs Blocking")
print("=" * 60)

# Blocking (default behavior)
print("\n1. Blocking mode (await_result=True, default):")
print(" Waiting for complete response...")
result = await m.ainstruct(
"Write a haiku about programming.",
await_result=True, # This is the default
)
print(f" Result: {result.value}")

# Streaming
print("\n2. Streaming mode (await_result=False):")
print(" Tokens appear as generated: ", end="", flush=True)
thunk = await m.ainstruct(
"Write a haiku about programming.",
await_result=False,
strategy=None, # Must disable strategy for streaming
)

# Stream until complete - call astream() at least once even if already computed
last_length = 0
while True:
current_value = await thunk.astream()
new_content = current_value[last_length:]
print(new_content, end="", flush=True)
last_length = len(current_value)

if thunk.is_computed():
break
print()


async def main():
"""Run all advanced streaming examples."""
await stream_with_error_handling()
await stream_with_buffering()
await compare_streaming_vs_blocking()


if __name__ == "__main__":
asyncio.run(main())
39 changes: 39 additions & 0 deletions docs/examples/streaming/basic_streaming.py
@@ -0,0 +1,39 @@
"""Basic streaming example showing how to stream model outputs incrementally.

This example demonstrates the fundamental streaming capability in Mellea.
"""

# pytest: ollama, llm

import asyncio

import mellea


async def stream_story():
"""Stream a short story incrementally."""
m = mellea.start_session()

print("Streaming story generation...\n")

# Get uncomputed thunk for streaming
thunk = await m.ainstruct(
"cont up 1 through 100.",
await_result=False,
strategy=None, # Must disable strategy for streaming
)

# Stream the output - astream() returns accumulated value so far
last_length = 0
while not thunk.is_computed():
current_value = await thunk.astream()
# Print only the new portion
new_content = current_value[last_length:]
print(new_content, end="", flush=True)
last_length = len(current_value)

print() # New line at end


if __name__ == "__main__":
asyncio.run(stream_story())
54 changes: 54 additions & 0 deletions docs/examples/streaming/interactive_chat.py
@@ -0,0 +1,54 @@
"""Interactive chat example with streaming responses.

This example shows how to build an interactive chat application where
the AI's responses are streamed incrementally for a better user experience.

Note: This example uses SimpleContext, so each turn is handled independently.
After the streamed output finishes, the final result is awaited (thunk.avalue())
so the session's context is updated correctly before the next turn. If you switch
to ChatContext for multi-turn memory, Mellea may warn about async usage; awaiting
the result after streaming, as done below, keeps that pattern safe.
"""

# pytest: ollama, llm

import asyncio

import mellea
from mellea.stdlib.context import SimpleContext


async def interactive_chat():
"""Run an interactive chat session with streaming responses."""
m = mellea.start_session(ctx=SimpleContext())

print("Chat with the AI (type 'quit' to exit)")
print("-" * 50)

while True:
user_input = input("\nYou: ")
if user_input.lower() == "quit":
break

print("AI: ", end="", flush=True)

# Stream the response
thunk = await m.ainstruct(
user_input,
await_result=False,
strategy=None, # Must disable strategy for streaming
)

last_length = 0
while not thunk.is_computed():
current_value = await thunk.astream()
# Print only the new portion
new_content = current_value[last_length:]
print(new_content, end="", flush=True)
last_length = len(current_value)

# Await the final result to update context properly
await thunk.avalue()
print() # New line after response


if __name__ == "__main__":
asyncio.run(interactive_chat())