Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Cleanup coroutine: clears the shared "acquired" flag, cancels `fut` if it is
# still pending, and then calls self.delete_unique_znode(znode_label).
# NOTE(review): `state`, `fut`, `state_fut`, `self` and `znode_label` are not
# defined in this block; presumably they are closure variables of an enclosing
# function -- confirm. Indentation is missing in this listing, so the exact
# nesting of the statements below also needs confirming.
async def on_exit():
# Flip the shared flag so the holder reads as no longer acquired.
state["acquired"] = False
# Only touch the pending task if it has not completed already.
if not fut.done():
# Unregister the still-pending LOST-state future from the session state.
if not state_fut.done():
self.client.session.state.remove_waiting(
state_fut, states.States.LOST)
fut.cancel()
# Remove the znode identified by `znode_label`.
await self.delete_unique_znode(znode_label)
# Sets up a shared "acquired" flag plus a background task that clears the flag
# once the session's LOST-state future completes.
# NOTE(review): indentation is missing in this listing and the body appears to
# be cut off after the first statement of on_exit(); only the visible
# statements are described here.
def make_contextmanager(self, znode_label):
# Mutable holder so the nested functions below can read and update the flag.
state = {"acquired": True}
# Accessor returning the current value of the flag.
def still_acquired():
return state["acquired"]
# Future obtained from the session state object for the LOST state.
state_fut = self.client.session.state.wait_for(states.States.LOST)
# Awaits state_fut; if the flag is still set at that point, logs a warning
# and clears it.
async def handle_session_loss():
await state_fut
# Flag already cleared: nothing to report.
if not state["acquired"]:
return
log.warning(
"Session expired at some point, lock %s no longer acquired.",
self)
state["acquired"] = False
# Schedule handle_session_loss() on the client's event loop.
fut = asyncio.ensure_future(handle_session_loss(),
loop=self.client.loop)
# Exit hook; its first visible step clears the flag.
async def on_exit():
state["acquired"] = False
def __init__(self, session):
    """Initialise the state holder for ``session``.

    Args:
        session: The object whose state is being tracked; stored as-is.
    """
    self.session = session
    # The starting state is LOST.
    self.current_state = States.LOST
    # Missing keys default to an empty set.
    # NOTE(review): presumably keyed by state name with pending futures as
    # the set members -- confirm against the methods that use it.
    self.futures = collections.defaultdict(set)
# Shut down: send a CloseRequest, move the state to LOST if it is not there
# already, close the connection, and reset the started/closing flags.
# NOTE(review): indentation is missing in this listing; in particular it is
# not visible whether the CloseRequest is sent only when repair_loop_task is
# set or unconditionally -- confirm against the original file.
async def close(self):
# Nothing to close if start-up never happened.
if not self.started:
log.debug('Do nothing because session is not started')
return
# A close is already in progress; do not run it a second time.
if self.closing:
return
self.closing = True
# Cancel the repair loop task, if there is one.
if self.repair_loop_task:
self.repair_loop_task.cancel()
# Send the close request, waiting at most self.timeout for it.
await asyncio.wait_for(self.send(protocol.CloseRequest()), self.timeout)
# transition_to() is skipped when the state is already LOST.
if self.state.current_state != States.LOST:
self.state.transition_to(States.LOST)
# Close the connection, if one exists.
if self.conn:
await self.conn.close(self.timeout)
# Clear both flags at the end of the shutdown sequence.
self.closing = False
self.started = False
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class States:
    """Names of the session states.

    These are plain string constants rather than an ``enum``, so they
    compare equal to the raw strings and can be used directly as
    dictionary keys.
    """

    CONNECTED = "connected"
    SUSPENDED = "suspended"
    READ_ONLY = "read_only"
    LOST = "lost"
class SessionStateMachine:
    """Holds the current state of a session and the allowed state changes.

    Only the transition table and the constructor are visible in this
    listing.
    """

    # Allowed state changes, stored as pairs of ``States`` constants.
    # NOTE(review): presumably each pair is (from_state, to_state) -- confirm
    # against the code that consults this table.
    # No pair repeats the same state twice, and (CONNECTED, READ_ONLY) is not
    # listed.
    valid_transitions = {
        (States.LOST, States.CONNECTED),
        (States.LOST, States.READ_ONLY),
        (States.CONNECTED, States.SUSPENDED),
        (States.CONNECTED, States.LOST),
        (States.READ_ONLY, States.CONNECTED),
        (States.READ_ONLY, States.SUSPENDED),
        (States.READ_ONLY, States.LOST),
        (States.SUSPENDED, States.CONNECTED),
        (States.SUSPENDED, States.READ_ONLY),
        (States.SUSPENDED, States.LOST),
    }

    def __init__(self, session):
        """Initialise the state machine for ``session``.

        Args:
            session: The object whose state is being tracked; stored as-is.
        """
        self.session = session
        # The starting state is LOST.
        self.current_state = States.LOST
        # Missing keys default to an empty set.
        # NOTE(review): presumably keyed by state name with pending futures
        # as the set members -- confirm against the methods that use it.
        self.futures = collections.defaultdict(set)
# Send a ConnectRequest over self.conn and process the reply.
# NOTE(review): indentation is missing in this listing and the function may
# continue past the last visible line; only the visible statements are
# described here.
async def establish_session(self):
log.info("Establishing session. {!r}".format(self.session_id))
# Falsy last_zxid / timeout / session_id values are sent as 0. The timeout is
# multiplied by 1000 for the request (presumably seconds -> milliseconds,
# mirroring the division by 1000 in the log line further down).
connection_response = await self.conn.send_connect(
protocol.ConnectRequest(
protocol_version=0,
last_seen_zxid=self.last_zxid or 0,
timeout=int((self.timeout or 0) * 1000),
session_id=self.session_id or 0,
password=self.password,
read_only=self.allow_read_only,
)
)
# No reply at all: move to LOST if not already there, forget the last zxid,
# and raise SessionLost.
if connection_response is None:
# handle issue with inconsistent zxid on reconnection
if self.state.current_state != States.LOST:
self.state.transition_to(States.LOST)
self.last_zxid = None
raise exc.SessionLost()
# The reply unpacks into a (zxid, response) pair; remember the zxid.
zxid, response = connection_response
self.last_zxid = zxid
if response.session_id == 0: # invalid session, probably expired
log.debug('Session lost')
if self.state.current_state != States.LOST:
self.state.transition_to(States.LOST)
raise exc.SessionLost()
log.info("Got session id %s", hex(response.session_id))
log.info("Negotiated timeout: %s seconds", response.timeout / 1000)
# Adopt the session id returned in the reply.
self.session_id = response.session_id