feat(memory): Optimize API calls with batched storage for better latency/cost#223
feat(memory): Optimize API calls with batched storage for better latency/cost#223
Conversation
Add opt-in storage_version="v2" parameter to AgentCoreMemorySessionManager
that enables batched API calls by using unified actorId.
Changes:
- Add storage_version parameter ("v1" default, "v2" opt-in)
- v2 batches message + agent state in single API call
e730d72 to
46ce469
Compare
| MessageAddedEvent, lambda event: self.save_message_with_state(event.message, event.agent) | ||
| ) | ||
| registry.add_callback(AfterInvocationEvent, lambda event: self._sync_agent_state(event.agent)) | ||
| registry.add_callback(MessageAddedEvent, lambda event: self.retrieve_customer_context(event)) |
There was a problem hiding this comment.
issue: This v2 branch is missing the multi-agent hooks that the parent class registers:
MultiAgentInitializedEvent→initialize_multi_agentAfterNodeCallEvent→sync_multi_agentAfterMultiAgentInvocationEvent→sync_multi_agent
Multi-agent scenarios will silently fail to persist state with v2. Consider either:
- Adding these hooks explicitly here, or
- Calling
RepositorySessionManager.register_hooks()first, then overriding specific callbacks
Suggested fix:
# Add after line 735:
registry.add_callback(MultiAgentInitializedEvent, lambda event: self.initialize_multi_agent(event.source))
registry.add_callback(AfterNodeCallEvent, lambda event: self.sync_multi_agent(event.source))
registry.add_callback(AfterMultiAgentInvocationEvent, lambda event: self.sync_multi_agent(event.source))
There was a problem hiding this comment.
Added multi-agent hooks from strands.experimental.hooks.multiagent
| "Synced agent state: event=%s, agent=%s", event.get("event", {}).get("eventId"), agent.agent_id | ||
| ) | ||
| except Exception as e: | ||
| logger.error("Failed to sync agent state: %s", e) |
There was a problem hiding this comment.
small: This silently swallows the exception, but save_message_with_state (line 243) raises SessionException on failure. Should this also raise to maintain consistent error handling? Silent failures here could lead to undetected data loss.
Suggested fix:
except Exception as e:
logger.error("Failed to sync agent state: %s", e)
raise SessionException(f"Failed to sync agent state: {e}") from e
There was a problem hiding this comment.
Fixed - raises SessionException on failure
| agent_data = json.loads(events[0].get("payload", {})[0].get("blob")) | ||
| return SessionAgent.from_dict(agent_data) | ||
| try: | ||
| if self.storage_version == "v2": |
There was a problem hiding this comment.
issue: V2 only queries config.actor_id, but V1 stored agent state under agent_{id}. If a user switches from v1 to v2 on an existing session, their agent state becomes inaccessible (appears as a new agent, losing previous state).
This silently loses access to existing data when upgrading.
Suggested fix - add fallback to v1 location:
# After the v2 loop finds nothing, before returning None:
# Fallback: check v1 location for migration support
events = self.memory_client.list_events(
memory_id=self.config.memory_id,
actor_id=self._get_full_agent_id(agent_id),
session_id=session_id,
max_results=1,
)
if events:
agent_data = json.loads(events[0].get("payload", {})[0].get("blob"))
return SessionAgent.from_dict(agent_data)
There was a problem hiding this comment.
Fixed - dual-read with legacy fallback
| logger.error("Failed to save message with state: %s", e) | ||
| raise SessionException(f"Failed to save message with state: {e}") from e | ||
|
|
||
| def _sync_agent_state(self, agent: "Agent") -> None: |
There was a problem hiding this comment.
optimization: I know you stated one of the main purposes of the PR is to reduce redundant API calls, this isn't a big issue but I believe there is a chance for more optimization here. Agent state is saved twice at the end of each invocation:
- With the final message via
save_message_with_state(line 732) - Again here via
_sync_agent_state
This adds an extra API call per invocation. The final sync is meant to capture conversation_manager_state updates that happen after the last message, but in most cases the state hasn't changed.
Suggested optimization: Track whether state changed since last save, and skip the final sync if unchanged:
# In save_message_with_state, store a hash/snapshot:
self._last_synced_state = hash(json.dumps(session_agent.to_dict(), sort_keys=True))
# In syncagent_state, check before saving:
def syncagent_state(self, agent: "Agent") -> None:
current_state = hash(json.dumps(SessionAgent.from_agent(agent).to_dict(), sort_keys=True))
if current_state == getattr(self, '_last_synced_state', None):
return # No change, skip sync
# ... rest of method
There was a problem hiding this comment.
Fixed - skips sync when state unchanged
5f872f9 to
ba2fab9
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #223 +/- ##
=======================================
Coverage ? 90.25%
=======================================
Files ? 35
Lines ? 3488
Branches ? 518
=======================================
Hits ? 3148
Misses ? 190
Partials ? 150
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ba2fab9 to
4ff0e93
Compare
aidandaly24
left a comment
There was a problem hiding this comment.
looks good to me -- approved
Closes #222
Summary
API Call Reduction
Implementation Details
Batched Storage
_save_message_with_state()_typemarkers:"message"and"agent_state"State Hash Tracking
_compute_state_hash()computes hash excluding timestamps (created_at, updated_at)AfterInvocationEventsync is skipped when state is unchangedDual-Read for Backward Compatibility
read_agent()tries new format first (unified actorId with type markers)agent_{id}actorId) for existing dataFiles Changed
session_manager.py: Batched storage, hash tracking, dual-readbedrock_converter.py: Parse new payload formatTest Plan
Breaking Changes
None - existing data is read via legacy fallback, new data uses optimized format.