import sys
from typing import Any, Callable, Optional, TypeVar, Union
if sys.version_info < (3, 12): # pragma: no cover
from typing_extensions import override
else: # pragma: no cover
from typing import override
from ..exceptions import SqlAlchemyDLockDatabaseError
from ..statement.mysql import LOCK, UNLOCK
from ..typing import AsyncConnectionOrSessionT, ConnectionOrSessionT
from .base import AbstractLockMixin, BaseAsyncSadLock, BaseSadLock
# Upper bound on the length (in characters) of a lock name; longer keys are
# rejected with ``ValueError`` by ``MysqlSadLockMixin.__init__``.
MYSQL_LOCK_NAME_MAX_LENGTH = 64
# Key types that the default converter ``MysqlSadLockMixin.convert`` is annotated to accept.
ConvertibleKT = Union[bytes, bytearray, memoryview, str, int, float]
# Type of the user-supplied key: unrestricted, because a custom ``convert`` callable may be given.
KT = Any
# Type variable for the user-supplied key, used as a generic parameter of the mix-in.
KTV = TypeVar("KTV", bound=KT)
[docs]
class MysqlSadLockMixin(AbstractLockMixin[KTV, str]):
    """A mix-in class for MySQL named locks.

    It converts the user-supplied key to the string used as the MySQL lock
    name and validates the type and length of that string.
    """

    @override
    def __init__(self, *, key: KTV, convert: Optional[Callable[[KTV], str]] = None, **kwargs):
        """
        Args:
            key: MySQL named lock requires the key given by string.

                When no ``convert`` function is given, the key is converted by the
                default converter :meth:`convert`:

                - A :class:`str` is used as is.
                - An :class:`int` or :class:`float` is converted with ``str(key)``.
                - A :class:`bytes`, :class:`bytearray` or :class:`memoryview` is decoded
                  with the default encoding (``key.decode()``).
                - Any other type raises :class:`TypeError`; specify a ``convert``
                  function for such keys.

            convert: Custom function to convert ``key`` to the required data type.

                Example:

                    ::

                        def convert(value) -> str:
                            # get a string key by `value`
                            return the_string_converted_from_value

            **kwargs: Accepted so that sibling constructors can share one set of
                keyword arguments; not used by this constructor.

        Raises:
            TypeError: If the converted key is not a :class:`str`, or the default
                converter does not support the type of ``key``.
            ValueError: If the converted key is longer than
                ``MYSQL_LOCK_NAME_MAX_LENGTH`` characters.
        """
        # Compare with ``None`` explicitly: a callable object may be falsy
        # (e.g. it defines ``__bool__`` or ``__len__``) and must still be used.
        if convert is not None:
            self._actual_key = convert(key)
        else:
            self._actual_key = self.convert(key)
        # A custom converter is not trusted to return a string, so check the result.
        if not isinstance(self._actual_key, str):
            raise TypeError("MySQL named lock requires the key given by string")
        if len(self._actual_key) > MYSQL_LOCK_NAME_MAX_LENGTH:
            raise ValueError(
                "MySQL enforces a maximum length on lock names of "
                f"{MYSQL_LOCK_NAME_MAX_LENGTH} characters."
            )

    @override
    def get_actual_key(self) -> str:
        """The actual key used in MySQL named lock"""
        return self._actual_key

    @classmethod
    def convert(cls, k: ConvertibleKT) -> str:
        """The default key converter for MySQL named lock

        Args:
            k: The key to convert.

        Returns:
            ``k`` itself for a :class:`str`, ``str(k)`` for an :class:`int` or
            :class:`float`, and the decoded text for :class:`bytes`,
            :class:`bytearray` or :class:`memoryview`.

        Raises:
            TypeError: If ``k`` is of any other type; the message is the type name.
        """
        if isinstance(k, str):
            return k
        if isinstance(k, (int, float)):
            return str(k)
        if isinstance(k, (bytes, bytearray)):
            return k.decode()
        if isinstance(k, memoryview):
            # memoryview has no ``decode``; copy it to bytes first
            return k.tobytes().decode()
        raise TypeError(type(k).__name__)
