importsysfromtypingimportAny,Callable,Optional,TypeVar,Unionifsys.version_info<(3,12):# pragma: no coverfromtyping_extensionsimportoverrideelse:# pragma: no coverfromtypingimportoverridefrom..exceptionsimportSqlAlchemyDLockDatabaseErrorfrom..statement.mysqlimportLOCK,UNLOCKfrom..typesimportTConnectionOrSessionfrom..utilsimportto_str_keyfrom.baseimportBaseSadLockMYSQL_LOCK_NAME_MAX_LENGTH=64TKey=TypeVar("TKey",bound=Any)
[docs]classMysqlSadLockMixin:"""A Mix-in class for MySQL named lock"""def__init__(self,*,key:TKey,convert:Optional[Callable[[TKey],str]]=None,**kwargs):""" Args: key: MySQL named lock requires the key given by string. If ``key`` is not a :class:`str`: - When :class:`bytes` or alike, the constructor tries to decode it with default encoding:: key = key.decode() - Otherwise the constructor force convert it to :class:`str`:: key = str(key) - Or you can specify a ``convert`` function to that argument convert: Custom function to covert ``key`` to required data type. Example: :: def convert(value) -> str: # get a string key by `value` return the_string_covert_from_value """# noqa: E501ifconvert:self._actual_key=convert(key)elifisinstance(key,str):self._actual_key=keyelse:self._actual_key=to_str_key(key)ifnotisinstance(self._actual_key,str):raiseTypeError("MySQL named lock requires the key given by string")iflen(self._actual_key)>MYSQL_LOCK_NAME_MAX_LENGTH:raiseValueError(f"MySQL enforces a maximum length on lock names of {MYSQL_LOCK_NAME_MAX_LENGTH} characters.")
[docs]classMysqlSadLock(MysqlSadLockMixin,BaseSadLock[str]):"""A distributed lock implemented by MySQL named-lock See Also: https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html Caution: To MySQL locking function, it is even possible for a given session to acquire multiple locks for the same name. Other sessions cannot acquire a lock with that name until the acquiring session releases all its locks for the name. When perform multiple :meth:`.acquire` for a key on the **same** SQLAlchemy connection, latter :meth:`.acquire` will success immediately no wait and never block, it causes cascade lock instead! """# noqa: E501@overridedef__init__(self,connection_or_session:TConnectionOrSession,key,**kwargs):""" Args: connection_or_session: :attr:`.BaseSadLock.connection_or_session` key: :attr:`.BaseSadLock.key` **kwargs: other named parameters pass to :class:`.BaseSadLock` and :class:`.MysqlSadLockMixin` """MysqlSadLockMixin.__init__(self,key=key,**kwargs)BaseSadLock.__init__(self,connection_or_session,self._actual_key,**kwargs)
[docs]@overridedefacquire(self,block:bool=True,timeout:Union[float,int,None]=None,*args,**kwargs)->bool:ifself._acquired:raiseValueError("invoked on a locked lock")ifblock:# None: set the timeout period to infinite.iftimeoutisNone:timeout=-1# negative value for `timeout` are equivalent to a `timeout` of zeroeliftimeout<0:timeout=0else:timeout=0stmt=LOCK.params(str=self.key,timeout=timeout)ret_val=self.connection_or_session.execute(stmt).scalar_one()ifret_val==1:self._acquired=Trueelifret_val==0:pass# 直到超时也没有成功锁定elifret_valisNone:# pragma: no coverraiseSqlAlchemyDLockDatabaseError(f"An error occurred while attempting to obtain the lock {self.key!r}")else:# pragma: no coverraiseSqlAlchemyDLockDatabaseError(f"GET_LOCK({self.key!r}, {timeout}) returns {ret_val}")returnself._acquired
[docs]@overridedefrelease(self):ifnotself._acquired:raiseValueError("invoked on an unlocked lock")stmt=UNLOCK.params(str=self.key)ret_val=self.connection_or_session.execute(stmt).scalar_one()ifret_val==1:self._acquired=Falseelifret_val==0:# pragma: no coverself._acquired=FalseraiseSqlAlchemyDLockDatabaseError(f"The named lock {self.key!r} was not established by this thread, and the lock is not released.")elifret_valisNone:# pragma: no coverself._acquired=FalseraiseSqlAlchemyDLockDatabaseError(f"The named lock {self.key!r} did not exist, ""was never obtained by a call to GET_LOCK(), ""or has previously been released.")else:# pragma: no coverraiseSqlAlchemyDLockDatabaseError(f"RELEASE_LOCK({self.key!r}) returns {ret_val}")