Releases: tastyware/streaq
tastyware/streaq:v6.1.0
tastyware/streaq:v6.0.2
What's Changed
- `uvloop` was a required dependency, but this doesn't make much sense for a library that supports Trio. It is now optional, and a docs section has been added about ways to optimize performance.
- Tests now run on Trio, asyncio without uvloop, and asyncio + uvloop. Additionally, Python 3.14 free-threaded has been added to the matrix.
- Add a new worker parameter, `grace_period`, which gives tasks time to finish after receiving a signal but before doing a hard shutdown.
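A minimal sketch of how this might be configured; only the `grace_period` parameter name comes from these notes, so the value shown (treated as seconds) is an assumption:

```python
from streaq import Worker

# Assumption: grace_period is given in seconds here; the release notes only name
# the parameter, not its units. Other Worker settings are left at their defaults.
worker = Worker(grace_period=30)

@worker.task()
async def slow_job() -> None:
    ...  # gets up to ~30s to finish after a shutdown signal before the hard shutdown
```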
Full Changelog: v6.0.1...v6.0.2
tastyware/streaq:v6.0.1
What's Changed
- Add Trio support (pending anyio support in coredis) by @Graeme22 in #80
This is a large change, powered by simultaneous changes to the coredis dependency. Here's a summary of the changes:
- The library now works with both asyncio and Trio. You can change the backend by passing `anyio_backend` to workers:

```python
# asyncio is the default
worker = Worker(
    anyio_backend="trio",
    anyio_kwargs={"restrict_keyboard_interrupt_to_checkpoints": True},
)
```
- streaQ now uses structured concurrency everywhere for more predictability. If you haven't read Nathaniel Smith's article on structured concurrency, go read it now--it's a fantastic explanation of the motivations and an introduction to some of the concepts.
- As a consequence of SC, async context managers for the worker are back! You'll need to use the context manager for any operation that involves communicating with Redis:

```python
async with worker:
    task = await sleeper.enqueue()
    print(await task.result(3))
```

This means the recommended way of integrating with FastAPI has changed a bit:

```python
@asynccontextmanager
async def app_lifespan(app: FastAPI) -> AsyncGenerator[None]:
    async with worker:
        yield

app = FastAPI(lifespan=app_lifespan)
```

See here for more details.
- dynamic cron jobs by @Graeme22 in #117
Aside from defining cron jobs with the decorator, you can also schedule tasks dynamically:

```python
task = await sleeper.enqueue(1).start(schedule="*/5 * * * *")  # every 5 minutes
```
This causes the task to run repeatedly with the given arguments at the given schedule. To stop scheduling a repeating task, you can use:
```python
await task.unschedule()
# OR
await worker.unschedule_by_id(task.id)
```
- Modular changes by @Graeme22 in #125
Adds a new function, `Worker.include()`, which simply modifies the included worker's tasks to point to it instead. On its face this sounds like it could lead to problems, but since a) only one worker is running per process at a time, and b) workers that are included in other workers are not aware of that at import time, in practice this is a clean and easy solution. To make this a bit more concrete:

```python
from streaq import Worker

child = Worker()

@child.task()
async def foobar() -> None: ...
```
```python
from streaq import Worker
from child_module import child

parent = Worker()
parent.include(child)
```
Here, we can actually run either worker independently without issues:
`streaq run child_module:child` and `streaq run parent_module:parent` should both work fine, because when running just the child, the `parent.include(child)` line will never run.
The other element of this PR that is quite crucial is detaching the ability to access task and worker context from the worker instance itself. Under the hood this is done with a couple of global
`ContextVar`s, but in practice it uses a clean, FastAPI-like dependency injection pattern:

```python
from streaq import TaskContext, TaskDepends, WorkerDepends
from typing import NamedTuple

class WorkerContext(NamedTuple):
    db_connection: MyDBDriver

@worker.task()
async def has_dependencies(
    task_context: TaskContext = TaskDepends(),
    worker_context: WorkerContext = WorkerDepends(),
) -> None:
    print(task_context.task_id)
    print(worker_context.db_connection)
```
What this enables that was not previously possible is a total separation of concerns. Task definitions can now access dependencies without having to know anything about the top-level `Worker` they will eventually be included in. This replaces the previous `Worker.context` and `Worker.task_context()` functions altogether.
- CLI makeover: the `streaq` command has been split into two subcommands, `streaq run` and `streaq web`, for running a worker and the web UI respectively. As a bonus, this allowed for getting some extra coverage that was previously marked with `# pragma: no cover`.
- Typing for pipelined tasks is drastically better for both `Task.then()` and `Task.__or__()`, with some fancy overloads and a `TypeVarTuple` that allow for nearly perfect typing of the complex scenario where results from one task are fed into the next.
- Workers no longer attempt to re-enqueue tasks on shutdown--instead, we assume all shutdowns are unsafe and allow `Worker.idle_timeout` to handle retrying. The default idle timeout has been decreased from 5 minutes to 1 minute to reflect this change.
- Redis scripts are now Redis functions in a single Lua library file. This means we now use FCALL instead of EVALSHA, which slightly increases performance in pipelines. This also means Redis versions less than 7.0 are no longer supported.
- Task abortion now handles delayed tasks better, aborting them right away instead of marking them and leaving them untouched.
- `TaskResult` now has slightly better typing. Previously, `TaskResult.result` held either the task's output or any raised exception; now `TaskResult.result` contains only the task's output, and raises an error if you access it when `TaskResult.success` is False. Exceptions are contained in `TaskResult.exception`, which raises an error if you access it when `TaskResult.success` is True.

```python
async with worker:
    result = await task.result(3)
    if result.success:
        print(result.result)  # this is present only if success
    else:
        raise result.exception  # this is present only if not success
```
- Add date in logs by @nsteinmetz in #91
- Ability to set custom formatters for task results and exceptions in Web UI by @espdev in #95
- Update docs for SAQ comparison: abort support by @vikigenius in #100
- Place sleep after the first healthcheck, not before it. by @espdev in #104
- Add the info about exception traceback serialization with tblib to the docs by @espdev in #105
- Improve the user experience when running tests by @espdev in #107
- `StreaqRetry`, `StreaqError`, and `StreaqCancelled` have been moved from utils to the `streaq.types` module.
- `Worker` can now be initialized from a `coredis.ConnectionPool` instead of a connection URI (see the sketch after this list).
- The Redis health check cron job is now registered at worker startup instead of in the constructor.
- Redis Sentinel now in test coverage everywhere a worker is used
- Tests now use a Docker compose cluster
- Added "Contributing" page to docs
- Benchmarks updated
- Web UI no longer uses `alru_cache` since it's not compatible with anyio
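A minimal sketch of initializing a worker from a pool; the release notes confirm only that a `coredis.ConnectionPool` can be passed, so the pool-construction call and keyword name below are assumptions:

```python
import coredis
from streaq import Worker

# Assumptions: ConnectionPool.from_url and the redis_pool keyword are illustrative
# guesses; the notes only say a coredis.ConnectionPool can be passed to Worker.
pool = coredis.ConnectionPool.from_url("redis://localhost:6379")
worker = Worker(redis_pool=pool)
```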
New Contributors
- @vikigenius made their first contribution in #100
Full Changelog: v5.2.2...v6.0.1
tastyware/streaq:v6.0.0b2
What's Changed
- Ability to set custom formatters for task results and exceptions in Web UI by @espdev in #95
- Show task result/exception info in the adaptive scroll area
- Add dependency functions `get_result_formatter` and `get_exception_formatter` to customize result/exception output
- Bump astral-sh/setup-uv from 6 to 7 by @dependabot[bot] in #96
- Update docs for SAQ comparison: abort support by @vikigenius in #100
- Custom worker id feature by @espdev in #101
The PR adds the ability to set a custom worker id.
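A rough sketch of what this enables; the constructor keyword used below is a guess, since the notes only say a custom worker id can now be set:

```python
from streaq import Worker

# Hypothetical keyword: the notes do not show what the parameter for the
# custom worker id is called.
worker = Worker(id="billing-worker-1")
```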
- Place sleep after the first healthcheck, not before it. by @espdev in #104
- Add the info about exception traceback serialization with tblib to the docs by @espdev in #105
- Improve the user experience when running tests by @espdev in #107
- Bump actions/checkout from 5 to 6 by @dependabot[bot] in #110
- dynamic cron jobs by @Graeme22 in #117
Moves the cron registry to Redis. This allows for dynamically adding/removing cron jobs. `RegisteredCron` is collapsed into `RegisteredTask`.
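For example, mirroring the dynamic scheduling shown in the v6.0.1 notes above:

```python
# Repeatedly run an already-registered task on a cron schedule, then stop it later
task = await sleeper.enqueue(1).start(schedule="*/5 * * * *")  # every 5 minutes
await task.unschedule()
```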
- Bump python-multipart from 0.0.20 to 0.0.22 by @dependabot[bot] in #120
- Bump urllib3 from 2.5.0 to 2.6.3 by @dependabot[bot] in #121
- Bump starlette from 0.48.0 to 0.49.1 by @dependabot[bot] in #122
- Bump filelock from 3.19.1 to 3.20.3 by @dependabot[bot] in #123
- Modular changes by @Graeme22 in #125
Instead of each worker maintaining a list of "coworkers" whose context managers would also have to be entered (which led to a ton of duplicate Redis clients and some messy code with `AsyncExitStack`), `Worker.include()` simply modifies the included worker's tasks to point to it instead. On its face this sounds like it could lead to problems, but since a) only one worker is running per process at a time, and b) workers that are included in other workers are not aware of that at import time, in practice this is a clean and easy solution. To make this a bit more concrete:

```python
from streaq import Worker

child = Worker()

@child.task()
async def foobar() -> None: ...
```

```python
from streaq import Worker
from child_module import child

parent = Worker()
parent.include(child)
```
Here, we can actually run either worker independently without issues:
`streaq run child_module:child` and `streaq run parent_module:parent` should both work fine, because when running just the child, the `parent.include(child)` line will never run.
The other element of this PR that is quite crucial is detaching the ability to access task and worker context from the worker instance itself. Under the hood this is done with a couple of global `ContextVar`s, but in practice it uses a clean, FastAPI-like dependency injection pattern:

```python
from streaq import TaskContext, TaskDepends, WorkerDepends
from typing import NamedTuple

class WorkerContext(NamedTuple):
    db_connection: MyDBDriver

@worker.task()
async def has_dependencies(
    task_context: TaskContext = TaskDepends(),
    worker_context: WorkerContext = WorkerDepends(),
) -> None:
    print(task_context.task_id)
    print(worker_context.db_connection)
```
What this enables that was not previously possible is a total separation of concerns. Task definitions can now access dependencies without having to know anything about the top-level `Worker` they will eventually be included in. This replaces the previous `Worker.context` and `Worker.task_context()` functions altogether. Since coredis 6.0.0 is around the corner, this will likely be the last breaking change for the foreseeable future.
Finally, this PR adds a few other things in preparation for the v6.0.0 release:
- CLI makeover: the `streaq` command has been split into two subcommands, `streaq run` and `streaq web`, for running a worker and the web UI respectively. As a bonus, this allowed for getting some extra coverage that was previously marked with `# pragma: no cover`.
- Typing for pipelined tasks is drastically better for both `Task.then()` and `Task.__or__()`, with some fancy overloads and a `TypeVarTuple` that allow for nearly perfect typing of the complex scenario where results from one task are fed into the next.
- `StreaqRetry`, `StreaqError`, and `StreaqCancelled` have been moved from utils to the `streaq.types` module.
- `Worker` can now be initialized from a `coredis.ConnectionPool` instead of a connection URI.
- The Redis health check cron job is now registered at worker startup instead of in the constructor.
New Contributors
- @vikigenius made their first contribution in #100
Full Changelog: v6.0.0b1...v6.0.0b2
tastyware/streaq:v6.0.0b1
What's Changed
- Add Trio support (pending anyio support in coredis) by @Graeme22 in #80
This is a large change, powered by simultaneous changes to coredis, the client used behind the scenes. This release is beta, pending the release of the next major coredis version. Here's a summary of the changes:
- The library now works with both asyncio and Trio. You can change the backend by passing `anyio_backend` to workers:

```python
# asyncio is the default
worker = Worker(
    anyio_backend="trio",
    anyio_kwargs={"restrict_keyboard_interrupt_to_checkpoints": True},
)
```
- streaQ now uses structured concurrency everywhere for more predictability. If you haven't read Nathaniel Smith's article on structured concurrency, go read it now--it's a fantastic explanation of the motivations and an introduction to some of the concepts.
- As a consequence of SC, async context managers for the worker are back! You'll need to use the context manager for any operation that involves communicating with Redis:

```python
async with worker:
    task = await sleeper.enqueue()
    print(await task.result(3))
```

This means the recommended way of integrating with FastAPI has changed a bit:

```python
@asynccontextmanager
async def app_lifespan(app: FastAPI) -> AsyncGenerator[None]:
    async with worker:
        yield

app = FastAPI(lifespan=app_lifespan)
```

See here for more details.
- Workers no longer attempt to re-enqueue tasks on shutdown--instead, we assume all shutdowns are unsafe and allow `Worker.idle_timeout` to handle retrying. The default idle timeout has been decreased from 5 minutes to 1 minute to reflect this change.
- Redis scripts are now Redis functions in a single Lua library file. This means we now use FCALL instead of EVALSHA, which slightly increases performance in pipelines. This also means Redis versions less than 7.0 are no longer supported.
- Task abortion now handles delayed tasks better, aborting them right away instead of marking them and leaving them untouched
- `TaskResult` now has slightly better typing. Previously, `TaskResult.result` held either the task's output or any raised exception; now `TaskResult.result` contains only the task's output, and raises an error if you access it when `TaskResult.success` is False. Exceptions are contained in `TaskResult.exception`, which raises an error if you access it when `TaskResult.success` is True.

```python
async with worker:
    result = await task.result(3)
    if result.success:
        print(result.result)  # this is present only if success
    else:
        raise result.exception  # this is present only if not success
```
- Add date in logs by @nsteinmetz in #91
Logs now contain date as well as time. Timezone can be customized as before.
- Dependencies bumped, now using dependabot (#81, #86)
- Redis Sentinel now in test coverage everywhere a worker is used
- Tests now use a Docker compose cluster
- Added "Contributing" page to docs
- Benchmarks updated
- Web UI no longer uses `alru_cache` since it's not compatible with anyio
New Contributors
- @dependabot[bot] made their first contribution in #81
Full Changelog: v5.2.2...v6.0.0b1
tastyware/streaq:v5.2.2
What's Changed
- Add function name to logs by @nsteinmetz in #78
New Contributors
- @nsteinmetz made their first contribution in #78
Full Changelog: v5.2.1...v5.2.2
tastyware/streaq:v5.2.1
What's Changed
- Fixes a bug that came up in production where a unique task may not get retried after an abrupt shutdown. Since uniqueness is controlled via a lock key in Redis, a task that is shut down abruptly may not get the chance to release the lock. This wouldn't have been a problem, except that retries of an abruptly aborted task were incorrectly prevented from obtaining the lock. This was made worse by the fact that the lock key may not have a timeout (since it inherits from the task's timeout), meaning a unique task could be locked out from running indefinitely.
- Small docs updates
Full Changelog: v5.2.0...v5.2.1
tastyware/streaq:v5.2.0
What's Changed
- Do not install uvloop on Windows by @espdev in #71
- Use XAUTOCLAIM for timeout; separate out Lua; schedule option for StreaqRetry by @Graeme22 in #72
`StreaqRetry` can now schedule retries at a specific time:

```python
raise StreaqRetry("Oh no!", schedule=datetime.now() + timedelta(hours=3))
```
- Idle task reclamation is now simpler and more robust, built on Redis' `XAUTOCLAIM` command. Pessimistic execution guarantees are stronger: no setting of a per-task timeout is required, and long-running tasks can be retried much sooner upon failure. Credit goes to Christopher Schleiden for the idea of repeatedly calling `XCLAIM` for running tasks, which resets their idle timeouts and allows for using `XAUTOCLAIM` even when tasks have varying durations. Overall, both performance and robustness should be slightly improved.
- Lua scripts for Redis are now contained in the `lua/` directory as actual Lua files for easier reading and debugging. They've also been optimized and commented.
- CI now uses uv for publishing to PyPI/TestPyPI
- The warning for no longer using the async context manager has been removed, as coredis version 6 may require async initialization of the Redis client, which would mean the async syntax would need to be restored.
Full Changelog: v5.1.0...v5.2.0
tastyware/streaq:v5.1.0
tastyware/streaq:v5.0.1
What's Changed
- Fixes a bug where the try count wasn't displayed in the web UI
- The web UI now allows sorting tasks by queue time in ascending or descending order
Full Changelog: v5.0.0...v5.0.1