import sys
import warnings
from abc import ABC, abstractmethod
from threading import local
from typing import Callable, Generic, Optional, TypeVar, Union, final
if sys.version_info >= (3, 11): # pragma: no cover
from typing import Self
else: # pragma: no cover
from typing_extensions import Self
if sys.version_info < (3, 12): # pragma: no cover
from typing_extensions import override
else: # pragma: no cover
from typing import override
from ..typing import AsyncConnectionOrSessionT, ConnectionOrSessionT
KeyTV = TypeVar("KeyTV")
ActualKeyTV = TypeVar("ActualKeyTV")
ConnectionTV = TypeVar("ConnectionTV", bound=ConnectionOrSessionT)
AsyncConnectionTV = TypeVar("AsyncConnectionTV", bound=AsyncConnectionOrSessionT)
[docs]
class AbstractLockMixin(Generic[KeyTV, ActualKeyTV], ABC):
@abstractmethod
def __init__(self, *, key: KeyTV, convert: Optional[Callable[[KeyTV], ActualKeyTV]] = None, **kwargs):
raise NotImplementedError()
[docs]
@abstractmethod
def get_actual_key(self) -> ActualKeyTV:
raise NotImplementedError()
@final
@property
def actual_key(self) -> ActualKeyTV:
return self.get_actual_key()
[docs]
class BaseSadLock(AbstractLockMixin, Generic[KeyTV, ConnectionTV], local, ABC):
"""Base class of database lock implementation
Note:
* It's Thread-Local (:class:`threading.local`)
* It's an abstract class, do not manual instantiate
The :meth:`acquire` and :meth:`release` methods can be used as context managers for a :keyword:`with` statement.
:meth:`acquire` will be called when the block is entered, and :meth:`release` will be called when the block is exited.
Hence, the following snippet::
with some_lock:
# do something...
pass
is equivalent to::
some_lock.acquire()
try:
# do something...
pass
finally:
some_lock.release()
Note:
A :exc:`TimeoutError` will be thrown if acquire timeout in :keyword:`with` statement.
"""
@override
def __init__(
self, connection_or_session: ConnectionTV, key: KeyTV, /, contextual_timeout: Union[float, int, None] = None, **kwargs
):
"""
Args:
connection_or_session: Connection or Session object SQL locking functions will be invoked on it
key: ID or name of the SQL locking function
contextual_timeout: Timeout(seconds) for Context Managers.
When called in a :keyword:`with` statement, the new created lock object will pass it to ``timeout`` argument of :meth:`.BaseSadLock.acquire`.
Attention:
**ONLY** affects :keyword:`with` statements.
Example:
::
try:
with create_sadlock(conn, k, contextual_timeout=5) as lck:
# do something...
pass
except TimeoutError:
# can not acquire after 5 seconds
pass
Note:
The default value of `timeout` is still :data:`None`, when invoking :meth:`.acquire`
"""
self._acquired = False
self._connection_or_session = connection_or_session
self._key = key
self._contextual_timeout = contextual_timeout
@final
def __enter__(self) -> Self:
if self._contextual_timeout is None: # timeout period is infinite
self.acquire()
elif not self.acquire(timeout=self._contextual_timeout): # the timeout period has elapsed and not acquired
raise TimeoutError()
return self
@final
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
@final
def __str__(self) -> str:
return "<{} {} key={} at 0x{:x}>".format(
"locked" if self._acquired else "unlocked",
self.__class__.__name__,
self._key,
id(self),
)
@final
@property
def connection_or_session(self) -> ConnectionTV:
"""Connection or Session object SQL locking functions will be invoked on it
It returns ``connection_or_session`` parameter of the class's constructor.
"""
return self._connection_or_session
@final
@property
def key(self) -> KeyTV:
"""ID or name of the SQL locking function
It returns ``key`` parameter of the class's constructor"""
return self._key
@final
@property
def locked(self) -> bool:
"""locked/unlocked state property
:data:`True` if the lock is acquired, else :data:`False`
"""
return self._acquired
[docs]
@final
def acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
"""Acquire the lock in blocking or non-blocking mode.
The implementation (:meth:`do_acquire`) should provide the following behavior:
* When ``block`` is :data:`True` (the default), the method blocks until the lock is in an unlocked state,
then sets it to locked and returns :data:`True`.
* When ``block`` is :data:`False`, the method call is non-blocking.
If the lock is currently locked, it returns :data:`False`; otherwise, it sets the lock to locked state and returns :data:`True`.
* When invoked with a positive floating-point value for ``timeout``, it blocks for at most the specified number
of seconds until the lock can be acquired.
* Invocations with a negative ``timeout`` value are equivalent to a ``timeout`` of zero.
* When ``timeout`` is ``None`` (the default), the timeout period is infinite.
The ``timeout`` parameter has no effect when ``block`` is :data:`False` and is thus ignored.
* Returns :data:`True` if the lock has been acquired or :data:`False` if the timeout period has elapsed.
"""
if self._acquired:
raise ValueError("invoked on a locked lock")
self._acquired = self.do_acquire(block, timeout, *args, **kwargs)
return self._acquired
[docs]
@abstractmethod
def do_acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
raise NotImplementedError()
[docs]
@final
def release(self, *args, **kwargs):
"""Release the lock.
Since the class is thread-local, this method cannot be called from another thread or process,
nor can it be called from another connection.
(Although PostgreSQL's shared advisory lock supports this).
The implementation (:meth:`do_release`) should provide the following behavior:
* Reset the lock to unlocked state and return when the lock is currently locked.
* Allow exactly one of any other threads blocked waiting for the lock to become unlocked to proceed.
* Raise a :class:`ValueError` when invoked on an unlocked lock.
* Not return a value.
"""
if not self._acquired:
raise ValueError("invoked on an unlocked lock")
self.do_release(*args, **kwargs)
self._acquired = False
[docs]
@abstractmethod
def do_release(self, *args, **kwargs):
raise NotImplementedError()
[docs]
@final
def close(self, *args, **kwargs):
"""Same as :meth:`release`
Except that the :class:`ValueError` is **NOT** raised when invoked on an unlocked lock.
An invocation of this method is equivalent to::
if not some_lock.locked:
some_lock.release()
This method maybe useful together with :func:`contextlib.closing`, when we need a :keyword:`with` statement, but don't want it to acquire at the beginning of the block.
Example:
::
# ...
from contextlib import closing
from sqlalchemy_dlock import create_sadlock
# ...
with closing(create_sadlock(some_connection, some_key)) as lock:
# will **NOT** acquire at the begin of with-block
assert not lock.locked
# ...
# lock when need
lock.acquire()
assert lock.locked
# ...
# `close` will be called at the end with-block
assert not lock.locked
"""
if self._acquired:
self.release(*args, **kwargs)
[docs]
class BaseAsyncSadLock(AbstractLockMixin, Generic[KeyTV, AsyncConnectionTV], local, ABC):
"""Async version of :class:`.BaseSadLock`"""
@override
def __init__(
self,
connection_or_session: AsyncConnectionTV,
key: KeyTV,
/,
contextual_timeout: Union[float, int, None] = None,
**kwargs,
):
self._acquired = False
self._connection_or_session = connection_or_session
self._key = key
self._contextual_timeout = contextual_timeout
@final
async def __aenter__(self) -> Self:
if self._contextual_timeout is None:
await self.acquire()
elif not await self.acquire(timeout=self._contextual_timeout):
# the timeout period has elapsed and not acquired
raise TimeoutError()
return self
@final
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.aclose()
@final
def __str__(self):
return "<{} {} key={} at 0x{:x}>".format(
"locked" if self._acquired else "unlocked",
self.__class__.__name__,
self._key,
id(self),
)
@final
@property
def connection_or_session(self) -> AsyncConnectionTV:
return self._connection_or_session
@final
@property
def key(self) -> KeyTV:
return self._key
@final
@property
def locked(self) -> bool:
return self._acquired
[docs]
@final
async def acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
if self._acquired:
raise ValueError("invoked on a locked lock")
self._acquired = await self.do_acquire(block, timeout, *args, **kwargs)
return self._acquired
[docs]
@abstractmethod
async def do_acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
raise NotImplementedError()
[docs]
@final
async def release(self, *args, **kwargs):
if not self._acquired:
raise ValueError("invoked on an unlocked lock")
await self.do_release(*args, **kwargs)
self._acquired = False
[docs]
@abstractmethod
async def do_release(self, *args, **kwargs):
raise NotImplementedError()
[docs]
@final
async def aclose(self, *args, **kwargs):
"""Async close method for compatibility with :func:`contextlib.aclosing` (Python 3.10+).
Example::
from contextlib import aclosing
async with aclosing(create_async_sadlock(some_connection, some_key)) as lock:
# will **NOT** acquire at the begin of with-block
assert not lock.locked
# ...
# lock when need
await lock.acquire()
assert lock.locked
# ...
# `aclose` will be called at the end with-block
assert not lock.locked
.. versionadded:: 0.8.1
"""
if self._acquired:
await self.release(*args, **kwargs)
[docs]
@final
async def close(self, *args, **kwargs):
""".. deprecated:: 0.8.1
Use :meth:`aclose` instead. Will be removed in 0.9.0.
"""
warnings.warn(
"The 'close' method is deprecated and will be removed in 0.9.0. Use 'aclose' instead.",
DeprecationWarning,
stacklevel=2,
)
return await self.aclose(*args, **kwargs)