import sys
from time import sleep, time
from typing import Any, Callable, Optional, TypeVar, Union
from warnings import catch_warnings, warn
if sys.version_info < (3, 12): # pragma: no cover
from typing_extensions import override
else: # pragma: no cover
from typing import override
from ..exceptions import SqlAlchemyDLockDatabaseError
from ..statement.postgresql import (
LOCK,
LOCK_SHARED,
LOCK_XACT,
LOCK_XACT_SHARED,
SLEEP_INTERVAL_DEFAULT,
SLEEP_INTERVAL_MIN,
TRY_LOCK,
TRY_LOCK_SHARED,
TRY_LOCK_XACT,
TRY_LOCK_XACT_SHARED,
UNLOCK,
UNLOCK_SHARED,
)
from ..types import TConnectionOrSession
from ..utils import ensure_int64, to_int64_key
from .base import BaseSadLock
TKey = TypeVar("TKey", bound=Any)
[docs]
class PostgresqlSadLockMixin:
"""A Mix-in class for PostgreSQL advisory lock"""
def __init__(
self, *, key: TKey, shared: bool = False, xact: bool = False, convert: Optional[Callable[[TKey], int]] = None, **kwargs
):
"""
Args:
key: PostgreSQL advisory lock requires the key given by ``INT64``.
* When ``key`` is :class:`int`, the constructor tries to ensure it to be ``INT64``.
:class:`OverflowError` is raised if too big or too small for that.
* When ``key`` is :class:`str` or :class:`bytes` or alike, the constructor calculates its checksum by :func:`hashlib.blake2b`, and takes the hash result integer value as actual key.
* Or you can specify a ``convert`` function to that argument::
def convert(val: Any) -> int:
int64_key: int = do_sth(val)
return int64_key
shared: :attr:`.shared`
xact: :attr:`.xact`
convert: Custom function to covert ``key`` to required data type.
""" # noqa: E501
if convert:
self._actual_key = ensure_int64(convert(key))
else:
self._actual_key = to_int64_key(key)
#
self._shared = bool(shared)
self._xact = bool(xact)
#
self._stmt_unlock = None
if not shared and not xact:
self._stmt_lock = LOCK.params(key=self._actual_key)
self._stmt_try_lock = TRY_LOCK.params(key=self._actual_key)
self._stmt_unlock = UNLOCK.params(key=self._actual_key)
elif shared and not xact:
self._stmt_lock = LOCK_SHARED.params(key=self._actual_key)
self._stmt_try_lock = TRY_LOCK_SHARED.params(key=self._actual_key)
self._stmt_unlock = UNLOCK_SHARED.params(key=self._actual_key)
elif not shared and xact:
self._stmt_lock = LOCK_XACT.params(key=self._actual_key)
self._stmt_try_lock = TRY_LOCK_XACT.params(key=self._actual_key)
else:
self._stmt_lock = LOCK_XACT_SHARED.params(key=self._actual_key)
self._stmt_try_lock = TRY_LOCK_XACT_SHARED.params(key=self._actual_key)
@property
def shared(self) -> bool:
"""Is the advisory lock shared or exclusive"""
return self._shared
@property
def xact(self) -> bool:
"""Is the advisory lock transaction level or session level"""
return self._xact
[docs]
class PostgresqlSadLock(PostgresqlSadLockMixin, BaseSadLock[int]):
"""A distributed lock implemented by PostgreSQL advisory lock
See also:
https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
Tip:
Locks can be either shared or exclusive: a shared lock does not conflict with other shared locks on the same resource, only with exclusive locks.
Locks can be taken at session level (so that they are held until released or the session ends) or at transaction level (so that they are held until the current transaction ends; there is no provision for manual release).
Multiple session-level lock requests stack, so that if the same resource identifier is locked three times there must then be three unlock requests to release the resource in advance of session end.
"""
@override
def __init__(self, connection_or_session: TConnectionOrSession, key, **kwargs):
"""
Args:
connection_or_session: see :attr:`.BaseSadLock.connection_or_session`
key: :attr:`.BaseSadLock.key`
shared: :attr:`.PostgresqlSadLockMixin.shared`
xact: :attr:`.PostgresqlSadLockMixin.xact`
convert: :class:`.PostgresqlSadLockMixin`
**kwargs: other named parameters pass to :class:`.BaseSadLock` and :class:`.PostgresqlSadLockMixin`
""" # noqa: E501
PostgresqlSadLockMixin.__init__(self, key=key, **kwargs)
BaseSadLock.__init__(self, connection_or_session, self._actual_key, **kwargs)
@override
def __exit__(self, exc_type, exc_value, exc_tb):
if sys.version_info < (3, 11):
with catch_warnings():
return super().__exit__(exc_type, exc_value, exc_tb)
else:
with catch_warnings(category=RuntimeWarning):
return super().__exit__(exc_type, exc_value, exc_tb)
[docs]
@override
def acquire(
self,
block: bool = True,
timeout: Union[float, int, None] = None,
interval: Union[float, int, None] = None,
*args,
**kwargs,
) -> bool:
"""
See Also:
:meth:`.BaseSadLock.acquire`
Attention:
PostgreSQL's advisory lock has no timeout mechanism in itself.
When ``timeout`` is a non-negative number, we simulate it by **looping** and **sleeping**.
The ``interval`` argument specifies the sleep seconds(``1`` by default).
That is:
The actual timeout won't be precise when ``interval`` is big;
while small ``interval`` will cause high CPU usage and frequent SQL execution.
""" # noqa: E501
if self._acquired:
raise ValueError("invoked on a locked lock")
if block:
if timeout is None:
# None: set the timeout period to infinite.
self.connection_or_session.execute(self._stmt_lock).all()
self._acquired = True
else:
# negative value for `timeout` are equivalent to a `timeout` of zero.
if timeout < 0:
timeout = 0
interval = SLEEP_INTERVAL_DEFAULT if interval is None else interval
if interval < SLEEP_INTERVAL_MIN: # pragma: no cover
raise ValueError("interval too small")
ts_begin = time()
while True:
ret_val = self.connection_or_session.execute(self._stmt_try_lock).scalar_one()
if ret_val: # succeed
self._acquired = True
break
if time() - ts_begin > timeout: # expired
break
sleep(interval)
else:
# This will either obtain the lock immediately and return true,
# or return false without waiting if the lock cannot be acquired immediately.
ret_val = self.connection_or_session.execute(self._stmt_try_lock).scalar_one()
self._acquired = bool(ret_val)
#
return self._acquired
[docs]
@override
def release(self):
if self._stmt_unlock is None:
warn(
"PostgreSQL transaction level advisory locks are held until the current transaction ends; there is no provision for manual release.",
RuntimeWarning,
)
return
if not self._acquired:
raise ValueError("invoked on an unlocked lock")
ret_val = self.connection_or_session.execute(self._stmt_unlock).scalar_one()
if ret_val:
self._acquired = False
else: # pragma: no cover
self._acquired = False
raise SqlAlchemyDLockDatabaseError(f"The advisory lock {self.key!r} was not held.")