From 314d074c61d0c0ca2c244d5d1c6593b6766d2c3f Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:58:12 -0300 Subject: [PATCH 1/7] Context summarization feature implementation. --- src/pipecat/frames/frames.py | 50 +++ .../aggregators/llm_context_summarizer.py | 315 ++++++++++++++ .../aggregators/llm_response_universal.py | 52 ++- src/pipecat/services/anthropic/llm.py | 8 +- src/pipecat/services/aws/llm.py | 10 +- src/pipecat/services/google/llm.py | 10 +- src/pipecat/services/llm_service.py | 136 +++++- src/pipecat/services/openai/base_llm.py | 14 +- src/pipecat/utils/context/__init__.py | 0 .../context/llm_context_summarization.py | 396 ++++++++++++++++++ 10 files changed, 984 insertions(+), 7 deletions(-) create mode 100644 src/pipecat/processors/aggregators/llm_context_summarizer.py create mode 100644 src/pipecat/utils/context/__init__.py create mode 100644 src/pipecat/utils/context/llm_context_summarization.py diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 7b821a9bc4..5634d79ee2 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -1991,6 +1991,56 @@ def __post_init__(self): self.skip_tts = None +@dataclass +class LLMContextSummaryRequestFrame(ControlFrame): + """Frame requesting context summarization from an LLM service. + + Sent by aggregators to LLM services when conversation context needs to be + compressed. The LLM service generates a summary of older messages while + preserving recent conversation history. + + Parameters: + request_id: Unique identifier to match this request with its response. + Used to handle async responses and avoid race conditions. + context: The full LLM context containing all messages to analyze and summarize. + min_messages_to_keep: Number of recent messages to preserve uncompressed. + These messages will not be included in the summary. + target_context_tokens: Maximum token size for the generated summary. This value + is passed directly to the LLM as the max_tokens parameter when generating + the summary text. + summarization_prompt: System prompt instructing the LLM how to generate + the summary. + """ + + request_id: str + context: "LLMContext" + min_messages_to_keep: int + target_context_tokens: int + summarization_prompt: str + + +@dataclass +class LLMContextSummaryResultFrame(ControlFrame, UninterruptibleFrame): + """Frame containing the result of context summarization. + + Sent by LLM services back to aggregators after generating a summary. + Contains the formatted summary message and metadata about what was summarized. + + Parameters: + request_id: Identifier matching the original request. Used to correlate + async responses. + summary: The formatted summary message ready to be inserted into context. + last_summarized_index: Index (0-based) of the last message that was + included in the summary. Messages after this index are preserved. + error: Error message if summarization failed, None on success. + """ + + request_id: str + summary: str + last_summarized_index: int + error: Optional[str] = None + + @dataclass class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame): """Frame signaling that a function call is currently executing. 
diff --git a/src/pipecat/processors/aggregators/llm_context_summarizer.py b/src/pipecat/processors/aggregators/llm_context_summarizer.py new file mode 100644 index 0000000000..a1a613ccc9 --- /dev/null +++ b/src/pipecat/processors/aggregators/llm_context_summarizer.py @@ -0,0 +1,315 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""This module defines a summarizer for managing LLM context summarization.""" + +import uuid +from typing import Optional + +from loguru import logger + +from pipecat.frames.frames import ( + Frame, + InterruptionFrame, + LLMContextSummaryRequestFrame, + LLMContextSummaryResultFrame, + LLMFullResponseStartFrame, +) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.utils.asyncio.task_manager import BaseTaskManager +from pipecat.utils.base_object import BaseObject +from pipecat.utils.context.llm_context_summarization import ( + LLMContextSummarizationConfig, + LLMContextSummarizationUtil, +) + + +class LLMContextSummarizer(BaseObject): + """Summarizer for managing LLM context summarization. + + This class manages automatic context summarization when token or message + limits are reached. It monitors the LLM context size, triggers + summarization requests, and applies the results to compress conversation history. + + Event handlers available: + + - on_request_summarization: Emitted when summarization should be triggered. + The aggregator should broadcast this frame to the LLM service. + + Example:: + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame: LLMContextSummaryRequestFrame): + await aggregator.broadcast_frame( + LLMContextSummaryRequestFrame, + request_id=frame.request_id, + context=frame.context, + ... + ) + """ + + def __init__( + self, + *, + context: LLMContext, + config: Optional[LLMContextSummarizationConfig] = None, + ): + """Initialize the context summarizer. + + Args: + context: The LLM context to monitor and summarize. + config: Configuration for summarization behavior. If None, uses default config. + """ + super().__init__() + + self._context = context + self._config = config or LLMContextSummarizationConfig() + + self._task_manager: Optional[BaseTaskManager] = None + + self._summarization_in_progress = False + self._pending_summary_request_id: Optional[str] = None + + self._register_event_handler("on_request_summarization", sync=True) + + @property + def task_manager(self) -> BaseTaskManager: + """Returns the configured task manager.""" + if not self._task_manager: + raise RuntimeError(f"{self} context summarizer was not properly setup") + return self._task_manager + + async def setup(self, task_manager: BaseTaskManager): + """Initialize the summarizer with the given task manager. + + Args: + task_manager: The task manager to be associated with this instance. + """ + self._task_manager = task_manager + + async def cleanup(self): + """Cleanup the summarizer.""" + await super().cleanup() + await self._clear_summarization_state() + + async def process_frame(self, frame: Frame): + """Process an incoming frame to detect when summarization is needed. + + Args: + frame: The frame to be processed. 
+ """ + if isinstance(frame, LLMFullResponseStartFrame): + await self._handle_llm_response_start(frame) + elif isinstance(frame, LLMContextSummaryResultFrame): + await self._handle_summary_result(frame) + elif isinstance(frame, InterruptionFrame): + await self._handle_interruption() + + async def _handle_llm_response_start(self, frame: LLMFullResponseStartFrame): + """Handle LLM response start to check if summarization is needed. + + Args: + frame: The LLM response start frame. + """ + if self._should_summarize(): + await self._request_summarization() + + async def _handle_interruption(self): + """Handle interruption by canceling summarization in progress. + + Args: + frame: The interruption frame. + """ + # Reset summarization state to allow new requests. This is necessary because + # the request frame (LLMContextSummaryRequestFrame) may have been cancelled + # during interruption. We preserve _pending_summary_request_id to handle the + # response frame (LLMContextSummaryResultFrame), which is uninterruptible and + # will still be delivered. + self._summarization_in_progress = False + + async def _clear_summarization_state(self): + """Cancel pending summarization.""" + if self._summarization_in_progress: + logger.debug(f"{self}: Clearing pending summarization") + self._summarization_in_progress = False + self._pending_summary_request_id = None + + def _should_summarize(self) -> bool: + """Determine if context summarization should be triggered. + + Evaluates whether the current context has reached either the token + threshold or message count threshold that warrants compression. + + Returns: + True if all conditions are met: + - No summarization currently in progress + - AND either: + - Token count exceeds max_context_tokens + - OR message count exceeds max_unsummarized_messages since last summary + """ + logger.trace(f"{self}: Checking if context summarization is needed") + + if self._summarization_in_progress: + logger.debug(f"{self}: Summarization already in progress") + return False + + # Estimate tokens in context + total_tokens = LLMContextSummarizationUtil.estimate_context_tokens(self._context) + num_messages = len(self._context.messages) + + # Check if we've reached the token limit + token_limit = self._config.max_context_tokens + token_limit_exceeded = total_tokens >= token_limit + + # Check if we've exceeded max unsummarized messages + messages_since_summary = len(self._context.messages) - 1 + message_threshold_exceeded = ( + messages_since_summary >= self._config.max_unsummarized_messages + ) + + logger.trace( + f"{self}: Context has {num_messages} messages, " + f"~{total_tokens} tokens (limit: {token_limit}), " + f"{messages_since_summary} messages since last summary " + f"(message threshold: {self._config.max_unsummarized_messages})" + ) + + # Trigger if either limit is exceeded + if not token_limit_exceeded and not message_threshold_exceeded: + logger.trace( + f"{self}: Neither token limit nor message threshold exceeded, skipping summarization" + ) + return False + + reason = [] + if token_limit_exceeded: + reason.append(f"~{total_tokens} tokens (>={token_limit} limit)") + if message_threshold_exceeded: + reason.append( + f"{messages_since_summary} messages (>={self._config.max_unsummarized_messages} threshold)" + ) + + logger.debug(f"{self}: ✓ Summarization needed - {', '.join(reason)}") + return True + + async def _request_summarization(self): + """Request context summarization from LLM service. + + Creates a summarization request frame and emits it via event handler. 
+ Tracks the request ID to match async responses and prevent race conditions. + """ + # Generate unique request ID + request_id = str(uuid.uuid4()) + min_keep = self._config.min_messages_after_summary + + # Mark summarization in progress + self._summarization_in_progress = True + self._pending_summary_request_id = request_id + + logger.debug(f"{self}: Sending summarization request (request_id={request_id})") + + # Create the request frame + request_frame = LLMContextSummaryRequestFrame( + request_id=request_id, + context=self._context, + min_messages_to_keep=min_keep, + target_context_tokens=self._config.target_context_tokens, + summarization_prompt=self._config.summary_prompt, + ) + + # Emit event for aggregator to broadcast + await self._call_event_handler("on_request_summarization", request_frame) + + async def _handle_summary_result(self, frame: LLMContextSummaryResultFrame): + """Handle context summarization result from LLM service. + + Processes the summary result by validating the request ID, checking for + errors, validating context state, and applying the summary. + + Args: + frame: The summary result frame containing the generated summary. + """ + logger.debug(f"{self}: Received summary result (request_id={frame.request_id})") + + # Check if this is the result we're waiting for + if frame.request_id != self._pending_summary_request_id: + logger.debug(f"{self}: Ignoring stale summary result (request_id={frame.request_id})") + return + + # Clear pending state + await self._clear_summarization_state() + + # Check for errors + if frame.error: + logger.error(f"{self}: Context summarization failed: {frame.error}") + return + + # Validate context state + if not self._validate_summary_context(frame.last_summarized_index): + logger.warning(f"{self}: Context state changed, skipping summary application") + return + + # Apply summary + await self._apply_summary(frame.summary, frame.last_summarized_index) + + def _validate_summary_context(self, last_summarized_index: int) -> bool: + """Validate that context state is still valid for applying summary. + + Args: + last_summarized_index: The index of the last summarized message. + + Returns: + True if the context state is still consistent with the summary. + """ + if last_summarized_index < 0: + return False + + # Check if we still have enough messages + if last_summarized_index >= len(self._context.messages): + return False + + min_keep = self._config.min_messages_after_summary + remaining = len(self._context.messages) - 1 - last_summarized_index + if remaining < min_keep: + return False + + return True + + async def _apply_summary(self, summary: str, last_summarized_index: int): + """Apply summary to compress the conversation context. + + Reconstructs the context with: + [first_system_message] + [summary_message] + [recent_messages] + + Args: + summary: The generated summary text. + last_summarized_index: Index of the last message that was summarized. 
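+        Example (illustrative message list and index, not taken from a real run)::
+
+            # before: [system, u1, a1, u2, a2, u3, a3]   with last_summarized_index=4
+            # after:  [system,
+            #          {"role": "assistant", "content": "Conversation summary: ..."},
+            #          u3, a3]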
+ """ + messages = self._context.messages + + # Find the first system message to preserve + first_system_msg = next((msg for msg in messages if msg.get("role") == "system"), None) + + # Get recent messages to keep + recent_messages = messages[last_summarized_index + 1 :] + + # Create summary message as an assistant message + summary_message = {"role": "assistant", "content": f"Conversation summary: {summary}"} + + # Reconstruct context + new_messages = [] + if first_system_msg: + new_messages.append(first_system_msg) + new_messages.append(summary_message) + new_messages.extend(recent_messages) + + # Update context + self._context.set_messages(new_messages) + + logger.info( + f"{self}: Applied context summary, compressed {last_summarized_index + 1} messages " + f"into summary. Context now has {len(new_messages)} messages (was {len(messages)})" + ) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 574af7bbf6..6b28b5bf22 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -37,6 +37,7 @@ InterruptionFrame, LLMContextAssistantTimestampFrame, LLMContextFrame, + LLMContextSummaryRequestFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMMessagesAppendFrame, @@ -68,6 +69,7 @@ LLMSpecificMessage, NotGiven, ) +from pipecat.processors.aggregators.llm_context_summarizer import LLMContextSummarizer from pipecat.processors.frame_processor import FrameCallback, FrameDirection, FrameProcessor from pipecat.turns.user_idle_controller import UserIdleController from pipecat.turns.user_mute import BaseUserMuteStrategy @@ -76,6 +78,7 @@ from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig from pipecat.turns.user_turn_controller import UserTurnController from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies +from pipecat.utils.context.llm_context_summarization import LLMContextSummarizationConfig from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text from pipecat.utils.time import time_now_iso8601 @@ -121,9 +124,17 @@ class LLMAssistantAggregatorParams: in text frames by adding spaces between tokens. This parameter is ignored when used with the newer LLMAssistantAggregator, which handles word spacing automatically. + enable_context_summarization: Enable automatic context summarization when token + limits are reached (disabled by default). When enabled, older conversation + messages are automatically compressed into summaries to manage context size. + context_summarization_config: Configuration for context summarization behavior. + Controls thresholds, message preservation, and summarization prompts. If None + and summarization is enabled, uses default configuration values. 
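+        Example (a minimal sketch; the exact keyword used to hand these params to the
+        aggregator pair is assumed here, not defined in this module)::
+
+            params = LLMAssistantAggregatorParams(
+                enable_context_summarization=True,
+                context_summarization_config=LLMContextSummarizationConfig(
+                    max_context_tokens=8000,       # summarize once the estimate reaches this
+                    target_context_tokens=6000,    # max_tokens budget for the summary call
+                    min_messages_after_summary=4,  # recent messages kept verbatim
+                ),
+            )
+            aggregators = LLMContextAggregatorPair(context, assistant_params=params)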
""" expect_stripped_words: bool = True + enable_context_summarization: bool = False + context_summarization_config: Optional[LLMContextSummarizationConfig] = None @dataclass @@ -807,6 +818,17 @@ def __init__( self._thought_aggregation: List[TextPartForConcatenation] = [] self._thought_start_time: str = "" + # Context summarization + self._summarizer: Optional[LLMContextSummarizer] = None + if self._params.enable_context_summarization: + self._summarizer = LLMContextSummarizer( + context=self._context, + config=self._params.context_summarization_config, + ) + self._summarizer.add_event_handler( + "on_request_summarization", self._on_request_summarization + ) + self._register_event_handler("on_assistant_turn_started") self._register_event_handler("on_assistant_turn_stopped") self._register_event_handler("on_assistant_thought") @@ -840,7 +862,12 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): """ await super().process_frame(frame, direction) - if isinstance(frame, InterruptionFrame): + if isinstance(frame, StartFrame): + # Push StartFrame before start(), because we want StartFrame to be + # processed by every processor before any other frame is processed. + await self.push_frame(frame, direction) + await self._start(frame) + elif isinstance(frame, InterruptionFrame): await self._handle_interruptions(frame) await self.push_frame(frame, direction) elif isinstance(frame, (EndFrame, CancelFrame)): @@ -883,6 +910,14 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): else: await self.push_frame(frame, direction) + # Pass frames to summarizer for monitoring + if self._summarizer: + await self._summarizer.process_frame(frame) + + async def _start(self, frame: StartFrame): + if self._summarizer: + await self._summarizer.setup(self.task_manager) + async def push_aggregation(self) -> str: """Push the current assistant aggregation with timestamp.""" if not self._aggregation: @@ -921,6 +956,8 @@ async def _handle_interruptions(self, frame: InterruptionFrame): async def _handle_end_or_cancel(self, frame: Frame): await self._trigger_assistant_turn_stopped() + if self._summarizer: + await self._summarizer.cleanup() async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame): function_names = [f"{f.function_name}:{f.tool_call_id}" for f in frame.function_calls] @@ -1197,6 +1234,19 @@ def _maybe_strip_turn_completion_markers(self, text: str) -> str: # Only strip whitespace if we removed a marker return text.strip() if marker_found else text + async def _on_request_summarization( + self, summarizer: LLMContextSummarizer, frame: LLMContextSummaryRequestFrame + ): + """Handle summarization request from the summarizer. + + Push the request frame UPSTREAM to the LLM service for processing. + + Args: + summarizer: The summarizer that generated the request. + frame: The summarization request frame to broadcast. 
+ """ + await self.push_frame(frame, FrameDirection.UPSTREAM) + class LLMContextAggregatorPair: """Pair of LLM context aggregators for updating context with user and assistant messages.""" diff --git a/src/pipecat/services/anthropic/llm.py b/src/pipecat/services/anthropic/llm.py index b184ea29d7..a21296fe3f 100644 --- a/src/pipecat/services/anthropic/llm.py +++ b/src/pipecat/services/anthropic/llm.py @@ -261,11 +261,15 @@ async def _create_message_stream(self, api_call, params): response = await api_call(**params) return response - async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]: + async def run_inference( + self, context: LLMContext | OpenAILLMContext, max_tokens: Optional[int] = None + ) -> Optional[str]: """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. If provided, + overrides the service's default max_tokens setting. Returns: The LLM's response as a string, or None if no response is generated. @@ -290,7 +294,7 @@ async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optiona # Build params using the same method as streaming completions params = { "model": self.model_name, - "max_tokens": self._settings["max_tokens"], + "max_tokens": max_tokens if max_tokens is not None else self._settings["max_tokens"], "stream": False, "temperature": self._settings["temperature"], "top_k": self._settings["top_k"], diff --git a/src/pipecat/services/aws/llm.py b/src/pipecat/services/aws/llm.py index 32562109a5..1778ae74e2 100644 --- a/src/pipecat/services/aws/llm.py +++ b/src/pipecat/services/aws/llm.py @@ -844,11 +844,15 @@ def _build_inference_config(self) -> Dict[str, Any]: inference_config["topP"] = self._settings["top_p"] return inference_config - async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]: + async def run_inference( + self, context: LLMContext | OpenAILLMContext, max_tokens: Optional[int] = None + ) -> Optional[str]: """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. If provided, + overrides the service's default max_tokens setting. Returns: The LLM's response as a string, or None if no response is generated. @@ -868,6 +872,10 @@ async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optiona # Prepare request parameters using the same method as streaming inference_config = self._build_inference_config() + # Override maxTokens if provided + if max_tokens is not None: + inference_config["maxTokens"] = max_tokens + request_params = { "modelId": self.model_name, "messages": messages, diff --git a/src/pipecat/services/google/llm.py b/src/pipecat/services/google/llm.py index 0e7556f833..563acadb32 100644 --- a/src/pipecat/services/google/llm.py +++ b/src/pipecat/services/google/llm.py @@ -799,11 +799,15 @@ def create_client(self): """Create the Gemini client instance. Subclasses can override this.""" self._client = genai.Client(api_key=self._api_key, http_options=self._http_options) - async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]: + async def run_inference( + self, context: LLMContext | OpenAILLMContext, max_tokens: Optional[int] = None + ) -> Optional[str]: """Run a one-shot, out-of-band (i.e. 
out-of-pipeline) inference with the given LLM context. Args: context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. If provided, + overrides the service's default max_tokens setting. Returns: The LLM's response as a string, or None if no response is generated. @@ -828,6 +832,10 @@ async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optiona system_instruction=system, tools=tools if tools else None ) + # Override max_output_tokens if provided + if max_tokens is not None: + generation_params["max_output_tokens"] = max_tokens + generation_config = GenerateContentConfig(**generation_params) # Use the new google-genai client's async method diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index c59b102b45..af7e691b06 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -39,6 +39,8 @@ FunctionCallsStartedFrame, InterruptionFrame, LLMConfigureOutputFrame, + LLMContextSummaryRequestFrame, + LLMContextSummaryResultFrame, LLMFullResponseEndFrame, LLMFullResponseStartFrame, LLMTextFrame, @@ -57,6 +59,9 @@ from pipecat.processors.frame_processor import FrameDirection from pipecat.services.ai_service import AIService from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionLLMServiceMixin +from pipecat.utils.context.llm_context_summarization import ( + LLMContextSummarizationUtil, +) # Type alias for a callable that handles LLM function calls. FunctionCallHandler = Callable[["FunctionCallParams"], Awaitable[None]] @@ -195,6 +200,7 @@ def __init__( self._sequential_runner_task: Optional[asyncio.Task] = None self._tracing_enabled: bool = False self._skip_tts: Optional[bool] = None + self._summary_task: Optional[asyncio.Task] = None self._register_event_handler("on_function_calls_started") self._register_event_handler("on_completion_timeout") @@ -218,13 +224,17 @@ def create_llm_specific_message(self, message: Any) -> LLMSpecificMessage: """ return self.get_llm_adapter().create_llm_specific_message(message) - async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]: + async def run_inference( + self, context: LLMContext | OpenAILLMContext, max_tokens: Optional[int] = None + ) -> Optional[str]: """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Must be implemented by subclasses. Args: context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. If provided, + overrides the service's default max_tokens/max_completion_tokens setting. Returns: The LLM's response as a string, or None if no response is generated. @@ -286,6 +296,7 @@ async def stop(self, frame: EndFrame): await super().stop(frame) if not self._run_in_parallel: await self._cancel_sequential_runner_task() + await self._cancel_summary_task() async def cancel(self, frame: CancelFrame): """Cancel the LLM service. @@ -296,6 +307,7 @@ async def cancel(self, frame: CancelFrame): await super().cancel(frame) if not self._run_in_parallel: await self._cancel_sequential_runner_task() + await self._cancel_summary_task() async def _update_settings(self, settings: Mapping[str, Any]): """Update LLM service settings. 
@@ -339,6 +351,8 @@ async def process_frame(self, frame: Frame, direction: FrameDirection): await self._handle_interruptions(frame) elif isinstance(frame, LLMConfigureOutputFrame): self._skip_tts = frame.skip_tts + elif isinstance(frame, LLMContextSummaryRequestFrame): + await self._handle_summary_request(frame) async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM): """Pushes a frame. @@ -372,6 +386,121 @@ async def _handle_interruptions(self, _: InterruptionFrame): if entry.cancel_on_interruption: await self._cancel_function_call(function_name) + async def _handle_summary_request(self, frame: LLMContextSummaryRequestFrame): + """Handle context summarization request from aggregator. + + Processes a summarization request by generating a compressed summary + of conversation history. Uses the adapter to format the summary + according to the provider's requirements. Broadcasts the result back + to the aggregator for context reconstruction. + + Args: + frame: The summary request frame containing context and parameters. + """ + logger.debug(f"{self}: Processing summarization request {frame.request_id}") + + # Create a background task to generate the summary without blocking + self._summary_task = self.create_task(self._generate_summary_task(frame)) + + async def _generate_summary_task(self, frame: LLMContextSummaryRequestFrame): + """Background task to generate summary without blocking the pipeline. + + Args: + frame: The summary request frame containing context and parameters. + """ + summary = "" + last_index = -1 + error = None + + try: + summary, last_index = await self._generate_summary(frame) + except Exception as e: + error = f"Error generating context summary: {e}" + await self.push_error(error, exception=e) + + await self.broadcast_frame( + LLMContextSummaryResultFrame, + request_id=frame.request_id, + summary=summary, + last_summarized_index=last_index, + error=error, + ) + + self._summary_task = None + + async def _generate_summary(self, frame: LLMContextSummaryRequestFrame) -> tuple[str, int]: + """Generate a compressed summary of conversation context. + + Uses the message selection logic to identify which messages + to summarize, formats them as a transcript, and invokes the LLM to + generate a concise summary. The summary is formatted according to the + LLM provider's requirements using the adapter. + + Args: + frame: The summary request frame containing context and configuration. + + Returns: + Tuple of (formatted summary message, last_summarized_index). + + Raises: + RuntimeError: If there are no messages to summarize, the service doesn't + support run_inference(), or the LLM returns an empty summary. + + Note: + Requires the service to implement run_inference() method for + synchronous LLM calls. 
+ """ + # Get messages to summarize using utility method + result = LLMContextSummarizationUtil.get_messages_to_summarize( + frame.context, frame.min_messages_to_keep + ) + + if not result.messages: + logger.debug(f"{self}: No messages to summarize") + raise RuntimeError("No messages to summarize") + + logger.debug( + f"{self}: Generating summary for {len(result.messages)} messages " + f"(index 0 to {result.last_summarized_index}), " + f"target_context_tokens={frame.target_context_tokens}" + ) + + # Create summary context + transcript = LLMContextSummarizationUtil.format_messages_for_summary(result.messages) + prompt_messages = [ + { + "role": "system", + "content": frame.summarization_prompt, + }, + { + "role": "user", + "content": f"Conversation history:\n{transcript}", + }, + ] + summary_context = LLMContext(messages=prompt_messages) + + # Generate summary using run_inference + # This will be overridden by each LLM service implementation + try: + summary_text = await self.run_inference( + summary_context, max_tokens=frame.target_context_tokens + ) + except NotImplementedError: + raise RuntimeError( + f"LLM service {self.__class__.__name__} does not implement run_inference" + ) + + if not summary_text: + raise RuntimeError("LLM returned empty summary") + + summary_text = summary_text.strip() + logger.info( + f"{self}: Generated summary of {len(summary_text)} characters " + f"for {len(result.messages)} messages" + ) + + return summary_text, result.last_summarized_index + def register_function( self, function_name: Optional[str], @@ -588,6 +717,11 @@ async def _cancel_sequential_runner_task(self): await self.cancel_task(self._sequential_runner_task) self._sequential_runner_task = None + async def _cancel_summary_task(self): + if self._summary_task: + await self.cancel_task(self._summary_task) + self._summary_task = None + async def _sequential_runner_handler(self): while True: runner_item = await self._sequential_runner_queue.get() diff --git a/src/pipecat/services/openai/base_llm.py b/src/pipecat/services/openai/base_llm.py index 54e5145083..d8669f6224 100644 --- a/src/pipecat/services/openai/base_llm.py +++ b/src/pipecat/services/openai/base_llm.py @@ -265,11 +265,15 @@ def build_chat_completion_params(self, params_from_context: OpenAILLMInvocationP params.update(self._settings["extra"]) return params - async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optional[str]: + async def run_inference( + self, context: LLMContext | OpenAILLMContext, max_tokens: Optional[int] = None + ) -> Optional[str]: """Run a one-shot, out-of-band (i.e. out-of-pipeline) inference with the given LLM context. Args: context: The LLM context containing conversation history. + max_tokens: Optional maximum number of tokens to generate. If provided, + overrides the service's default max_tokens/max_completion_tokens setting. Returns: The LLM's response as a string, or None if no response is generated. 
@@ -291,6 +295,14 @@ async def run_inference(self, context: LLMContext | OpenAILLMContext) -> Optiona params["stream"] = False params.pop("stream_options", None) + # Override max_tokens if provided + if max_tokens is not None: + # Use max_completion_tokens for newer models, fallback to max_tokens + if "max_completion_tokens" in params: + params["max_completion_tokens"] = max_tokens + else: + params["max_tokens"] = max_tokens + # LLM completion response = await self._client.chat.completions.create(**params) diff --git a/src/pipecat/utils/context/__init__.py b/src/pipecat/utils/context/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/pipecat/utils/context/llm_context_summarization.py b/src/pipecat/utils/context/llm_context_summarization.py new file mode 100644 index 0000000000..6865a00d9f --- /dev/null +++ b/src/pipecat/utils/context/llm_context_summarization.py @@ -0,0 +1,396 @@ +# +# Copyright (c) 2024–2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Utility for context summarization in LLM services. + +This module provides reusable functionality for automatically compressing conversation +context when token limits are reached, enabling efficient long-running conversations. +""" + +from dataclasses import dataclass +from typing import List, Optional + +from loguru import logger + +from pipecat.processors.aggregators.llm_context import LLMContext + +# Token estimation constants +CHARS_PER_TOKEN = 4 # Industry-standard heuristic: 1 token ≈ 4 characters +TOKEN_OVERHEAD_PER_MESSAGE = 10 # Estimated structural overhead per message +IMAGE_TOKEN_ESTIMATE = 500 # Rough estimate for image content +SUMMARY_TOKEN_BUFFER = 0.8 # Keep summary at 80% of available space for safety +MIN_SUMMARY_TOKENS = 100 # Minimum tokens to allocate for summary + +DEFAULT_SUMMARIZATION_PROMPT = """You are summarizing a conversation between a user and an AI assistant. + +Your task: +1. Create a concise summary that preserves: + - Key facts, decisions, and agreements + - Important context needed to continue the conversation + - User preferences and requirements mentioned + - Any unresolved questions or action items + +2. Format: + - Use clear, factual statements + - Group related information + - Prioritize information likely to be referenced later + - Keep the summary concise to fit within the specified token budget + +3. Omit: + - Greetings and small talk + - Redundant information + - Tangential discussions that were resolved + +The conversation transcript follows. Generate only the summary, no other text.""" + + +@dataclass +class LLMContextSummarizationConfig: + """Configuration for context summarization behavior. + + Controls when and how conversation context is automatically compressed + to manage token limits in long-running conversations. + + Parameters: + max_context_tokens: Maximum allowed context size in tokens. When this + limit is reached, summarization is triggered to compress the context. + The tokens are calculated using the industry-standard approximation + of 1 token ≈ 4 characters. + target_context_tokens: Maximum token size for the generated summary. + This value is passed directly to the LLM as the max_tokens parameter + when generating the summary. Should be sized appropriately to allow + the summary plus recent preserved messages to fit within reasonable + context limits. + max_unsummarized_messages: Maximum number of new messages that can + accumulate since the last summary before triggering a new + summarization. 
This ensures regular compression even if token + limits are not reached. + min_messages_after_summary: Number of recent messages to preserve + uncompressed after each summarization. These messages maintain + immediate conversational context. + summarization_prompt: Custom prompt for the LLM to use when generating + summaries. If None, uses DEFAULT_SUMMARIZATION_PROMPT. + """ + + max_context_tokens: int = 8000 + target_context_tokens: int = 6000 + max_unsummarized_messages: int = 20 + min_messages_after_summary: int = 4 + summarization_prompt: Optional[str] = None + + def __post_init__(self): + """Validate configuration parameters.""" + if self.max_context_tokens <= 0: + raise ValueError("max_context_tokens must be positive") + if self.target_context_tokens <= 0: + raise ValueError("target_context_tokens must be positive") + + # Auto-adjust target_context_tokens if it exceeds max_context_tokens + if self.target_context_tokens > self.max_context_tokens: + # Use 80% of max_context_tokens as a reasonable default + self.target_context_tokens = int(self.max_context_tokens * 0.8) + + if self.max_unsummarized_messages < 1: + raise ValueError("max_unsummarized_messages must be at least 1") + if self.min_messages_after_summary < 0: + raise ValueError("min_messages_after_summary must be positive") + + @property + def summary_prompt(self) -> str: + """Get the summarization prompt to use. + + Returns: + The custom prompt if set, otherwise the default summarization prompt. + """ + return self.summarization_prompt or DEFAULT_SUMMARIZATION_PROMPT + + +@dataclass +class LLMMessagesToSummarize: + """Result of get_messages_to_summarize operation. + + Parameters: + messages: Messages to include in the summary + last_summarized_index: Index of the last message being summarized + """ + + messages: List[dict] + last_summarized_index: int + + +class LLMContextSummarizationUtil: + """Utility providing context summarization capabilities for LLM processing. + + This utility enables automatic conversation context compression when token + limits are reached. It provides functionality for both aggregators + (which decide when to summarize) and LLM services (which generate summaries). + + Key features: + - Token estimation using character-count heuristics (chars // 4) + - Smart message selection (preserves system messages and recent context) + - Function call awareness (avoids summarizing incomplete tool interactions) + - Flexible transcript formatting for summarization + - Maximum summary token calculation with safety buffers + + Usage: + Use the static methods directly on the class: + + tokens = LLMContextSummarizationUtil.estimate_context_tokens(context) + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 4) + transcript = LLMContextSummarizationUtil.format_messages_for_summary(messages) + + Note: + Token estimation uses the industry-standard heuristic of 1 token ≈ 4 characters. + """ + + @staticmethod + def estimate_tokens(text: str) -> int: + """Estimate token count for text using character count heuristic. + + Uses the industry-standard approximation of 1 token ≈ 4 characters. + This works well across different content types (prose, code, etc.) + and languages. + + Note: + For more accurate token counts, use the model's official tokenizer. + This is a rough estimate suitable for threshold checks and budgeting. 
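+            For example, a 200-character message body is budgeted as 200 // 4 = 50
+            tokens here; estimate_context_tokens() then adds a further
+            TOKEN_OVERHEAD_PER_MESSAGE (10) per message on top of the content estimate.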
+ + Args: + text: Text to estimate tokens for + + Returns: + Estimated token count (characters // 4) + """ + if not text: + return 0 + return len(text) // CHARS_PER_TOKEN + + @staticmethod + def estimate_context_tokens(context: LLMContext) -> int: + """Estimate total token count for a context. + + Calculates an approximate token count by analyzing all messages, + including text content, tool calls, and structural overhead. + + Args: + context: LLM context to estimate. + + Returns: + Estimated total token count including: + - Message content (text, images) + - Tool calls and their arguments + - Tool results + - Structural overhead (TOKEN_OVERHEAD_PER_MESSAGE per message) + """ + total = 0 + + for message in context.messages: + # Role and structure overhead + total += TOKEN_OVERHEAD_PER_MESSAGE + + # Message content + content = message.get("content", "") + if isinstance(content, str): + total += LLMContextSummarizationUtil.estimate_tokens(content) + elif isinstance(content, list): + for item in content: + if isinstance(item, dict): + item_type = item.get("type", "") + # Text content + if item_type == "text": + total += LLMContextSummarizationUtil.estimate_tokens( + item.get("text", "") + ) + # Image content + elif item_type in ("image_url", "image"): + # Images are expensive, rough estimate + total += IMAGE_TOKEN_ESTIMATE + + # Tool calls + if "tool_calls" in message: + tool_calls = message["tool_calls"] + if isinstance(tool_calls, list): + for tool_call in tool_calls: + if isinstance(tool_call, dict): + func = tool_call.get("function", {}) + if isinstance(func, dict): + total += LLMContextSummarizationUtil.estimate_tokens( + func.get("name", "") + func.get("arguments", "") + ) + + # Tool call ID + if "tool_call_id" in message: + total += TOKEN_OVERHEAD_PER_MESSAGE + + return total + + @staticmethod + def _get_function_calls_in_progress_index(messages: List[dict], start_idx: int) -> int: + """Find the earliest message index with incomplete function calls. + + Scans messages to identify function/tool calls that haven't received + their results yet. This prevents summarizing incomplete tool interactions + which would break the request-response pairing. + + Args: + messages: List of messages to check. + start_idx: Index to start checking from. + + Returns: + Index of first message with function call in progress, or -1 if all + function calls are complete. + """ + # Track tool call IDs mapped to their message index + pending_tool_calls: dict[str, int] = {} + + for i in range(start_idx, len(messages)): + msg = messages[i] + role = msg.get("role") + + # Check for tool calls in assistant messages + if role == "assistant" and "tool_calls" in msg: + tool_calls = msg.get("tool_calls", []) + if isinstance(tool_calls, list): + for tool_call in tool_calls: + if isinstance(tool_call, dict): + tool_call_id = tool_call.get("id") + if tool_call_id: + pending_tool_calls[tool_call_id] = i + + # Check for tool results + if role == "tool": + tool_call_id = msg.get("tool_call_id") + if tool_call_id and tool_call_id in pending_tool_calls: + pending_tool_calls.pop(tool_call_id) + + # If we have pending tool calls, return the earliest index + if pending_tool_calls: + return min(pending_tool_calls.values()) + + return -1 + + @staticmethod + def get_messages_to_summarize( + context: LLMContext, min_messages_to_keep: int + ) -> LLMMessagesToSummarize: + """Determine which messages should be included in summarization. 
+ + Intelligently selects messages for summarization while preserving: + - The first system message (defines assistant behavior) + - The last N messages (maintains immediate conversation context) + - Incomplete function call sequences (preserves tool interaction integrity) + + Args: + context: The LLM context containing all messages. + min_messages_to_keep: Number of recent messages to exclude from + summarization. + + Returns: + LLMMessagesToSummarize containing the messages to summarize and the + index of the last message included. + """ + messages = context.messages + if len(messages) <= min_messages_to_keep: + return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) + + # Find first system message index + first_system_index = next( + (i for i, msg in enumerate(messages) if msg.get("role") == "system"), -1 + ) + + # Messages to summarize are between first system and recent messages + # We exclude the first system message itself + if first_system_index >= 0: + summary_start = first_system_index + 1 + else: + summary_start = 0 + + # Get messages to keep (last N messages) + summary_end = len(messages) - min_messages_to_keep + + if summary_start >= summary_end: + return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) + + # Check for function calls in progress in the range we want to summarize + function_call_start = LLMContextSummarizationUtil._get_function_calls_in_progress_index( + messages, summary_start + ) + if function_call_start >= 0 and function_call_start < summary_end: + # Stop summarization before the function call + logger.debug( + f"ContextSummarization: Found function call in progress at index {function_call_start}, " + f"stopping summary before it (was going to summarize up to {summary_end})" + ) + # Count how many messages we're skipping + skipped_messages = summary_end - function_call_start + summary_end = function_call_start + if skipped_messages > 0: + logger.info( + f"ContextSummarization: Skipping {skipped_messages} messages with " + f"function calls in progress (will summarize after results are available)" + ) + + if summary_start >= summary_end: + return LLMMessagesToSummarize(messages=[], last_summarized_index=-1) + + messages_to_summarize = messages[summary_start:summary_end] + last_summarized_index = summary_end - 1 + + return LLMMessagesToSummarize( + messages=messages_to_summarize, last_summarized_index=last_summarized_index + ) + + @staticmethod + def format_messages_for_summary(messages: List[dict]) -> str: + """Format messages as a transcript for summarization. 
+ + Args: + messages: Messages to format + + Returns: + Formatted transcript string + """ + transcript_parts = [] + + for msg in messages: + role = msg.get("role", "unknown") + content = msg.get("content", "") + + # Handle different content types + if isinstance(content, str): + text = content + elif isinstance(content, list): + text_parts = [] + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + text_parts.append(item.get("text", "")) + text = " ".join(text_parts) + else: + text = str(content) + + if text: + # Capitalize role for readability + formatted_role = role.upper() + transcript_parts.append(f"{formatted_role}: {text}") + + # Include tool calls if present + if "tool_calls" in msg: + tool_calls = msg.get("tool_calls", []) + if isinstance(tool_calls, list): + for tool_call in tool_calls: + if isinstance(tool_call, dict): + func = tool_call.get("function", {}) + if isinstance(func, dict): + name = func.get("name", "unknown") + args = func.get("arguments", "") + transcript_parts.append(f"TOOL_CALL: {name}({args})") + + # Include tool results + if role == "tool": + tool_call_id = msg.get("tool_call_id", "unknown") + transcript_parts.append(f"TOOL_RESULT[{tool_call_id}]: {text}") + + return "\n\n".join(transcript_parts) From 92b6ecd945497b35c2b3f5e450dc5946360e2301 Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:58:22 -0300 Subject: [PATCH 2/7] New Claude skill to help refactor and cleanup the code. --- .claude/skills/cleanup/SKILL.md | 307 ++++++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 .claude/skills/cleanup/SKILL.md diff --git a/.claude/skills/cleanup/SKILL.md b/.claude/skills/cleanup/SKILL.md new file mode 100644 index 0000000000..f7dd6ea98e --- /dev/null +++ b/.claude/skills/cleanup/SKILL.md @@ -0,0 +1,307 @@ +# Code Cleanup Skill + +The **Code Cleanup Skill** reviews, refactors, and documents code changes in your current branch, ensuring alignment with **Pipecat’s architecture, coding standards, and example patterns**. +It focuses on **readability, correctness, performance, and consistency**, while avoiding breaking changes. + +--- + +## Skill Overview + +This skill analyzes all changes introduced in your branch and performs the following actions: + +1. **Analyze Branch Changes** + - Review uncommitted changes and outgoing commits +2. **Refactor for Readability** + - Improve clarity, naming, structure, and modern Python usage +3. **Enhance Performance** + - Identify safe, conservative optimization opportunities +4. **Add Documentation** + - Apply Pipecat-style, Google-format docstrings +5. **Ensure Pattern Consistency** + - Match existing Pipecat services, pipelines, and examples +6. **Validate Examples** + - Ensure examples follow foundational patterns (e.g. `07-interruptible.py`) + +--- + +## Usage + +Invoke the skill using any of the following commands: + +- “Clean up my branch code” +- “Refactor the changes in my branch” +- “Review and improve my branch code” +- `/cleanup` + +--- + +## What This Skill Does + +### 1. Analyze Branch Changes + +The skill retrieves all uncommitted changes and outgoing commits to understand: + +- New files added +- Modified files +- Code additions and deletions +- Overall scope and intent of changes + +--- + +### 2. 
Code Refactoring + +#### Readability Improvements + +- Replace tuples with named classes or dataclasses +- Improve variable, method, and class naming +- Extract complex logic into well-named helper methods +- Add missing type hints +- Simplify nested or complex conditionals +- Replace deprecated methods and features +- Normalize formatting to match Pipecat style + +#### Performance Enhancements + +- Identify inefficient loops or repeated work +- Suggest appropriate data structures +- Optimize async workflows and I/O +- Remove redundant operations + +> Performance changes are conservative and non-breaking. + +--- + +### 3. Documentation + +Documentation follows **Google-style docstrings**, consistent with Pipecat conventions. + +#### Class Documentation + +```python +class ExampleService: + """Brief one-line description. + + Detailed explanation of the class purpose, responsibilities, + and important behaviors. + + Supported features: + + - Feature 1 + - Feature 2 + - Feature 3 + """ +``` + +#### Method Documentation + +```python +def process_data(self, data: str, options: Optional[dict] = None) -> bool: + """Process incoming data with optional configuration. + + Args: + data: The input data to process. + options: Optional configuration dictionary. + + Returns: + True if processing succeeded, False otherwise. + + Raises: + ValueError: If data is empty or invalid. + """ +``` + +#### Pydantic Model Parameters + +```python +class InputParams(BaseModel): + """Configuration parameters for the service. + + Parameters: + timeout: Request timeout in seconds. + retry_count: Number of retry attempts. + enable_logging: Whether to enable debug logging. + """ + + timeout: Optional[float] = None + retry_count: int = 3 + enable_logging: bool = False +``` + +--- + +### 4. Pattern Consistency Checks + +#### Service Classes + +- Correct inheritance (`TTSService`, `STTService`, `LLMService`) +- Consistent constructor signatures +- Frame emission patterns +- Metrics support: + - `can_generate_metrics()` + - TTFB metrics + - Usage metrics +- Alignment with similar existing services + +#### Examples + +Validated against `examples/foundational/07-interruptible.py`: + +- Proper `create_transport()` usage +- Correct pipeline structure +- Task setup and observers +- Event handler registration +- Runner and bot entrypoint consistency + +--- + +### 5. Specific Implementation Patterns + +#### Service Implementation + +```python +class ExampleTTSService(TTSService): + + def __init__(self, *, api_key: Optional[str] = None, **kwargs): + super().__init__(**kwargs) + self._api_key = api_key or os.getenv("SERVICE_API_KEY") + + def can_generate_metrics(self) -> bool: + return True + + async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]: + try: + await self.start_ttfb_metrics() + yield TTSStartedFrame() + # ... processing ... + yield TTSAudioRawFrame(...) + finally: + await self.stop_ttfb_metrics() +``` + +--- + +#### Example Structure Pattern + +```python +transport_params = { + "daily": lambda: DailyParams(...), + "twilio": lambda: FastAPIWebsocketParams(...), + "webrtc": lambda: TransportParams(...), +} + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + stt = DeepgramSTTService(...) + tts = SomeTTSService(...) + llm = OpenAILLMService(...) + + context = LLMContext(messages) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair(...) 
+ + pipeline = Pipeline([...]) + task = PipelineTask(pipeline, params=..., observers=[...]) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + await task.queue_frames([LLMRunFrame()]) + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + await runner.run(task) + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) +``` + +--- + +## Execution Flow + +1. Fetch uncommitted and outgoing changes +2. Categorize files (services, examples, tests, utilities) +3. Analyze each file: + - Readability + - Performance + - Documentation + - Pattern consistency +4. Generate actionable recommendations +5. Apply Pipecat standards + +--- + +## Examples + +### Before: Tuple Usage + +```python +def get_audio_info(self) -> Tuple[int, int]: + return (48000, 1) +``` + +### After: Named Class + +```python +class AudioInfo: + """Audio configuration information. + + Parameters: + sample_rate: Sample rate in Hz. + num_channels: Number of audio channels. + """ + + sample_rate: int + num_channels: int + +def get_audio_info(self) -> AudioInfo: + return AudioInfo(sample_rate=48000, num_channels=1) +``` + +--- + +### Before: Missing Documentation + +```python +class NewTTSService(TTSService): + def __init__(self, api_key: str, voice: str): + self._api_key = api_key + self._voice = voice +``` + +### After: Fully Documented + +```python +class NewTTSService(TTSService): + """Text-to-speech service using NewProvider API. + + Streams PCM audio and emits TTSAudioRawFrame frames compatible + with Pipecat transports. + + Supported features: + - Text-to-speech synthesis + - Streaming PCM audio + - Voice customization + - TTFB metrics + """ + + def __init__(self, *, api_key: str, voice: str, **kwargs): + """Initialize the NewTTSService. + + Args: + api_key: API key for authentication. + voice: Voice identifier to use. + **kwargs: Additional arguments passed to the parent service. + """ + super().__init__(**kwargs) + self._api_key = api_key + self.set_voice(voice) +``` + +--- + +## Notes + +- Non-breaking improvements only +- Backward compatibility preserved +- Conservative performance changes +- Google-style docstrings +- Pattern checks follow recent Pipecat code From 9d89afa7d454ce97014bd5d1c6ff51d73a624d1d Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:58:33 -0300 Subject: [PATCH 3/7] Automated tests for the context summarization feature. 
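These tests exercise the summarization utilities directly: token estimation
(characters // 4 plus per-message overhead), message selection (the system message
and in-progress function calls are preserved), transcript formatting, and config
validation. They are plain unittest test cases, so this module can be run on its
own with, for example, `python -m pytest tests/test_context_summarization.py`
(assuming the repository's usual pytest setup).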
--- tests/test_context_summarization.py | 606 ++++++++++++++++++++++++++++ 1 file changed, 606 insertions(+) create mode 100644 tests/test_context_summarization.py diff --git a/tests/test_context_summarization.py b/tests/test_context_summarization.py new file mode 100644 index 0000000000..87aaa74d3b --- /dev/null +++ b/tests/test_context_summarization.py @@ -0,0 +1,606 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Tests for context summarization feature.""" + +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from pipecat.frames.frames import LLMContextSummaryRequestFrame +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.services.llm_service import LLMService +from pipecat.utils.context.llm_context_summarization import ( + LLMContextSummarizationConfig, + LLMContextSummarizationUtil, +) + + +class TestContextSummarizationMixin(unittest.TestCase): + """Tests for LLMContextSummarizationUtil.""" + + def test_estimate_tokens_simple_text(self): + """Test token estimation with simple text.""" + # Simple sentence: "Hello world" = 11 chars / 4 = 2.75 -> 2 tokens + tokens = LLMContextSummarizationUtil.estimate_tokens("Hello world") + self.assertEqual(tokens, 2) + + # More words: "This is a test message" = 22 chars / 4 = 5.5 -> 5 tokens + tokens = LLMContextSummarizationUtil.estimate_tokens("This is a test message") + self.assertEqual(tokens, 5) + + def test_estimate_tokens_empty(self): + """Test token estimation with empty text.""" + tokens = LLMContextSummarizationUtil.estimate_tokens("") + self.assertEqual(tokens, 0) + + def test_estimate_context_tokens(self): + """Test context token estimation.""" + context = LLMContext() + + # Empty context + self.assertEqual(LLMContextSummarizationUtil.estimate_context_tokens(context), 0) + + # Add messages + context.add_message({"role": "system", "content": "You are helpful"}) # ~4 words + context.add_message({"role": "user", "content": "Hello"}) # ~1 word + context.add_message({"role": "assistant", "content": "Hi there"}) # ~2 words + + # Each message has ~10 token overhead + # Total content: ~7 words * 1.3 = ~9 tokens + # Total overhead: 3 * 10 = 30 tokens + # Expected: ~39 tokens + total = LLMContextSummarizationUtil.estimate_context_tokens(context) + self.assertGreater(total, 30) # At least overhead + self.assertLess(total, 50) # Not too much + + def test_get_messages_to_summarize_basic(self): + """Test basic message extraction for summarization.""" + context = LLMContext() + + # Add messages + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "Message 2"}) + context.add_message({"role": "assistant", "content": "Response 2"}) + context.add_message({"role": "user", "content": "Message 3"}) + context.add_message({"role": "assistant", "content": "Response 3"}) + + # Keep last 2 messages + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 2) + + # Get first system message from context + first_system = None + for msg in context.messages: + if msg.get("role") == "system": + first_system = msg + break + + # Should get system message + self.assertIsNotNone(first_system) + self.assertEqual(first_system["content"], "System prompt") + + # Should get middle messages (indices 1-4) + self.assertEqual(len(result.messages), 4) + 
self.assertEqual(result.messages[0]["content"], "Message 1") + self.assertEqual(result.messages[-1]["content"], "Response 2") + + # Last index should be 4 (0-indexed) + self.assertEqual(result.last_summarized_index, 4) + + def test_get_messages_to_summarize_no_system(self): + """Test message extraction when there's no system message.""" + context = LLMContext() + + # Add messages without system prompt + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "Message 2"}) + context.add_message({"role": "assistant", "content": "Response 2"}) + + # Keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Get first system message from context + first_system = None + for msg in context.messages: + if msg.get("role") == "system": + first_system = msg + break + + # Should have no system message + self.assertIsNone(first_system) + + # Should get first 3 messages + self.assertEqual(len(result.messages), 3) + self.assertEqual(result.last_summarized_index, 2) + + def test_get_messages_to_summarize_insufficient(self): + """Test when there aren't enough messages to summarize.""" + context = LLMContext() + + # Add only 2 messages + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + + # Try to keep 2 messages (same as total) + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 2) + + # Should return empty + self.assertEqual(len(result.messages), 0) + self.assertEqual(result.last_summarized_index, -1) + + def test_format_messages_for_summary(self): + """Test message formatting for summary.""" + + messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"}, + {"role": "user", "content": "How are you?"}, + ] + + transcript = LLMContextSummarizationUtil.format_messages_for_summary(messages) + + self.assertIn("USER: Hello", transcript) + self.assertIn("ASSISTANT: Hi there", transcript) + self.assertIn("USER: How are you?", transcript) + + def test_format_messages_with_list_content(self): + """Test formatting messages with list content.""" + + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "First part"}, + {"type": "text", "text": "Second part"}, + ], + } + ] + + transcript = LLMContextSummarizationUtil.format_messages_for_summary(messages) + + self.assertIn("USER: First part Second part", transcript) + + +class TestLLMContextSummarizationConfig(unittest.TestCase): + """Tests for LLMContextSummarizationConfig.""" + + def test_default_config(self): + """Test default configuration values.""" + config = LLMContextSummarizationConfig() + + self.assertEqual(config.max_context_tokens, 8000) + self.assertEqual(config.max_unsummarized_messages, 20) + self.assertEqual(config.min_messages_after_summary, 4) + self.assertIsNone(config.summarization_prompt) + + def test_custom_config(self): + """Test custom configuration.""" + config = LLMContextSummarizationConfig( + max_context_tokens=2500, + target_context_tokens=2000, + max_unsummarized_messages=15, + min_messages_after_summary=4, + summarization_prompt="Custom prompt", + ) + + self.assertEqual(config.max_context_tokens, 2500) + self.assertEqual(config.target_context_tokens, 2000) + self.assertEqual(config.max_unsummarized_messages, 15) + self.assertEqual(config.min_messages_after_summary, 4) + 
self.assertEqual(config.summary_prompt, "Custom prompt") + + def test_summary_prompt_property(self): + """Test summary_prompt property uses default when None.""" + config = LLMContextSummarizationConfig() + self.assertIn("summarizing a conversation", config.summary_prompt.lower()) + + config_with_custom = LLMContextSummarizationConfig(summarization_prompt="Custom") + self.assertEqual(config_with_custom.summary_prompt, "Custom") + + +class TestFunctionCallHandling(unittest.TestCase): + """Tests for function call handling in summarization.""" + + def test_function_call_in_progress_not_summarized(self): + """Test that messages with function calls in progress are not summarized.""" + context = LLMContext() + + # Add messages including a function call without result + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "What time is it?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + } + ], + } + ) + # No tool result yet - function call is in progress + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should only get the first user message, stopping before the function call + self.assertEqual(len(result.messages), 1) + self.assertEqual(result.messages[0]["content"], "What time is it?") + self.assertEqual(result.last_summarized_index, 1) + + def test_completed_function_call_can_be_summarized(self): + """Test that completed function calls can be summarized.""" + context = LLMContext() + + # Add messages including a complete function call sequence + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "What time is it?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + } + ], + } + ) + # Tool result completes the function call + context.add_message( + {"role": "tool", "tool_call_id": "call_123", "content": '{"time": "10:30 AM"}'} + ) + context.add_message({"role": "assistant", "content": "It's 10:30 AM"}) + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should get all messages except the last one (complete function call is included) + self.assertEqual(len(result.messages), 4) + self.assertEqual(result.messages[0]["content"], "What time is it?") + self.assertEqual(result.messages[-1]["content"], "It's 10:30 AM") + self.assertEqual(result.last_summarized_index, 4) + + def test_multiple_function_calls_in_progress(self): + """Test handling of multiple function calls in progress.""" + context = LLMContext() + + # Add messages with multiple function calls + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "What's the time and date?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_time", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + }, + { + 
"id": "call_date", + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + }, + ], + } + ) + # Only one tool result - other call still in progress + context.add_message( + {"role": "tool", "tool_call_id": "call_time", "content": '{"time": "10:30 AM"}'} + ) + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should stop before the function call that's in progress + # Messages to summarize: indices 1, 2, 3 (stops before index 4 where incomplete call is) + self.assertEqual(len(result.messages), 3) + self.assertEqual(result.last_summarized_index, 3) + + def test_multiple_completed_function_calls(self): + """Test that multiple completed function calls can be summarized.""" + context = LLMContext() + + # Add messages with multiple completed function calls + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "What's the time and date?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_time", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + }, + { + "id": "call_date", + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + }, + ], + } + ) + # Both tool results provided + context.add_message( + {"role": "tool", "tool_call_id": "call_time", "content": '{"time": "10:30 AM"}'} + ) + context.add_message( + { + "role": "tool", + "tool_call_id": "call_date", + "content": '{"date": "January 1, 2024"}', + } + ) + context.add_message({"role": "assistant", "content": "It's 10:30 AM on January 1, 2024"}) + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should get all messages except the last one (all function calls completed) + self.assertEqual(len(result.messages), 5) + self.assertEqual(result.last_summarized_index, 5) + + def test_sequential_function_calls_mixed_completion(self): + """Test sequential function calls with mixed completion states.""" + context = LLMContext() + + # Add messages with sequential function calls + context.add_message({"role": "system", "content": "System prompt"}) + + # First function call - completed + context.add_message({"role": "user", "content": "What time is it?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_1", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + } + ], + } + ) + context.add_message( + {"role": "tool", "tool_call_id": "call_1", "content": '{"time": "10:30 AM"}'} + ) + context.add_message({"role": "assistant", "content": "It's 10:30 AM"}) + + # Second function call - in progress + context.add_message({"role": "user", "content": "What's the date?"}) + context.add_message( + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_2", + "type": "function", + "function": {"name": "get_date", "arguments": "{}"}, + } + ], + } + ) + # No result for call_2 yet + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should get messages up to and including the first completed function call + # but stop before the second function call that's in progress + # Messages to summarize: 
indices 1, 2, 3, 4, 5 (stops before index 6 where incomplete call is) + self.assertEqual(len(result.messages), 5) + self.assertEqual(result.messages[-1]["content"], "What's the date?") + self.assertEqual(result.last_summarized_index, 5) + + def test_function_call_formatting_in_transcript(self): + """Test that function calls are properly formatted in transcript.""" + + messages = [ + {"role": "user", "content": "What time is it?"}, + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_123", + "type": "function", + "function": {"name": "get_time", "arguments": "{}"}, + } + ], + }, + {"role": "tool", "tool_call_id": "call_123", "content": '{"time": "10:30 AM"}'}, + {"role": "assistant", "content": "It's 10:30 AM"}, + ] + + transcript = LLMContextSummarizationUtil.format_messages_for_summary(messages) + + # Check that function call is included + self.assertIn("TOOL_CALL: get_time({})", transcript) + # Check that tool result is included + self.assertIn('TOOL_RESULT[call_123]: {"time": "10:30 AM"}', transcript) + + def test_no_function_calls(self): + """Test that summarization works normally without function calls.""" + context = LLMContext() + + # Add normal conversation without function calls + context.add_message({"role": "system", "content": "System prompt"}) + context.add_message({"role": "user", "content": "Hello"}) + context.add_message({"role": "assistant", "content": "Hi"}) + context.add_message({"role": "user", "content": "How are you?"}) + context.add_message({"role": "assistant", "content": "I'm good"}) + context.add_message({"role": "user", "content": "Latest message"}) + + # Try to keep last 1 message + result = LLMContextSummarizationUtil.get_messages_to_summarize(context, 1) + + # Should get all messages except the last one + self.assertEqual(len(result.messages), 4) + self.assertEqual(result.last_summarized_index, 4) + + +class TestSummaryGenerationExceptions(unittest.IsolatedAsyncioTestCase): + """Tests for summary generation exception handling.""" + + async def test_generate_summary_raises_on_no_messages(self): + """Test that _generate_summary raises RuntimeError when there are no messages to summarize.""" + llm_service = LLMService() + context = LLMContext() + + # Add only one message (system), which isn't enough to summarize + context.add_message({"role": "system", "content": "System prompt"}) + + frame = LLMContextSummaryRequestFrame( + request_id="test", + context=context, + min_messages_to_keep=1, + target_context_tokens=1000, + summarization_prompt="Summarize this", + ) + + with self.assertRaises(RuntimeError) as cm: + await llm_service._generate_summary(frame) + + self.assertEqual(str(cm.exception), "No messages to summarize") + + async def test_generate_summary_raises_on_no_run_inference(self): + """Test that _generate_summary raises RuntimeError when run_inference is not implemented.""" + # Create a minimal LLM service - base class raises NotImplementedError + llm_service = LLMService() + + context = LLMContext() + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "Message 2"}) + + frame = LLMContextSummaryRequestFrame( + request_id="test", + context=context, + min_messages_to_keep=1, + target_context_tokens=1000, + summarization_prompt="Summarize this", + ) + + with self.assertRaises(RuntimeError) as cm: + await llm_service._generate_summary(frame) + + self.assertIn("does not implement run_inference", 
str(cm.exception)) + self.assertIn("LLMService", str(cm.exception)) + + async def test_generate_summary_raises_on_empty_response(self): + """Test that _generate_summary raises RuntimeError when LLM returns empty summary.""" + llm_service = LLMService() + # Mock run_inference to return None + llm_service.run_inference = AsyncMock(return_value=None) + + context = LLMContext() + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "Message 2"}) + + frame = LLMContextSummaryRequestFrame( + request_id="test", + context=context, + min_messages_to_keep=1, + target_context_tokens=1000, + summarization_prompt="Summarize this", + ) + + with self.assertRaises(RuntimeError) as cm: + await llm_service._generate_summary(frame) + + self.assertEqual(str(cm.exception), "LLM returned empty summary") + + async def test_generate_summary_task_handles_exceptions(self): + """Test that _generate_summary_task properly handles exceptions from _generate_summary.""" + llm_service = LLMService() + + # Mock broadcast_frame to capture the result + broadcast_calls = [] + + async def mock_broadcast(frame_class, **kwargs): + broadcast_calls.append((frame_class, kwargs)) + + llm_service.broadcast_frame = mock_broadcast + + # Mock push_error + llm_service.push_error = AsyncMock() + + context = LLMContext() + context.add_message({"role": "system", "content": "System prompt"}) + + frame = LLMContextSummaryRequestFrame( + request_id="test_123", + context=context, + min_messages_to_keep=1, + target_context_tokens=1000, + summarization_prompt="Summarize this", + ) + + # Execute the task + await llm_service._generate_summary_task(frame) + + # Verify broadcast_frame was called with error + self.assertEqual(len(broadcast_calls), 1) + frame_class, kwargs = broadcast_calls[0] + self.assertEqual(kwargs["request_id"], "test_123") + self.assertEqual(kwargs["summary"], "") + self.assertEqual(kwargs["last_summarized_index"], -1) + self.assertEqual( + kwargs["error"], "Error generating context summary: No messages to summarize" + ) + + # Verify push_error was called + llm_service.push_error.assert_called_once() + + async def test_generate_summary_success(self): + """Test that _generate_summary returns successfully with valid input.""" + llm_service = LLMService() + # Mock run_inference to return a summary + llm_service.run_inference = AsyncMock(return_value="This is a summary of the conversation") + + context = LLMContext() + context.add_message({"role": "user", "content": "Message 1"}) + context.add_message({"role": "assistant", "content": "Response 1"}) + context.add_message({"role": "user", "content": "Message 2"}) + + frame = LLMContextSummaryRequestFrame( + request_id="test", + context=context, + min_messages_to_keep=1, + target_context_tokens=1000, + summarization_prompt="Summarize this", + ) + + summary, last_index = await llm_service._generate_summary(frame) + + self.assertEqual(summary, "This is a summary of the conversation") + self.assertGreater(last_index, -1) + self.assertEqual(last_index, 1) # Should be the index of the last summarized message + + +if __name__ == "__main__": + unittest.main() From 4a00e6829faafc67f8f54f64f261e508a7a1280d Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:58:44 -0300 Subject: [PATCH 4/7] Automated tests for the context summarizer. 
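
The token-estimation tests above pin down a simple character-based heuristic: roughly four characters per token, plus a fixed per-message overhead. Below is a minimal sketch that satisfies those assertions; it mirrors only what the tests check, not necessarily the shipped `LLMContextSummarizationUtil` internals, and it operates on plain message dicts rather than an `LLMContext`:

```python
# Sketch of the heuristic implied by test_estimate_tokens_* and
# test_estimate_context_tokens: len(text) // 4 tokens of content per message,
# plus ~10 tokens of structural overhead per message. Illustrative only.
from typing import Any, Dict, List

PER_MESSAGE_OVERHEAD_TOKENS = 10


def estimate_tokens(text: str) -> int:
    """Rough token count: about four characters per token."""
    return len(text) // 4


def estimate_context_tokens(messages: List[Dict[str, Any]]) -> int:
    """Sum content estimates plus a fixed overhead for each message."""
    total = 0
    for message in messages:
        content = message.get("content", "")
        if isinstance(content, list):
            # Multi-part content: join the text parts before estimating.
            content = " ".join(part.get("text", "") for part in content)
        total += PER_MESSAGE_OVERHEAD_TOKENS + estimate_tokens(content)
    return total


# "Hello world" is 11 characters -> 2 tokens; empty text -> 0 tokens.
assert estimate_tokens("Hello world") == 2
assert estimate_tokens("") == 0
```
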
--- tests/test_llm_context_summarizer.py | 296 +++++++++++++++++++++++++++ 1 file changed, 296 insertions(+) create mode 100644 tests/test_llm_context_summarizer.py diff --git a/tests/test_llm_context_summarizer.py b/tests/test_llm_context_summarizer.py new file mode 100644 index 0000000000..7555a87629 --- /dev/null +++ b/tests/test_llm_context_summarizer.py @@ -0,0 +1,296 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +import asyncio +import unittest + +from pipecat.frames.frames import ( + InterruptionFrame, + LLMContextSummaryRequestFrame, + LLMContextSummaryResultFrame, + LLMFullResponseStartFrame, +) +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_context_summarizer import LLMContextSummarizer +from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams +from pipecat.utils.context.llm_context_summarization import LLMContextSummarizationConfig + + +class TestLLMContextSummarizer(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.task_manager = TaskManager() + self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop())) + + self.context = LLMContext( + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + ] + ) + + async def test_summarization_triggered_by_token_limit(self): + """Test that summarization is triggered when token limit is reached.""" + config = LLMContextSummarizationConfig( + max_context_tokens=100, # Very low to trigger easily + max_unsummarized_messages=100, # High so it doesn't trigger by message count + ) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + request_frame = None + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_frame + request_frame = frame + + # Add messages to exceed token limit + for i in range(10): + self.context.add_message( + { + "role": "user", + "content": "This is a test message that adds tokens to the context.", + } + ) + + # Trigger check by processing LLMFullResponseStartFrame + await summarizer.process_frame(LLMFullResponseStartFrame()) + + # Should have triggered summarization + self.assertIsNotNone(request_frame) + self.assertIsInstance(request_frame, LLMContextSummaryRequestFrame) + self.assertEqual(request_frame.context, self.context) + + await summarizer.cleanup() + + async def test_summarization_triggered_by_message_count(self): + """Test that summarization is triggered when message count threshold is reached.""" + config = LLMContextSummarizationConfig( + max_context_tokens=100000, # Very high so it doesn't trigger by tokens + max_unsummarized_messages=5, # Low to trigger easily + ) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + request_frame = None + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_frame + request_frame = frame + + # Add messages to exceed message count + for i in range(6): + self.context.add_message({"role": "user", "content": f"Message {i}"}) + + # Trigger check + await summarizer.process_frame(LLMFullResponseStartFrame()) + + # Should have triggered summarization + self.assertIsNotNone(request_frame) + self.assertIsInstance(request_frame, LLMContextSummaryRequestFrame) + + await summarizer.cleanup() + + async def 
test_summarization_not_triggered_below_thresholds(self): + """Test that summarization is not triggered when below thresholds.""" + config = LLMContextSummarizationConfig( + max_context_tokens=10000, + max_unsummarized_messages=20, + ) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + request_frame = None + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_frame + request_frame = frame + + # Add a few messages (below threshold) + for i in range(3): + self.context.add_message({"role": "user", "content": "Short message"}) + + # Trigger check + await summarizer.process_frame(LLMFullResponseStartFrame()) + + # Should NOT have triggered summarization + self.assertIsNone(request_frame) + + await summarizer.cleanup() + + async def test_summarization_in_progress_prevents_duplicate(self): + """Test that a summarization in progress prevents triggering another.""" + config = LLMContextSummarizationConfig( + max_context_tokens=50, # Very low + max_unsummarized_messages=100, + ) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + request_count = 0 + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_count + request_count += 1 + + # Add enough messages to trigger + for i in range(10): + self.context.add_message({"role": "user", "content": "Test message to add tokens."}) + + # First trigger - should request summarization + await summarizer.process_frame(LLMFullResponseStartFrame()) + self.assertEqual(request_count, 1) + + # Second trigger while first is in progress - should NOT request again + await summarizer.process_frame(LLMFullResponseStartFrame()) + self.assertEqual(request_count, 1) + + await summarizer.cleanup() + + async def test_summary_result_handling(self): + """Test that summary results are processed and applied correctly.""" + config = LLMContextSummarizationConfig(max_context_tokens=50, min_messages_after_summary=2) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + # Add messages and trigger summarization + for i in range(10): + self.context.add_message({"role": "user", "content": "Test message."}) + + request_frame = None + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_frame + request_frame = frame + + original_message_count = len(self.context.messages) + await summarizer.process_frame(LLMFullResponseStartFrame()) + self.assertIsNotNone(request_frame) + + # Simulate receiving a summary result + summary_result = LLMContextSummaryResultFrame( + request_id=request_frame.request_id, + summary="This is a test summary.", + last_summarized_index=5, + error=None, + ) + + await summarizer.process_frame(summary_result) + + # Should have applied the summary and reduced message count + # Expected: system message + summary message + 2 recent messages = 4 messages + # (since last_summarized_index=5, we keep messages after index 5) + self.assertLess(len(self.context.messages), original_message_count) + + # Check that summary was added + summary_messages = [ + msg + for msg in self.context.messages + if "Conversation summary:" in msg.get("content", "") + ] + self.assertEqual(len(summary_messages), 1) + + await summarizer.cleanup() + + async def 
test_interruption_cancels_summarization(self): + """Test that an interruption cancels pending summarization.""" + config = LLMContextSummarizationConfig(max_context_tokens=50) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + # Add messages and trigger summarization + for i in range(10): + self.context.add_message({"role": "user", "content": "Test message."}) + + request_count = 0 + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_count + request_count += 1 + + await summarizer.process_frame(LLMFullResponseStartFrame()) + self.assertEqual(request_count, 1) + + # Process interruption + await summarizer.process_frame(InterruptionFrame()) + + # Try to trigger again - should work since the previous one was canceled + await summarizer.process_frame(LLMFullResponseStartFrame()) + self.assertEqual(request_count, 2) + + await summarizer.cleanup() + + async def test_stale_summary_result_ignored(self): + """Test that stale summary results are ignored.""" + config = LLMContextSummarizationConfig(max_context_tokens=50, min_messages_after_summary=2) + + summarizer = LLMContextSummarizer(context=self.context, config=config) + await summarizer.setup(self.task_manager) + + # Add messages and trigger summarization + for i in range(10): + self.context.add_message({"role": "user", "content": "Test message."}) + + request_frame = None + + @summarizer.event_handler("on_request_summarization") + async def on_request_summarization(summarizer, frame): + nonlocal request_frame + request_frame = frame + + original_message_count = len(self.context.messages) + await summarizer.process_frame(LLMFullResponseStartFrame()) + valid_request_id = request_frame.request_id + + # Send a stale summary result (wrong request_id) + stale_result = LLMContextSummaryResultFrame( + request_id="stale-id-123", + summary="Stale summary", + last_summarized_index=3, + error=None, + ) + + await summarizer.process_frame(stale_result) + + # Should be ignored - message count should not change + self.assertEqual(len(self.context.messages), original_message_count) + + # Send the correct summary result + valid_result = LLMContextSummaryResultFrame( + request_id=valid_request_id, + summary="Valid summary", + last_summarized_index=5, + error=None, + ) + + await summarizer.process_frame(valid_result) + + # Should be processed - message count should decrease + self.assertLess(len(self.context.messages), original_message_count) + + # Check that summary was added + summary_messages = [ + msg + for msg in self.context.messages + if "Conversation summary:" in msg.get("content", "") + ] + self.assertEqual(len(summary_messages), 1) + + await summarizer.cleanup() + + +if __name__ == "__main__": + unittest.main() From 5deb80932baf25e37ac10ca1651c7a8f6f104afb Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:58:55 -0300 Subject: [PATCH 5/7] Context summarization example with OpenAI --- .../54-context-summarization-openai.py | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 examples/foundational/54-context-summarization-openai.py diff --git a/examples/foundational/54-context-summarization-openai.py b/examples/foundational/54-context-summarization-openai.py new file mode 100644 index 0000000000..652a3af13d --- /dev/null +++ b/examples/foundational/54-context-summarization-openai.py @@ -0,0 +1,188 @@ +# +# Copyright (c) 2024-2026, Daily +# +# 
SPDX-License-Identifier: BSD 2-Clause License +# + +"""Example demonstrating context summarization feature. + +This example shows how to enable and configure context summarization to automatically +compress conversation history when token limits are approached. It also demonstrates +that summarization correctly handles function calls, preserving incomplete function +call sequences. +""" + +import asyncio +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMAssistantAggregatorParams, + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai.llm import OpenAILLMService +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy +from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.utils.context.llm_context_summarization import LLMContextSummarizationConfig + +load_dotenv(override=True) + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. +transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +# Tool functions for the LLM +async def get_current_weather(params: FunctionCallParams): + """Get the current time in a readable format.""" + logger.info("Tool called: get_current_weather") + await asyncio.sleep(1) # Simulate some processing + await params.result_callback({"conditions": "nice", "temperature": "75"}) + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info("Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) + + # Register tool functions + llm.register_function("get_current_weather", get_current_weather) + + weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. 
San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the user's location.", + }, + }, + required=["location", "format"], + ) + tools = ToolsSchema(standard_tools=[weather_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way. You have access to tools to get the current weather - use them when relevant.", + }, + ] + + context = LLMContext(messages, tools=tools) + + # Create aggregators with summarization enabled + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())] + ), + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + ), + assistant_params=LLMAssistantAggregatorParams( + enable_context_summarization=True, + # Optional: customize context summarization behavior + # Using low limits to demonstrate the feature quickly + context_summarization_config=LLMContextSummarizationConfig( + max_context_tokens=1000, # Trigger summarization at 1000 tokens + target_context_tokens=800, # Target context size for the summarization + max_unsummarized_messages=10, # Or when 10 new messages accumulate + min_messages_after_summary=2, # Keep last 2 messages uncompressed + ), + ), + ) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + user_aggregator, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + assistant_aggregator, # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + # Kick off the conversation. 
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() From ba242d487525415ba57431516cea6d774391943f Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:59:03 -0300 Subject: [PATCH 6/7] Context summarization example with Google --- .../54a-context-summarization-google.py | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 examples/foundational/54a-context-summarization-google.py diff --git a/examples/foundational/54a-context-summarization-google.py b/examples/foundational/54a-context-summarization-google.py new file mode 100644 index 0000000000..a7fe4ba5e9 --- /dev/null +++ b/examples/foundational/54a-context-summarization-google.py @@ -0,0 +1,188 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Example demonstrating context summarization feature. + +This example shows how to enable and configure context summarization to automatically +compress conversation history when token limits are approached. It also demonstrates +that summarization correctly handles function calls, preserving incomplete function +call sequences. +""" + +import asyncio +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMAssistantAggregatorParams, + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.cartesia.tts import CartesiaTTSService +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.google import GoogleLLMService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.transports.base_transport import BaseTransport, TransportParams +from pipecat.transports.daily.transport import DailyParams +from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy +from pipecat.turns.user_turn_strategies import UserTurnStrategies +from pipecat.utils.context.llm_context_summarization import LLMContextSummarizationConfig + +load_dotenv(override=True) + +# We use lambdas to defer transport parameter creation until the transport +# type is selected at runtime. 
+transport_params = { + "daily": lambda: DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "twilio": lambda: FastAPIWebsocketParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), + "webrtc": lambda: TransportParams( + audio_in_enabled=True, + audio_out_enabled=True, + ), +} + + +# Tool functions for the LLM +async def get_current_weather(params: FunctionCallParams): + """Get the current time in a readable format.""" + logger.info("Tool called: get_current_weather") + await asyncio.sleep(1) # Simulate some processing + await params.result_callback({"conditions": "nice", "temperature": "75"}) + + +async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): + logger.info("Starting bot") + + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + + llm = GoogleLLMService(api_key=os.getenv("GOOGLE_API_KEY")) + + # Register tool functions + llm.register_function("get_current_weather", get_current_weather) + + weather_function = FunctionSchema( + name="get_current_weather", + description="Get the current weather", + properties={ + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "format": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit to use. Infer this from the user's location.", + }, + }, + required=["location", "format"], + ) + tools = ToolsSchema(standard_tools=[weather_function]) + + messages = [ + { + "role": "system", + "content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way. 
You have access to tools to get the current weather - use them when relevant.", + }, + ] + + context = LLMContext(messages, tools=tools) + + # Create aggregators with summarization enabled + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( + context, + user_params=LLMUserAggregatorParams( + user_turn_strategies=UserTurnStrategies( + stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())] + ), + vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), + ), + assistant_params=LLMAssistantAggregatorParams( + enable_context_summarization=True, + # Optional: customize context summarization behavior + # Using low limits to demonstrate the feature quickly + context_summarization_config=LLMContextSummarizationConfig( + max_context_tokens=1000, # Trigger summarization at 1000 tokens + target_context_tokens=800, # Target context size for the summarization + max_unsummarized_messages=10, # Or when 10 new messages accumulate + min_messages_after_summary=2, # Keep last 2 messages uncompressed + ), + ), + ) + + pipeline = Pipeline( + [ + transport.input(), # Transport user input + stt, + user_aggregator, # User responses + llm, # LLM + tts, # TTS + transport.output(), # Transport bot output + assistant_aggregator, # Assistant spoken responses + ] + ) + + task = PipelineTask( + pipeline, + params=PipelineParams( + enable_metrics=True, + enable_usage_metrics=True, + ), + idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, + ) + + @transport.event_handler("on_client_connected") + async def on_client_connected(transport, client): + logger.info("Client connected") + # Kick off the conversation. + messages.append({"role": "system", "content": "Please introduce yourself to the user."}) + await task.queue_frames([LLMRunFrame()]) + + @transport.event_handler("on_client_disconnected") + async def on_client_disconnected(transport, client): + logger.info("Client disconnected") + await task.cancel() + + runner = PipelineRunner(handle_sigint=runner_args.handle_sigint) + + await runner.run(task) + + +async def bot(runner_args: RunnerArguments): + """Main bot entry point compatible with Pipecat Cloud.""" + transport = await create_transport(runner_args, transport_params) + await run_bot(transport, runner_args) + + +if __name__ == "__main__": + from pipecat.runner.run import main + + main() From 247569795588919345136152276702b6265c4e8d Mon Sep 17 00:00:00 2001 From: filipi87 Date: Tue, 10 Feb 2026 18:59:12 -0300 Subject: [PATCH 7/7] Changelog entries for context summarization --- changelog/3621.added.2.md | 1 + changelog/3621.added.md | 5 +++++ 2 files changed, 6 insertions(+) create mode 100644 changelog/3621.added.2.md create mode 100644 changelog/3621.added.md diff --git a/changelog/3621.added.2.md b/changelog/3621.added.2.md new file mode 100644 index 0000000000..395c8bdfb4 --- /dev/null +++ b/changelog/3621.added.2.md @@ -0,0 +1 @@ +- Added new frames for context summarization: `LLMContextSummaryRequestFrame` and `LLMContextSummaryResultFrame`. diff --git a/changelog/3621.added.md b/changelog/3621.added.md new file mode 100644 index 0000000000..8b2197eea1 --- /dev/null +++ b/changelog/3621.added.md @@ -0,0 +1,5 @@ +- Added context summarization feature to automatically compress conversation history when conversation length limits (by token or message count) are reached, enabling efficient long-running conversations. 
+ - Configure via `enable_context_summarization=True` in `LLMAssistantAggregatorParams` + - Customize behavior with `LLMContextSummarizationConfig` (max tokens, thresholds, etc.) + - Automatically preserves incomplete function call sequences during summarization + - See new examples: `examples/foundational/54-context-summarization-openai.py` and `examples/foundational/54a-context-summarization-google.py`
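
For readers skimming only the changelog: the summarizer tests earlier in this series assert that applying an `LLMContextSummaryResultFrame` keeps the system prompt, inserts a message prefixed with "Conversation summary:", and preserves the messages that come after `last_summarized_index`. A rough sketch of that compaction step follows; the helper name and the summary message's role are assumptions for illustration, not the actual `LLMContextSummarizer` code:

```python
# Illustrative compaction step consistent with the summarizer tests above.
# Assumptions: the helper name (apply_summary) and the "assistant" role for
# the summary message are mine; the real LLMContextSummarizer may differ.
from typing import Any, Dict, List


def apply_summary(
    messages: List[Dict[str, Any]], summary: str, last_summarized_index: int
) -> List[Dict[str, Any]]:
    """Rebuild the history as: system prompt(s) + summary + unsummarized tail."""
    system_messages = [m for m in messages if m.get("role") == "system"]
    summary_message = {
        "role": "assistant",  # role choice is an assumption in this sketch
        "content": f"Conversation summary: {summary}",
    }
    # Messages after last_summarized_index are kept verbatim.
    tail = [
        m for m in messages[last_summarized_index + 1 :] if m.get("role") != "system"
    ]
    return system_messages + [summary_message] + tail


# Example: one system prompt plus six user turns, summarized through index 4,
# leaves the system prompt, the summary, and the two most recent messages.
history = [{"role": "system", "content": "You are helpful"}] + [
    {"role": "user", "content": f"Message {i}"} for i in range(6)
]
compacted = apply_summary(history, "User sent six short messages.", 4)
assert len(compacted) == 4
```
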