Source code for sqlalchemy_dlock.asyncio.lock.postgresql
importasyncioimportsysfromtimeimporttimefromtypingimportUnionfromwarningsimportcatch_warnings,warnifsys.version_info<(3,12):# pragma: no coverfromtyping_extensionsimportoverrideelse:# pragma: no coverfromtypingimportoverridefrom...exceptionsimportSqlAlchemyDLockDatabaseErrorfrom...lock.postgresqlimportPostgresqlSadLockMixinfrom...statement.postgresqlimportSLEEP_INTERVAL_DEFAULT,SLEEP_INTERVAL_MINfrom..typesimportTAsyncConnectionOrSessionfrom.baseimportBaseAsyncSadLock
[docs]@overrideasyncdefacquire(self,block:bool=True,timeout:Union[float,int,None]=None,interval:Union[float,int,None]=None,*args,**kwargs,)->bool:ifself._acquired:raiseValueError("invoked on a locked lock")ifblock:iftimeoutisNone:# None: set the timeout period to infinite._=(awaitself.connection_or_session.execute(self._stmt_lock)).all()self._acquired=Trueelse:# negative value for `timeout` are equivalent to a `timeout` of zero.iftimeout<0:timeout=0interval=SLEEP_INTERVAL_DEFAULTifintervalisNoneelseintervalifinterval<SLEEP_INTERVAL_MIN:# pragma: no coverraiseValueError("interval too small")ts_begin=time()whileTrue:ret_val=(awaitself.connection_or_session.execute(self._stmt_try_lock)).scalar_one()ifret_val:# succeedself._acquired=Truebreakiftime()-ts_begin>timeout:# expiredbreakawaitasyncio.sleep(interval)else:# This will either obtain the lock immediately and return true,# or return false without waiting if the lock cannot be acquired immediately.ret_val=(awaitself.connection_or_session.execute(self._stmt_try_lock)).scalar_one()self._acquired=bool(ret_val)#returnself._acquired
[docs]@overrideasyncdefrelease(self):ifself._stmt_unlockisNone:warn("PostgreSQL transaction level advisory locks are held until the current transaction ends; there is no provision for manual release.",RuntimeWarning,)returnifnotself._acquired:raiseValueError("invoked on an unlocked lock")ret_val=(awaitself.connection_or_session.execute(self._stmt_unlock)).scalar_one()ifret_val:self._acquired=Falseelse:# pragma: no coverself._acquired=FalseraiseSqlAlchemyDLockDatabaseError(f"The advisory lock {self.key!r} was not held.")