# Source code for sqlalchemy_dlock.lock.base

import sys
from abc import ABC, abstractmethod
from threading import local
from typing import Callable, Generic, Optional, TypeVar, Union

if sys.version_info >= (3, 11):  # pragma: no cover
    from typing import Self
else:  # pragma: no cover
    from typing_extensions import Self

if sys.version_info < (3, 12):  # pragma: no cover
    from typing_extensions import override
else:  # pragma: no cover
    from typing import override

from ..typing import AsyncConnectionOrSessionT, ConnectionOrSessionT

# Type of the key as supplied by the caller (the ``key`` argument of the lock classes).
KeyTV = TypeVar("KeyTV")
# Type of the value returned by ``AbstractLockMixin.get_actual_key``.
ActualKeyTV = TypeVar("ActualKeyTV")
# Type of the synchronous ``connection_or_session`` object, bounded by ``ConnectionOrSessionT``.
ConnectionTV = TypeVar("ConnectionTV", bound=ConnectionOrSessionT)
# Type of the asynchronous ``connection_or_session`` object, bounded by ``AsyncConnectionOrSessionT``.
AsyncConnectionTV = TypeVar("AsyncConnectionTV", bound=AsyncConnectionOrSessionT)


class AbstractLockMixin(Generic[KeyTV, ActualKeyTV], ABC):
    """Abstract mixin describing how a lock exposes its *actual* key.

    It is generic over the caller-facing key type (``KeyTV``) and the type of
    the key returned by :meth:`get_actual_key` (``ActualKeyTV``).
    """

    @abstractmethod
    def __init__(
        self,
        *,
        key: KeyTV,
        convert: Optional[Callable[[KeyTV], ActualKeyTV]] = None,
        **kwargs,
    ):
        """Abstract initializer; concrete classes must provide their own.

        Args:
            key: The key as supplied by the caller.
            convert: Optional callable taking a ``KeyTV`` and returning an ``ActualKeyTV``.
                NOTE(review): presumably used by implementations to derive the actual key
                from ``key`` -- confirm against the concrete subclasses.
            **kwargs: Extra keyword arguments accepted for implementations.

        Raises:
            NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_actual_key(self) -> ActualKeyTV:
        """Return the actual key; must be implemented by concrete classes.

        Raises:
            NotImplementedError: Always, when this base implementation is invoked.
        """
        raise NotImplementedError()

    @property
    def actual_key(self) -> ActualKeyTV:
        """Read-only property returning the result of :meth:`get_actual_key`."""
        return self.get_actual_key()
class BaseSadLock(AbstractLockMixin, Generic[KeyTV, ConnectionTV], local, ABC):
    """Base class of database lock implementation

    Note:
        * It's Thread-Local (:class:`threading.local`)
        * It's an abstract class, do not instantiate it manually

    A lock object can be used as a context manager in a :keyword:`with` statement.
    :meth:`acquire` will be called when the block is entered,
    and :meth:`close` (which releases the lock only if it is still held) will be called when the block is exited.
    Hence, the following snippet::

        with some_lock:
            # do something...
            pass

    is equivalent to::

        some_lock.acquire()
        try:
            # do something...
            pass
        finally:
            some_lock.close()

    Note:
        A :exc:`TimeoutError` will be raised in a :keyword:`with` statement
        if the lock can not be acquired within ``contextual_timeout`` seconds.
    """

    @override
    def __init__(
        self,
        connection_or_session: ConnectionTV,
        key: KeyTV,
        /,
        contextual_timeout: Union[float, int, None] = None,
        **kwargs,
    ):
        """
        Args:

            connection_or_session: Connection or Session object SQL locking functions will be invoked on it

            key: ID or name of the SQL locking function

            contextual_timeout: Timeout(seconds) for Context Managers.

                When called in a :keyword:`with` statement, the newly created lock object will pass it to
                the ``timeout`` argument of :meth:`.BaseSadLock.acquire`.

                Attention:
                    **ONLY** affects :keyword:`with` statements.

                Example:
                    ::

                        try:
                            with create_sadlock(conn, k, contextual_timeout=5) as lck:
                                # do something...
                                pass
                        except TimeoutError:
                            # can not acquire after 5 seconds
                            pass

                Note:
                    The default value of ``timeout`` is still :data:`None`, when invoking :meth:`.acquire`

            **kwargs: Extra keyword arguments; they are accepted but not used by this base initializer.
        """
        self._acquired = False
        self._connection_or_session = connection_or_session
        self._key = key
        self._contextual_timeout = contextual_timeout

    def __enter__(self) -> Self:
        """Acquire the lock on entering a :keyword:`with` block and return the lock itself.

        Raises:
            TimeoutError: If ``contextual_timeout`` is set and :meth:`acquire` returns a false value.
        """
        if self._contextual_timeout is None:  # timeout period is infinite
            self.acquire()
        elif not self.acquire(timeout=self._contextual_timeout):  # the timeout period has elapsed and not acquired
            raise TimeoutError()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """Call :meth:`close` on leaving a :keyword:`with` block; exceptions are not suppressed."""
        self.close()

    def __str__(self) -> str:
        """Return a short description with the locked state, class name, key and object id."""
        state = "locked" if self._acquired else "unlocked"
        return f"<{state} {self.__class__.__name__} key={self._key} at 0x{id(self):x}>"

    @property
    def connection_or_session(self) -> ConnectionTV:
        """Connection or Session object SQL locking functions will be invoked on it

        It returns ``connection_or_session`` parameter of the class's constructor.
        """
        return self._connection_or_session

    @property
    def key(self) -> KeyTV:
        """ID or name of the SQL locking function

        It returns ``key`` parameter of the class's constructor"""
        return self._key

    @property
    def locked(self) -> bool:
        """locked/unlocked state property

        :data:`True` if the lock is acquired, else :data:`False`
        """
        return self._acquired

    @abstractmethod
    def acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
        """Acquire the lock in blocking or non-blocking mode.

        The implementation should provide the following behavior:

        * When ``block`` is :data:`True` (the default), the method blocks until the lock is in an unlocked state,
          then sets it to locked and returns :data:`True`.

        * When ``block`` is :data:`False`, the method call is non-blocking.
          If the lock is currently locked, it returns :data:`False`;
          otherwise, it sets the lock to locked state and returns :data:`True`.

        * When invoked with a positive floating-point value for ``timeout``,
          it blocks for at most the specified number of seconds until the lock can be acquired.

        * Invocations with a negative ``timeout`` value are equivalent to a ``timeout`` of zero.

        * When ``timeout`` is ``None`` (the default), the timeout period is infinite.
          The ``timeout`` parameter has no effect when ``block`` is :data:`False` and is thus ignored.

        * Returns :data:`True` if the lock has been acquired or :data:`False` if the timeout period has elapsed.
        """
        raise NotImplementedError()

    @abstractmethod
    def release(self, *args, **kwargs) -> None:
        """Release the lock.

        Since the class is thread-local, this method cannot be called from another thread or process,
        nor can it be called from another connection.
        (Although PostgreSQL's shared advisory lock supports this).

        The implementation should provide the following behavior:

        * Reset the lock to unlocked state and return when the lock is currently locked.
        * Allow exactly one of any other threads blocked waiting for the lock to become unlocked to proceed.
        * Raise a :class:`ValueError` when invoked on an unlocked lock.
        * Not return a value.
        """
        raise NotImplementedError()

    def close(self, *args, **kwargs) -> None:
        """Same as :meth:`release`

        Except that the :class:`ValueError` is **NOT** raised when invoked on an unlocked lock.

        An invocation of this method is equivalent to::

            if some_lock.locked:
                some_lock.release()

        This method may be useful together with :func:`contextlib.closing`,
        when we need a :keyword:`with` statement, but don't want it to acquire at the beginning of the block.

        Example:
            ::

                # ...

                from contextlib import closing
                from sqlalchemy_dlock import create_sadlock

                # ...

                with closing(create_sadlock(some_connection, some_key)) as lock:
                    # will **NOT** acquire at the beginning of the with-block
                    assert not lock.locked
                    # ...
                    # lock when need
                    lock.acquire()
                    assert lock.locked
                    # ...

                # `close` will be called at the end of the with-block
                assert not lock.locked
        """
        # Only release a lock that is currently held, so closing an unlocked lock is a no-op.
        if self._acquired:
            self.release(*args, **kwargs)
