Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/crewai/src/crewai/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ def _update_executor_parameters(
self.agent_executor.tools_handler = self.tools_handler
self.agent_executor.request_within_rpm_limit = rpm_limit_fn

# Clear accumulated messages and reset iterations from previous tasks
# to prevent context pollution when agent is reused across sequential tasks
self.agent_executor.messages = []
self.agent_executor.iterations = 0

if self.agent_executor.llm:
existing_stop = getattr(self.agent_executor.llm, "stop", [])
self.agent_executor.llm.stop = list(
Expand Down
135 changes: 135 additions & 0 deletions lib/crewai/tests/agents/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2351,3 +2351,138 @@ def test_agent_without_apps_no_platform_tools():

tools = crew._prepare_tools(agent, task, [])
assert tools == []


def test_agent_executor_messages_cleared_between_tasks():
"""Test that agent executor messages are cleared when agent is reused across tasks.

This test verifies the fix for issue #4319 where messages accumulated when
the same agent was reused across multiple sequential tasks, causing:
- Duplicate system messages
- Context pollution
- Eventually crashes with 'Invalid response from LLM call - None or empty'
"""
agent = Agent(
role="Test Agent",
goal="Execute tasks",
backstory="Test agent for message accumulation fix",
allow_delegation=False,
)

task1 = Task(
description="Task 1: Simple calculation",
expected_output="Result",
agent=agent,
)

task2 = Task(
description="Task 2: Another calculation",
expected_output="Result",
agent=agent,
)

task3 = Task(
description="Task 3: Yet another calculation",
expected_output="Result",
agent=agent,
)

# Create the agent executor for the first task
agent.create_agent_executor(tools=[], task=task1)
assert agent.agent_executor is not None
initial_executor = agent.agent_executor

# Simulate messages being added during task execution
agent.agent_executor.messages = [
{"role": "system", "content": "System prompt"},
{"role": "user", "content": "Task 1 prompt"},
{"role": "assistant", "content": "Task 1 response"},
]
agent.agent_executor.iterations = 5

# Reuse the agent for task 2 - this should clear messages
agent.create_agent_executor(tools=[], task=task2)

# Verify the same executor instance is reused
assert agent.agent_executor is initial_executor

# Verify messages were cleared
assert agent.agent_executor.messages == [], (
"Messages should be cleared when agent executor is reused for a new task"
)

# Verify iterations were reset
assert agent.agent_executor.iterations == 0, (
"Iterations should be reset when agent executor is reused for a new task"
)

# Simulate more messages for task 2
agent.agent_executor.messages = [
{"role": "system", "content": "System prompt"},
{"role": "user", "content": "Task 2 prompt"},
{"role": "assistant", "content": "Task 2 response"},
]
agent.agent_executor.iterations = 3

# Reuse the agent for task 3 - this should clear messages again
agent.create_agent_executor(tools=[], task=task3)

# Verify the same executor instance is still reused
assert agent.agent_executor is initial_executor

# Verify messages were cleared again
assert agent.agent_executor.messages == [], (
"Messages should be cleared each time agent executor is reused"
)

# Verify iterations were reset again
assert agent.agent_executor.iterations == 0, (
"Iterations should be reset each time agent executor is reused"
)


def test_agent_executor_state_isolation_across_crews():
"""Test that agent executor state is properly isolated when same agent is used in multiple crews.

This is a common pattern in Flow-based workflows where agents are reused
across multiple crew kickoffs.
"""
agent = Agent(
role="Reusable Agent",
goal="Execute tasks across multiple crews",
backstory="Agent that gets reused",
allow_delegation=False,
)

# First crew with first task
task1 = Task(
description="First crew task",
expected_output="Result 1",
agent=agent,
)

# Create executor for first task
agent.create_agent_executor(tools=[], task=task1)
executor = agent.agent_executor

# Simulate task execution adding messages
executor.messages = [
{"role": "system", "content": "System 1"},
{"role": "user", "content": "User 1"},
]
executor.iterations = 2

# Second crew with second task (simulating Flow pattern)
task2 = Task(
description="Second crew task",
expected_output="Result 2",
agent=agent,
)

# Reuse agent for second task
agent.create_agent_executor(tools=[], task=task2)

# State should be clean for the new task
assert executor.messages == []
assert executor.iterations == 0
assert executor.task == task2
Loading