Replies: 1 comment
-
"""
Async-compatible utilities for pgtrigger.
pgtrigger.ignore() uses thread-local storage, which doesn't work with async
code because Django's async ORM (asave, adelete, etc.) uses sync_to_async
internally, running DB operations in a different thread than where the context
manager was entered.
This module provides an async-compatible wrapper that ensures the ignore state
is set in the same thread where the DB operation executes.
Inspired by django-pghistory's approach:
https://github.com/AmbitionEng/django-pghistory/blob/main/pghistory/runtime.py
"""
import functools
import inspect
from collections.abc import Awaitable, Callable
from types import TracebackType
from typing import Any, ParamSpec, TypeVar, cast, overload
import pgtrigger
from asgiref.sync import sync_to_async
P = ParamSpec("P")
T = TypeVar("T")
R = TypeVar("R")
class pgtrigger_ignore: # noqa: N801
"""
Async-compatible wrapper for pgtrigger.ignore().
Works as both sync and async context manager, and as a decorator for
both sync and async functions.
Usage as async context manager:
async with pgtrigger_ignore("app.Model:trigger_name"):
await instance.asave()
Usage as sync context manager:
with pgtrigger_ignore("app.Model:trigger_name"):
instance.save()
Usage as decorator on sync functions:
@pgtrigger_ignore("app.Model:trigger_name")
def my_func():
instance.save()
Usage as decorator on async functions:
@pgtrigger_ignore("app.Model:trigger_name")
async def my_func():
await instance.asave()
"""
def __init__(self, *uris: str) -> None:
self._uris = uris
self._context: Any = None
def __enter__(self) -> Any:
self._context = pgtrigger.ignore(*self._uris)
return self._context.__enter__()
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> bool | None:
if self._context is not None:
return self._context.__exit__(exc_type, exc, tb)
return None
async def __aenter__(self) -> Any:
return await sync_to_async(self.__enter__)()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> bool | None:
return await sync_to_async(self.__exit__)(exc_type, exc, tb)
@overload
def __call__(
self,
func: Callable[P, Awaitable[R]],
) -> Callable[P, Awaitable[R]]: ...
@overload
def __call__(
self,
func: Callable[P, T],
) -> Callable[P, T]: ...
def __call__(
self,
func: Callable[P, T] | Callable[P, Awaitable[R]],
) -> Callable[P, T] | Callable[P, Awaitable[R]]:
"""Decorator support for both sync and async functions."""
if inspect.iscoroutinefunction(func):
async_func = cast("Callable[P, Awaitable[R]]", func)
@functools.wraps(async_func)
async def async_wrapper(
*args: P.args,
**kwargs: P.kwargs,
) -> R:
async with pgtrigger_ignore(*self._uris):
return await async_func(*args, **kwargs)
return async_wrapper
sync_func = cast("Callable[P, T]", func)
@functools.wraps(sync_func)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
with pgtrigger_ignore(*self._uris):
return sync_func(*args, **kwargs)
return sync_wrappershould work |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
Looks like project is missing async support, similar to what was introduced at AmbitionEng/django-pghistory#220
Beta Was this translation helpful? Give feedback.
All reactions