class BaseAsyncSadLock(AbstractLockMixin, Generic[KeyTV, AsyncConnectionTV], local, ABC):
    """Async version of :class:`.BaseSadLock`

    Instances can be used with :keyword:`async with`: :meth:`acquire` is awaited when the block is entered,
    and :meth:`close` is awaited when the block is exited.
    """

    @override
    def __init__(
        self,
        connection_or_session: AsyncConnectionTV,
        key: KeyTV,
        /,
        contextual_timeout: Union[float, int, None] = None,
        **kwargs,
    ):
        """
        Args:
            connection_or_session: Async Connection or Session object, returned by :attr:`connection_or_session`
            key: The key of the lock, returned by :attr:`key`
            contextual_timeout: Timeout(seconds) passed as ``timeout`` to :meth:`acquire`
                when the lock is entered with :keyword:`async with`.
                :data:`None` (the default) means :meth:`acquire` is awaited without a ``timeout`` argument.
            **kwargs: Extra keyword arguments; they are accepted but not used by this base initializer.
        """
        self._acquired = False
        self._connection_or_session = connection_or_session
        self._key = key
        self._contextual_timeout = contextual_timeout

    async def __aenter__(self) -> Self:
        """Acquire the lock on entering an :keyword:`async with` block and return the lock itself.

        Raises:
            TimeoutError: If ``contextual_timeout`` is set and :meth:`acquire` returns a false value.
        """
        if self._contextual_timeout is None:  # no timeout argument is passed to acquire
            await self.acquire()
        elif not await self.acquire(timeout=self._contextual_timeout):
            # the timeout period has elapsed and not acquired
            raise TimeoutError()
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        """Await :meth:`close` on leaving an :keyword:`async with` block; exceptions are not suppressed."""
        await self.close()

    def __str__(self) -> str:
        """Return a short description with the locked state, class name, key and object id."""
        state = "locked" if self._acquired else "unlocked"
        return f"<{state} {self.__class__.__name__} key={self._key} at 0x{id(self):x}>"

    @property
    def connection_or_session(self) -> AsyncConnectionTV:
        """The ``connection_or_session`` parameter of the class's constructor."""
        return self._connection_or_session

    @property
    def key(self) -> KeyTV:
        """The ``key`` parameter of the class's constructor."""
        return self._key

    @property
    def locked(self) -> bool:
        """:data:`True` if the lock is acquired, else :data:`False`"""
        return self._acquired

    @abstractmethod
    async def acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
        """Acquire the lock; must be implemented by concrete classes.

        :meth:`__aenter__` treats a false return value as "not acquired" and raises :exc:`TimeoutError`.
        """
        raise NotImplementedError()

    @abstractmethod
    async def release(self, *args, **kwargs) -> None:
        """Release the lock; must be implemented by concrete classes."""
        raise NotImplementedError()

    async def close(self, *args, **kwargs) -> None:
        """Await :meth:`release` only if the lock is currently acquired; otherwise do nothing."""
        if self._acquired:
            await self.release(*args, **kwargs)