Skip to content

Releases: tastyware/streaq

tastyware/streaq:v6.1.0

13 Feb 20:41
12341d4

Choose a tag to compare

What's Changed

Full Changelog: v6.0.2...v6.1.0

tastyware/streaq:v6.0.2

11 Feb 20:44

Choose a tag to compare

What's Changed

  • uvloop was a required dependency. But this doesn't make much sense for a library that supports Trio. Now optional and added a docs section about ways to optimize performance.
  • Tests now run on Trio, asyncio without uvloop and asyncio+uvloop. Additionally Python 3.14 free-threaded has been added to the matrix.
  • Add a new worker parameter, grace_period, which will give tasks time to finish after receiving a signal but before doing a hard shutdown.

Full Changelog: v6.0.1...v6.0.2

tastyware/streaq:v6.0.1

09 Feb 18:37

Choose a tag to compare

What's Changed

  • Add Trio support (pending anyio support in coredis) by @Graeme22 in #80
    This is a large change, powered by simultaneous changes to the coredis dependency. Here's a summary of the changes:

    • The library now works with both asyncio and Trio. You can change the backend by passing anyio_backend to workers:
      # asyncio is the default
      worker = Worker(anyio_backend="trio", anyio_kwargs={"restrict_keyboard_interrupt_to_checkpoints": True})
    • streaQ now uses structured concurrency everywhere for more predictability. If you haven't read Nathaniel Smith's article on structured concurrency, go read it now--it's a fantastic explanation of the motivations and an introduction to some of the concepts.
    • As a consequence of SC, async context managers for the worker are back! You'll need to use the context manager for any operation that involves communicating with Redis:
      async with worker:
          task = await sleeper.enqueue()
          print(await task.result(3))
      This means the recommended way of integrating with FastAPI has changed a bit:
      @asynccontextmanager
      async def app_lifespan(app: FastAPI) -> AsyncGenerator[None]:
          async with worker:
              yield
      
      app = FastAPI(lifespan=app_lifespan)
      See here for more details.
  • dynamic cron jobs by @Graeme22 in #117
    Aside from defining cron jobs with the decorator, you can also schedule tasks dynamically:

    task = await sleeper.enqueue(1).start(schedule="*/5 * * * *")  # every 5 minutes

    This causes the task to run repeatedly with the given arguments at the given schedule. To stop scheduling a repeating task, you can use:

    await task.unschedule()
    # OR
    await worker.unschedule_by_id(task.id)
  • Modular changes by @Graeme22 in #125
    Adds new function, Worker.include(), which simply modifies the included worker's tasks to point to it instead. At its face this sounds like it could lead to problems, but since a) Only one worker is running per process at a time; and b) Workers that are included in other workers are not aware of that at import time, practically this is a clean and easy solution. To make this a bit more concrete:

    from streaq import Worker
    
    child = Worker()
    
    @child.task()
    async def foobar() -> None: ...
    from streaq import Worker
    
    from child_module import child
    
    parent = Worker()
    parent.include(child)

    Here, we can actually run either worker independently without issues: streaq run child_module:child and streaq run parent_module:parent should both work fine, because when running just the child, the parent.include(child) line will never run.

    The other element of this PR that is quite crucial is detaching the ability to access task and worker context from the worker instance itself. Under the hood this is done with a couple global ContextVars, but in practice it uses a clean, FastAPI-like dependency injection pattern:

    from streaq import TaskContext, TaskDepends, WorkerDepends
    from typing import NamedTuple
    
    class WorkerContext(NamedTuple):
        db_connection: MyDBDriver
    
    @worker.task()
    async def has_dependencies(
        task_context: TaskContext = TaskDepends(),
        worker_context: WorkerContext = WorkerDepends(),
    ) -> None:
        print(task_context.task_id)
        print(worker_context.db_connection)

    What this enables that was not previously possible is a total separation of concerns. Task definitions can now access dependencies without having to know anything about the top level Worker they will eventually be included in. This replaces the previous Worker.context and Worker.task_context() functions altogether.

  • CLI makeover: the streaq command has been split into two subcommands, streaq run and streaq web for running a worker and the web UI respectively. As a bonus this allowed for getting some extra coverage that was previously marked with # pragma: no cover.

  • Typing for pipelined tasks is drastically better for both Task.then() and Task.__or__(), with some fancy overloads and a TypeVarTuple that allow for nearly perfect typing of the complex scenario where results from one task are fed into the next.

  • Workers no longer attempt to re-enqueue tasks on shutdown--instead, we assume all shutdowns are unsafe, and allow Worker.idle_timeout to handle retrying. The default idle timeout has been decreased from 5 minutes to 1 minute to reflect this change.

  • Redis scripts are now Redis functions in a single Lua library file. This means we now use FCALL instead of EVALSHA, which slightly increases performance in pipelines. This also means Redis versions less than 7.0 are no longer supported.

  • Task abortion now handles delayed tasks better, aborting them right away instead of marking them and leaving them untouched

  • TaskResult now has slightly better typing. Instead of accessing TaskResult.result which was either the task's output or any raised exception, TaskResult.result now only contains the task's output and errors if you access it when TaskResult.success is False. Exceptions are now contained in TaskResult.exception, and errors if you access it when TaskResult.success is True.

    async with worker:
        result = await task.result(3)
        if result.success:
            print(result.result)  # this is present only if success
        else:
            raise result.exception  # this is present only if not success
  • Add date in logs by @nsteinmetz in #91

  • Ability to set custom formatters for task results and exceptions in Web UI by @espdev in #95

  • Update docs for SAQ comparison: abort support by @vikigenius in #100

  • Custom worker id feature by @espdev in #101

  • Place sleep after the first healthcheck, not before it. by @espdev in #104

  • Add the info about exception traceback serialization with tblib to the docs by @espdev in #105

  • Improve the user experience when running tests by @espdev in #107

  • StreaqRetry, StreaqError, and StreaqCancelled have been moved from utils to the streaq.types module.

  • Worker can now be initialized from a coredis.ConnectionPool instead of a connection URI.

  • The Redis health check cron job is now registered at worker startup instead of in the constructor.

  • Configure Renovate by @renovate[bot] in #133

  • Redis Sentinel now in test coverage everywhere a worker is used

  • Tests now use a Docker compose cluster

  • Added "Contributing" page to docs

  • Benchmarks updated

  • Web UI no longer uses alru_cache since it's not compatible with anyio

New Contributors

Full Changelog: v5.2.2...v6.0.1

tastyware/streaq:v6.0.0b2

30 Jan 21:55

Choose a tag to compare

Pre-release

What's Changed

  • Ability to set custom formatters for task results and exceptions in Web UI by @espdev in #95

    • Show task result/exception info in the adaptive scroll area
    • Add dependency functions get_result_formatter and get_exception_formatter to customize result/exception output
  • Bump astral-sh/setup-uv from 6 to 7 by @dependabot[bot] in #96

  • Update docs for SAQ comparison: abort support by @vikigenius in #100

  • Custom worker id feature by @espdev in #101
    The PR adds ability to set a custom worker id.

  • Place sleep after the first healthcheck, not before it. by @espdev in #104

  • Add the info about exception traceback serialization with tblib to the docs by @espdev in #105

  • Improve the user experience when running tests by @espdev in #107

  • Bump actions/checkout from 5 to 6 by @dependabot[bot] in #110

  • dynamic cron jobs by @Graeme22 in #117
    Moves cron registry to Redis. This allows for dynamically adding/removing cron jobs. RegisteredCron is collapsed into RegisteredTask.

  • Bump python-multipart from 0.0.20 to 0.0.22 by @dependabot[bot] in #120

  • Bump urllib3 from 2.5.0 to 2.6.3 by @dependabot[bot] in #121

  • Bump starlette from 0.48.0 to 0.49.1 by @dependabot[bot] in #122

  • Bump filelock from 3.19.1 to 3.20.3 by @dependabot[bot] in #123

  • Modular changes by @Graeme22 in #125
    Instead of each worker maintaining a list of "coworkers" whose context managers would also have to be entered (which led to a ton of duplicate Redis clients and some messy code with AsyncExitStack), Worker.include() simply modifies the included worker's tasks to point to it instead. At its face this sounds like it could lead to problems, but since a) Only one worker is running per process at a time; and b) Workers that are included in other workers are not aware of that at import time, practically this is a clean and easy solution. To make this a bit more concrete:

    from streaq import Worker
    
    child = Worker()
    
    @child.task()
    async def foobar() -> None: ...
    from streaq import Worker
    
    from child_module import child
    
    parent = Worker()
    parent.include(child)

    Here, we can actually run either worker independently without issues: streaq run child_module:child and streaq run parent_module:parent should both work fine, because when running just the child, the parent.include(child) line will never run.

    The other element of this PR that is quite crucial is detaching the ability to access task and worker context from the worker instance itself. Under the hood this is done with a couple global ContextVars, but in practice it uses a clean, FastAPI-like dependency injection pattern:

    from streaq import TaskContext, TaskDepends, WorkerDepends
    from typing import NamedTuple
    
    class WorkerContext(NamedTuple):
        db_connection: MyDBDriver
    
    @worker.task()
    async def has_dependencies(
        task_context: TaskContext = TaskDepends(),
        worker_context: WorkerContext = WorkerDepends(),
    ) -> None:
        print(task_context.task_id)
        print(worker_context.db_connection)

    What this enables that was not previously possible is a total separation of concerns. Task definitions can now access dependencies without having to know anything about the top level Worker they will eventually be included in. This replaces the previous Worker.context and Worker.task_context() functions altogether. Since coredis 6.0.0 is around the corner, this will likely be the last breaking change for the foreseeable future.

    Finally, this PR adds a few other things in preparation for the v6.0.0 release:

    • CLI makeover: the streaq command has been split into two subcommands, streaq run and streaq web for running a worker and the web UI respectively. As a bonus this allowed for getting some extra coverage that was previously marked with # pragma: no cover.
    • Typing for pipelined tasks is drastically better for both Task.then() and Task.__or__(), which some fancy overloads and a TypeVarTuple that allow for nearly perfect typing of the complex scenario where results from one task are fed into the next.
    • StreaqRetry, StreaqError, and StreaqCancelled have been moved from utils to the streaq.types module.
    • Worker can now be initialized from a coredis.ConnectionPool instead of a connection URI.
    • The Redis health check cron job is now registered at worker startup instead of in the constructor.

