Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions examples/streaming-demo/streaming-tool-example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@
import { z } from 'zod';
import {
run,
runStream, // NEW: Use runStream for real-time streaming!
Agent,
RunConfig,
Tool,
createTraceId,
createRunId,
Message,
Streaming,
agentAsTool
agentAsTool,
TraceEvent
} from '../../src/index.js';
import type { StreamProvider } from '../../src/streaming/types.js';

Expand Down Expand Up @@ -500,14 +502,53 @@ async function main() {
console.log('Running streaming tool demo...\n');
console.log('-'.repeat(60));

// Run the agent
// ========== NEW: Real-time streaming with runStream ==========
// With the new subscribe() support, runStream automatically merges
// JAF events + tool-pushed events into a single real-time stream!

console.log('\n🔴 REAL-TIME MODE: Events arrive immediately as tools push them!\n');

// Run the agent with runStream (events arrive in real-time!)
try {
const result = await run(initialState, config);
let result;

// Use for-await to consume events in real-time
for await (const event of runStream(initialState, config)) {
// Events arrive immediately as they happen (NOT buffered!)
const timestamp = new Date().toISOString().substr(11, 12);

// Check if it's a custom tool event (pushed via streamProvider)
if (event.type.startsWith('tool_partial') ||
event.type.startsWith('tool_streaming') ||
event.type.startsWith('tool_progress') ||
event.type.startsWith('subagent')) {
console.log(` 🔧 [${timestamp}] Tool Event: ${event.type}`);
if (event.data && typeof event.data === 'object') {
const data = event.data as any;
if (data.progress?.percentage) {
console.log(` Progress: ${data.progress.percentage}%`);
}
if (data.partialResult) {
console.log(` Partial: ${JSON.stringify(data.partialResult).substring(0, 50)}...`);
}
}
} else {
// JAF framework event
console.log(` 📋 [${timestamp}] JAF Event: ${event.type}`);
}

// Capture final result when run ends
if (event.type === 'run_end') {
result = (event.data as any);
}
}

const finalResult = result;

console.log('-'.repeat(60));
console.log('\n📊 Run Result:');
console.log(` Status: ${result.outcome.status}`);
console.log(` Turns: ${result.finalState.turnCount}`);
console.log(` Status: ${finalResult?.outcome?.status || 'unknown'}`);
console.log(` Turns: ${finalResult?.finalState?.turnCount || 'unknown'}`);

// Get ALL events (JAF events + tool custom events)
const allEvents = streamProvider.getEvents(sessionId);
Expand Down Expand Up @@ -551,26 +592,35 @@ async function main() {
await streamProvider.close();
console.log('\n✅ Demo complete!\n');

console.log('='.repeat(60));
console.log('='.repeat(60));
console.log('KEY TAKEAWAY');
console.log('='.repeat(60));
console.log(`
✅ Tools can push custom events during execution!
✅ NEW: Events are NOT buffered - they arrive IMMEDIATELY!

Pattern:
1. Add streamProvider to your context type
2. Pass it when creating the context
3. Tools access it via context.streamProvider
4. Push events: await context.streamProvider.push(sessionId, event)
5. NEW: Use runStream() to receive events in REAL-TIME!

NEW Real-Time Streaming:
for await (const event of runStream(state, config)) {
// Events arrive IMMEDIATELY as tools push them
// No more waiting until tool execution completes!
console.log(event.type, event.data);
}

Use Cases:
• Streaming API responses (like GPT)
• Progress updates for long operations
• Partial results from search
• Multi-step process tracking
• Real-time status updates
• Streaming API responses (like GPT) - see progress immediately!
• Progress updates for long operations - real-time status!
• Partial results from search - show results as they come!
• Multi-step process tracking - live step updates!
• Real-time status updates - no buffering!

All events (JAF + custom) go to the same stream!
All events (JAF + custom) go to the same stream in REAL-TIME!
`);
}

Expand Down
32 changes: 32 additions & 0 deletions src/core/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ export async function* runStream<Ctx, Out>(
): AsyncGenerator<TraceEvent, void, unknown> {
const stream = createAsyncEventStream<TraceEvent>();

// Check if context has streamProvider for tool event streaming
const context = initialState.context as any;
const streamProvider = context?.streamProvider;
const sessionId = context?.sessionId;
let toolEventSubscription: AsyncGenerator<any, void, unknown> | null = null;

// Subscribe to tool events if streamProvider available
if (streamProvider?.subscribe && sessionId) {
toolEventSubscription = streamProvider.subscribe(sessionId);

// Start async task to forward tool events to the stream
(async () => {
try {
for await (const toolEvent of toolEventSubscription!) {
// Convert StreamEvent to TraceEvent format
const traceEvent: TraceEvent = {
type: toolEvent.eventType as any,
data: toolEvent.data
};
try { stream.push(traceEvent); } catch { /* ignore */ }
}
} catch { /* ignore subscription errors */ }
})();
}

const onEvent = async (event: TraceEvent) => {
// First, let the stream consumer handle it (can modify before events)
let eventResult: any;
Expand Down Expand Up @@ -248,6 +273,10 @@ export async function* runStream<Ctx, Out>(

const runPromise = run<Ctx, Out>(initialState, { ...config, onEvent });
void runPromise.finally(() => {
// Clean up tool event subscription
if (toolEventSubscription) {
toolEventSubscription.return?.();
}
stream.end();
});

Expand All @@ -256,6 +285,9 @@ export async function* runStream<Ctx, Out>(
yield event;
}
} finally {
if (toolEventSubscription) {
toolEventSubscription.return?.();
}
await runPromise.catch(() => undefined);
}
}
Expand Down
34 changes: 17 additions & 17 deletions src/streaming/event-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ const createStreamEvent = (
): StreamEvent => {
// Determine event type (use mapping if provided)
const eventType = config.eventMapping?.[event.type] ?? event.type;

// Transform data if transformer provided
const eventData = event.data as Record<string, unknown>;
const data = config.eventTransformer
const data = config.eventTransformer
? config.eventTransformer(event)
: eventData;

// Extract traceId and runId from data (JAF stores them inside data)
const traceId = eventData?.traceId as string | undefined;
const runId = eventData?.runId as string | undefined;
const agentName = eventData?.agentName as string | undefined;

return {
eventType,
data,
Expand All @@ -59,11 +59,11 @@ const extractSessionId = <Ctx>(
if (typeof config.sessionId === 'string') {
return config.sessionId;
}

if (typeof config.sessionId === 'function') {
return config.sessionId(context, event);
}

return undefined;
};

Expand Down Expand Up @@ -92,13 +92,13 @@ export function withStreamOutput<Ctx = unknown>(
...DEFAULT_STREAM_OUTPUT_CONFIG,
...config
};

return (event: TraceEvent, context?: Ctx): void => {
// Apply event filter if provided
if (mergedConfig.eventFilter && !mergedConfig.eventFilter(event)) {
return;
}

// Extract session ID
const sessionId = extractSessionId(mergedConfig, context as Ctx, event);
if (!sessionId) {
Expand All @@ -108,13 +108,13 @@ export function withStreamOutput<Ctx = unknown>(
);
return;
}

// Create stream event
const streamEvent = createStreamEvent(event, mergedConfig as StreamOutputConfig<unknown>);

// Push to stream (fire and forget by default)
const pushPromise = provider.push(sessionId, streamEvent);

if (mergedConfig.blocking) {
// Blocking mode - wait for push to complete
pushPromise.then(result => {
Expand Down Expand Up @@ -212,16 +212,16 @@ export function createConsoleEventHandler(options?: {
prefix?: string;
}): (event: TraceEvent) => void {
const { eventFilter, pretty = true, prefix = '[JAF:EVENT]' } = options ?? {};

return (event: TraceEvent): void => {
if (eventFilter && !eventFilter(event)) {
return;
}

const output = pretty
? JSON.stringify(event, null, 2)
: JSON.stringify(event);

safeConsole.log(`${prefix} ${event.type}:`, output);
};
}
Expand Down Expand Up @@ -252,12 +252,12 @@ export const EventFilters = {
'tool_call_end',
'before_tool_execution'
]),

/** All message events */
messageEvents: filterEventsByType([
'assistant_message'
]),

/** All control flow events */
controlEvents: filterEventsByType([
'run_start',
Expand All @@ -266,7 +266,7 @@ export const EventFilters = {
'turn_end',
'handoff'
]),

/** Events typically pushed to external streams (like your Python implementation) */
externalStreamEvents: filterEventsByType([
'tool_call_start',
Expand Down
Loading