Skip to content

Comments

UN-3352 Add synchronous flag to AsyncCollatingQueue.put() to be very clear as to what's going on#332

Merged
d1donlydfink merged 6 commits intomainfrom
UN-3352-sync-queue
Jul 28, 2025
Merged

UN-3352 Add synchronous flag to AsyncCollatingQueue.put() to be very clear as to what's going on#332
d1donlydfink merged 6 commits intomainfrom
UN-3352-sync-queue

Conversation

@d1donlydfink
Copy link
Collaborator

@d1donlydfink d1donlydfink commented Jul 25, 2025

I was looking to re-use AsyncCollatingQueue and I found something I thought to be suspicious:
an asynchronous call put() was putting something in the queue synchronously.
After some digging, I found this to be OK, but its usage needed to be made much more up-front than was currently done.

A couple other minor tweaks.

Tested: http chicken scenario. All agent_thinking comes over as expected.

# with corresponding changes in manifest file.
# Update will be done every AGENT_MANIFEST_UPDATE_PERIOD_SECONDS seconds;
# if value is not specified or <= 0, no dynamic updates will be executed.
# if value is not specified or <= 0, no such dynamic updates will be executed.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor verbiage tweak.

return message

async def put(self, item: Any):
async def put(self, item: Any, synchronous: bool = False):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this synchronous parameter to allow callers to make the intentions obvious.

# as the journal messages come from inside a separate event loop from that request. The lock
# taken here ends up being harmless in the synchronous request case (like for gRPC) because
# we would only be blocking our own event loop.
await queue.put_final_item(synchronous=True)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be very specific as to why we want to do synchronous stuff.
Big-ass comment to boot.

# as the journal messages come from inside a separate event loop from that request. The lock
# taken here ends up being harmless in the synchronous request case (like for gRPC) because
# we would only be blocking our own event loop.
await self.hopper.put(message_dict, synchronous=True)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same big-ass comment here too.

from typing import List

import logging
import threading
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beautify imports.

is_new: bool = False
with self.lock:
is_new = self.agents_table.get(agent_name, None) is None
is_new = self.agents_table.get(agent_name) is None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None is not necessary, it's the default for get()
Below: Add a little whitespace for readability.


async def put_final_item(self):
async def put_final_item(self, synchronous: bool = False):
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why synchronous: bool is False by default? It looks like we really use it with synchronous == True,
and that's the mode that will work in all our scenarios,
that is: event loop on one side and thread + maybe event loop on the other side.
Then again, queue put operation is fast and not supposed to fail (unless we OOMed)
so it's OK to sync call it even from running event loop.

Copy link
Collaborator Author

@d1donlydfink d1donlydfink Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because what you would expect from an asynchronous call by default is to call the asynchronous version of put().
Calling the synchronous version of put() is the exception from the expectation, and that's what needs to be called out in the API when it is used like that.

Copy link
Contributor

@andreidenissov-cog andreidenissov-cog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good.

@d1donlydfink d1donlydfink merged commit 1923e46 into main Jul 28, 2025
4 checks passed
@d1donlydfink d1donlydfink deleted the UN-3352-sync-queue branch July 28, 2025 18:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants