Source code for sqlalchemy_dlock.asyncio.lock.base
importsysfromtypingimportGeneric,TypeVar,Unionifsys.version_info>=(3,11):# pragma: no coverfromtypingimportSelfelse:# pragma: no coverfromtyping_extensionsimportSelffrom..typesimportTAsyncConnectionOrSessionTKey=TypeVar("TKey")
[docs]classBaseAsyncSadLock(Generic[TKey]):def__init__(self,connection_or_session:TAsyncConnectionOrSession,key:TKey,/,contextual_timeout:Union[float,int,None]=None,**kwargs,):self._acquired=Falseself._connection_or_session=connection_or_sessionself._key=keyself._contextual_timeout=contextual_timeoutasyncdef__aenter__(self)->Self:ifself._contextual_timeoutisNone:awaitself.acquire()elifnotawaitself.acquire(timeout=self._contextual_timeout):# the timeout period has elapsed and not acquiredraiseTimeoutError()returnselfasyncdef__aexit__(self,exc_type,exc_value,exc_tb):awaitself.close()def__str__(self):return"<{}{} key={} at 0x{:x}>".format("locked"ifself._acquiredelse"unlocked",self.__class__.__name__,self._key,id(self),)@propertydefconnection_or_session(self)->TAsyncConnectionOrSession:returnself._connection_or_session@propertydefkey(self)->TKey:returnself._key@propertydeflocked(self)->bool:returnself._acquired
[docs]asyncdefacquire(self,block:bool=True,timeout:Union[float,int,None]=None,*args,**kwargs)->bool:# pragma: no coverraiseNotImplementedError()
[docs]asyncdefrelease(self,*args,**kwargs)->None:# pragma: no coverraiseNotImplementedError()