Source code for sqlalchemy_dlock.asyncio.lock.mysql
importsysfromtypingimportUnionifsys.version_info<(3,12):# pragma: no coverfromtyping_extensionsimportoverrideelse:# pragma: no coverfromtypingimportoverridefrom...exceptionsimportSqlAlchemyDLockDatabaseErrorfrom...lock.mysqlimportMysqlSadLockMixinfrom...statement.mysqlimportLOCK,UNLOCKfrom..typesimportTAsyncConnectionOrSessionfrom.baseimportBaseAsyncSadLock
@override
async def acquire(
    self,
    block: bool = True,
    timeout: Union[float, int, None] = None,
    *args,
    **kwargs,
) -> bool:
    """Try to take the named lock identified by ``self.key``.

    Args:
        block: When false, the lock is requested with a timeout of ``0``
            and ``timeout`` is ignored.
        timeout: Only consulted when ``block`` is true. ``None`` is sent
            as ``-1`` (wait indefinitely); a negative number is treated
            as ``0``; any other value is passed through unchanged.
        *args: Accepted for signature compatibility; unused here.
        **kwargs: Accepted for signature compatibility; unused here.

    Returns:
        The value of ``self._acquired`` after the attempt: true when the
        statement returned ``1``, unchanged when it returned ``0``.

    Raises:
        ValueError: If this object is already marked as acquired.
        SqlAlchemyDLockDatabaseError: If the statement returned ``None``
            or any value other than ``0`` or ``1``.
    """
    if self._acquired:
        raise ValueError("invoked on a locked lock")

    # Normalise the timeout that is sent to the database.
    if not block:
        timeout = 0
    elif timeout is None:
        # None: wait for the lock indefinitely.
        timeout = -1
    elif timeout < 0:
        # Negative values behave like a timeout of zero.
        timeout = 0

    query = LOCK.params(str=self.key, timeout=timeout)
    result = await self.connection_or_session.execute(query)
    outcome = result.scalar_one()

    if outcome is None:  # pragma: no cover
        raise SqlAlchemyDLockDatabaseError(f"An error occurred while attempting to obtain the lock {self.key!r}")
    if outcome == 1:
        self._acquired = True
    elif outcome != 0:  # pragma: no cover
        raise SqlAlchemyDLockDatabaseError(f"GET_LOCK({self.key!r}, {timeout}) returns {outcome}")
    # outcome == 0: the lock could not be obtained before the timeout
    # expired, so the acquired flag is left untouched.
    return self._acquired
@override
async def release(self):
    """Give up the named lock identified by ``self.key``.

    Executes the ``UNLOCK`` statement and updates ``self._acquired``
    according to the scalar it returns.

    Raises:
        ValueError: If this object is not currently marked as acquired.
        SqlAlchemyDLockDatabaseError: If the statement returned ``0``,
            ``None``, or any value other than ``1``. For ``0`` and
            ``None`` the acquired flag is cleared before raising; for any
            other unexpected value it is left as it was.
    """
    if not self._acquired:
        raise ValueError("invoked on an unlocked lock")

    query = UNLOCK.params(str=self.key)
    result = await self.connection_or_session.execute(query)
    outcome = result.scalar_one()

    if outcome == 1:
        # Normal case: the lock was released.
        self._acquired = False
        return

    if outcome == 0:  # pragma: no cover
        self._acquired = False
        raise SqlAlchemyDLockDatabaseError(
            f"The named lock {self.key!r} was not established by this thread, "
            "and the lock is not released."
        )

    if outcome is None:  # pragma: no cover
        self._acquired = False
        raise SqlAlchemyDLockDatabaseError(
            f"The named lock {self.key!r} did not exist, "
            "was never obtained by a call to GET_LOCK(), "
            "or has previously been released."
        )

    # Anything else is unexpected; the acquired flag is not modified.
    raise SqlAlchemyDLockDatabaseError(  # pragma: no cover
        f"RELEASE_LOCK({self.key!r}) returns {outcome}"
    )