[docs]
class MysqlSadLock(MysqlSadLockMixin, BaseSadLock[str, ConnectionOrSessionT]):
    """A distributed lock implemented by MySQL named-lock

    See Also:
        https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html

    Caution:
        With the MySQL locking functions, a single session may acquire several
        locks for the same name. Other sessions cannot acquire a lock with that
        name until the holding session has released all of its locks for it.

        As a result, when :meth:`.acquire` is called several times for one key on
        the **same** SQLAlchemy connection, every later call succeeds immediately
        and never blocks -- the locks pile up instead of excluding each other!
    """

    @override
    def __init__(self, connection_or_session: ConnectionOrSessionT, key: KT, **kwargs):
        """
        Args:
            connection_or_session: :attr:`.BaseSadLock.connection_or_session`
            key: :attr:`.BaseSadLock.key`
            **kwargs: other named parameters pass to :class:`.BaseSadLock` and :class:`.MysqlSadLockMixin`
        """
        # The mix-in goes first: it converts and validates ``key``. The converted
        # key is then read back through ``self.actual_key`` (defined outside this
        # file -- presumably backed by ``get_actual_key()``) for the base lock.
        MysqlSadLockMixin.__init__(self, key=key, **kwargs)
        BaseSadLock.__init__(self, connection_or_session, self.actual_key, **kwargs)

    @override
    def do_acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
        """Try to obtain the named lock by executing the ``LOCK`` statement.

        Args:
            block: When false, the statement is issued with a timeout of zero.
            timeout: Seconds to wait when ``block`` is true. ``None`` waits
                forever; a negative value is treated as zero.

        Returns:
            ``True`` if the lock was obtained, ``False`` if it was not obtained
            within the timeout.

        Raises:
            SqlAlchemyDLockDatabaseError: If the statement returns ``NULL`` or any
                value other than ``1`` and ``0``.
        """
        if not block:
            wait_seconds = 0
        elif timeout is None:
            # -1 sets the timeout period to infinite
            wait_seconds = -1
        else:
            # a negative timeout is equivalent to a timeout of zero
            wait_seconds = 0 if timeout < 0 else timeout

        statement = LOCK.params(str=self.key, timeout=wait_seconds)
        outcome = self.connection_or_session.execute(statement).scalar_one()

        if outcome is None:  # pragma: no cover
            raise SqlAlchemyDLockDatabaseError(f"An error occurred while attempting to obtain the lock {self.key!r}")
        if outcome == 1:
            return True
        if outcome == 0:
            # the lock was still not obtained when the timeout expired
            return False
        raise SqlAlchemyDLockDatabaseError(  # pragma: no cover
            f"GET_LOCK({self.key!r}, {wait_seconds}) returns {outcome}"
        )

    @override
    def do_release(self):
        """Release the named lock by executing the ``UNLOCK`` statement.

        Raises:
            SqlAlchemyDLockDatabaseError: If the statement returns ``0`` (the lock
                was not established by this thread), ``NULL`` (the lock does not
                exist) or any other value different from ``1``.
        """
        statement = UNLOCK.params(str=self.key)
        outcome = self.connection_or_session.execute(statement).scalar_one()

        if outcome == 1:
            return
        if outcome is None:
            raise SqlAlchemyDLockDatabaseError(
                f"The named lock {self.key!r} did not exist, "
                "was never obtained by a call to GET_LOCK(), "
                "or has previously been released."
            )
        if outcome == 0:
            raise SqlAlchemyDLockDatabaseError(
                f"The named lock {self.key!r} was not established by this thread, and the lock is not released."
            )
        raise SqlAlchemyDLockDatabaseError(f"RELEASE_LOCK({self.key!r}) returns {outcome}")  # pragma: no cover
[docs]
class MysqlAsyncSadLock(MysqlSadLockMixin, BaseAsyncSadLock[str, AsyncConnectionOrSessionT]):
    """Async IO version of :class:`MysqlSadLock`"""

    @override
    def __init__(self, connection_or_session: AsyncConnectionOrSessionT, key: KT, **kwargs):
        """
        Args:
            connection_or_session: The asynchronous connection or session the lock statements are executed on.
            key: The user-supplied key; it is converted and validated by :class:`.MysqlSadLockMixin`.
            **kwargs: other named parameters pass to :class:`.BaseAsyncSadLock` and :class:`.MysqlSadLockMixin`
        """
        # The mix-in goes first: it converts and validates ``key``. The converted
        # key is then read back through ``self.actual_key`` (defined outside this
        # file -- presumably backed by ``get_actual_key()``) for the base lock.
        MysqlSadLockMixin.__init__(self, key=key, **kwargs)
        BaseAsyncSadLock.__init__(self, connection_or_session, self.actual_key, **kwargs)

    @override
    async def do_acquire(self, block: bool = True, timeout: Union[float, int, None] = None, *args, **kwargs) -> bool:
        """Try to obtain the named lock by awaiting the ``LOCK`` statement.

        Args:
            block: When false, the statement is issued with a timeout of zero.
            timeout: Seconds to wait when ``block`` is true. ``None`` waits
                forever; a negative value is treated as zero.

        Returns:
            ``True`` if the lock was obtained, ``False`` if it was not obtained
            within the timeout.

        Raises:
            SqlAlchemyDLockDatabaseError: If the statement returns ``NULL`` or any
                value other than ``1`` and ``0``.
        """
        if not block:
            wait_seconds = 0
        elif timeout is None:
            # -1 sets the timeout period to infinite
            wait_seconds = -1
        else:
            # a negative timeout is equivalent to a timeout of zero
            wait_seconds = 0 if timeout < 0 else timeout

        statement = LOCK.params(str=self.key, timeout=wait_seconds)
        result = await self.connection_or_session.execute(statement)
        outcome = result.scalar_one()

        if outcome is None:  # pragma: no cover
            raise SqlAlchemyDLockDatabaseError(f"An error occurred while attempting to obtain the lock {self.key!r}")
        if outcome == 1:
            return True
        if outcome == 0:
            # the lock was still not obtained when the timeout expired
            return False
        raise SqlAlchemyDLockDatabaseError(  # pragma: no cover
            f"GET_LOCK({self.key!r}, {wait_seconds}) returns {outcome}"
        )

    @override
    async def do_release(self):
        """Release the named lock by awaiting the ``UNLOCK`` statement.

        Raises:
            SqlAlchemyDLockDatabaseError: If the statement returns ``0`` (the lock
                was not established by this thread), ``NULL`` (the lock does not
                exist) or any other value different from ``1``.
        """
        statement = UNLOCK.params(str=self.key)
        result = await self.connection_or_session.execute(statement)
        outcome = result.scalar_one()

        if outcome == 1:
            return
        if outcome is None:
            raise SqlAlchemyDLockDatabaseError(
                f"The named lock {self.key!r} did not exist, "
                "was never obtained by a call to GET_LOCK(), "
                "or has previously been released."
            )
        if outcome == 0:
            raise SqlAlchemyDLockDatabaseError(
                f"The named lock {self.key!r} was not established by this thread, and the lock is not released."
            )
        raise SqlAlchemyDLockDatabaseError(f"RELEASE_LOCK({self.key!r}) returns {outcome}")  # pragma: no cover