New Contributors

Full Changelog: v6.0.0b1...v6.0.0b2

tastyware/streaq:v6.0.0b1

25 Sep 21:23

Choose a tag to compare

Pre-release

What's Changed

  • Add Trio support (pending anyio support in coredis) by @Graeme22 in #80
    This is a large change, powered by simultaneous changes to coredis, the client used behind the scenes. This release is beta, pending the release of the next major coredis version. Here's a summary of the changes:
    • The library now works with both asyncio and Trio. You can change the backend by passing anyio_backend to workers:
      # asyncio is the default
      worker = Worker(anyio_backend="trio", anyio_kwargs={"restrict_keyboard_interrupt_to_checkpoints": True})
    • streaQ now uses structured concurrency everywhere for more predictability. If you haven't read Nathaniel Smith's article on structured concurrency, go read it now--it's a fantastic explanation of the motivations and an introduction to some of the concepts.
    • As a consequence of SC, async context managers for the worker are back! You'll need to use the context manager for any operation that involves communicating with Redis:
      async with worker:
          task = await sleeper.enqueue()
          print(await task.result(3))
      This means the recommended way of integrating with FastAPI has changed a bit:
      @asynccontextmanager
      async def app_lifespan(app: FastAPI) -> AsyncGenerator[None]:
          async with worker:
              yield
      
      app = FastAPI(lifespan=app_lifespan)
      See here for more details.
  • Workers no longer attempt to re-enqueue tasks on shutdown--instead, we assume all shutdowns are unsafe, and allow Worker.idle_timeout to handle retrying. The default idle timeout has been decreased from 5 minutes to 1 minute to reflect this change.
  • Redis scripts are now Redis functions in a single Lua library file. This means we now use FCALL instead of EVALSHA, which slightly increases performance in pipelines. This also means Redis versions less than 7.0 are no longer supported.
  • Task abortion now handles delayed tasks better, aborting them right away instead of marking them and leaving them untouched
  • TaskResult now has slightly better typing. Instead of accessing TaskResult.result which was either the task's output or any raised exception, TaskResult.result now only contains the task's output and errors if you access it when TaskResult.success is False. Exceptions are now contained in TaskResult.exception, and errors if you access it when TaskResult.success is True.
    async with worker:
        result = await task.result(3)
        if result.success:
            print(result.result)  # this is present only if success
        else:
            raise result.exception  # this is present only if not success
  • Add date in logs by @nsteinmetz in #91
    Logs now contain date as well as time. Timezone can be customized as before.
  • Dependencies bumped, now using dependabot (#81, #86)
  • Redis Sentinel now in test coverage everywhere a worker is used
  • Tests now use a Docker compose cluster
  • Added "Contributing" page to docs
  • Benchmarks updated
  • Web UI no longer uses alru_cache since it's not compatible with anyio

New Contributors

Full Changelog: v5.2.2...v6.0.0b1

tastyware/streaq:v5.2.2

28 Aug 22:01
4436e17

Choose a tag to compare

What's Changed

New Contributors

Full Changelog: v5.2.1...v5.2.2

tastyware/streaq:v5.2.1

25 Aug 20:57

Choose a tag to compare

What's Changed

  • Fixes a bug that came up in production where a unique task may not get retried after an abrupt shutdown. Since uniqueness is controlled via a lock key in Redis, a task that is shut down abruptly may not get the chance to release the lock. This wouldn't have been a problem, except that retries of an abruptly aborted task were incorrectly prevented from obtaining the lock. This was made worse by the fact that the lock key may not have a timeout (since it inherits from the task's timeout), meaning a unique task could be locked out from running indefinitely.
  • Small docs updates

