UN-3352 Add synchronous flag to AsyncCollatingQueue.put() to be very clear as to what's going on#332
Conversation
…clear what's going on
| # with corresponding changes in manifest file. | ||
| # Update will be done every AGENT_MANIFEST_UPDATE_PERIOD_SECONDS seconds; | ||
| # if value is not specified or <= 0, no dynamic updates will be executed. | ||
| # if value is not specified or <= 0, no such dynamic updates will be executed. |
There was a problem hiding this comment.
Minor verbiage tweak.
| return message | ||
|
|
||
| async def put(self, item: Any): | ||
| async def put(self, item: Any, synchronous: bool = False): |
There was a problem hiding this comment.
Add this synchronous parameter to allow callers to make the intentions obvious.
| # as the journal messages come from inside a separate event loop from that request. The lock | ||
| # taken here ends up being harmless in the synchronous request case (like for gRPC) because | ||
| # we would only be blocking our own event loop. | ||
| await queue.put_final_item(synchronous=True) |
There was a problem hiding this comment.
Be very specific as to why we want to do synchronous stuff.
Big-ass comment to boot.
| # as the journal messages come from inside a separate event loop from that request. The lock | ||
| # taken here ends up being harmless in the synchronous request case (like for gRPC) because | ||
| # we would only be blocking our own event loop. | ||
| await self.hopper.put(message_dict, synchronous=True) |
There was a problem hiding this comment.
Same big-ass comment here too.
| from typing import List | ||
|
|
||
| import logging | ||
| import threading |
There was a problem hiding this comment.
Beautify imports.
| is_new: bool = False | ||
| with self.lock: | ||
| is_new = self.agents_table.get(agent_name, None) is None | ||
| is_new = self.agents_table.get(agent_name) is None |
There was a problem hiding this comment.
None is not necessary, it's the default for get()
Below: Add a little whitespace for readability.
…lab/neuro-san into UN-3352-sync-queue
|
|
||
| async def put_final_item(self): | ||
| async def put_final_item(self, synchronous: bool = False): | ||
| """ |
There was a problem hiding this comment.
So why synchronous: bool is False by default? It looks like we really use it with synchronous == True,
and that's the mode that will work in all our scenarios,
that is: event loop on one side and thread + maybe event loop on the other side.
Then again, queue put operation is fast and not supposed to fail (unless we OOMed)
so it's OK to sync call it even from running event loop.
There was a problem hiding this comment.
Because what you would expect from an asynchronous call by default is to call the asynchronous version of put().
Calling the synchronous version of put() is the exception from the expectation, and that's what needs to be called out in the API when it is used like that.
I was looking to re-use AsyncCollatingQueue and I found something I thought to be suspicious:
an asynchronous call put() was putting something in the queue synchronously.
After some digging, I found this to be OK, but its usage needed to be made much more up-front than was currently done.
A couple other minor tweaks.
Tested: http chicken scenario. All agent_thinking comes over as expected.