Full Changelog: v5.2.0...v5.2.1

tastyware/streaq:v5.2.0

14 Aug 20:37

Choose a tag to compare

What's Changed

  • Do not install uvloop on Windows by @espdev in #71
  • Use XAUTOCLAIM for timeout; separate out Lua; schedule option for StreaqRetry by @Graeme22 in #72
    • StreaqRetry can now schedule retries at a specific time:
      raise StreaqRetry("Oh no!", schedule=datetime.now() + timedelta(hours=3))
    • Idle task reclamation is now simpler and more robust, built on Redis' XAUTOCLAIM command. Pessimistic execution guarantees are stronger: no setting of a per-task timeout is required, and long-running tasks can be retried much sooner upon failure. Credit goes to Christopher Schleiden for the idea of repeatedly calling XCLAIM for running tasks, which resets their idle timeouts and allows for using XAUTOCLAIM even when tasks have varying durations. Overall, performance should be slightly improved and robustness as well.
    • Lua scripts for Redis are now contained in the lua/ directory as actual Lua files for easier reading and debugging. They've also been optimized and commented.
    • CI now uses uv for publishing to PyPI/TestPyPI
    • The warning for no longer using the async context manager has been removed, as coredis version 6 may require async initialization of the Redis client, which would mean the async syntax would need to be restored.

Full Changelog: v5.1.0...v5.2.0

tastyware/streaq:v5.1.0

02 Aug 00:22
ea27afc

Choose a tag to compare

What's Changed

Full Changelog: v5.0.1...v5.1.0

tastyware/streaq:v5.0.1

01 Aug 17:35

Choose a tag to compare

What's Changed

  • fixes bug where try count wasn't displayed in web UI
  • web UI now allows sorting tasks by queue time in ascending or descending order

Full Changelog: v5.0.0...v5.0.1