Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class States:
    """String constants naming the session states used by the state machine."""

    CONNECTED = "connected"
    SUSPENDED = "suspended"
    READ_ONLY = "read_only"
    LOST = "lost"
# Tracks a session's current state and validates moves between states.
class SessionStateMachine:
# Allowed (current_state, new_state) pairs; transition_to() rejects any pair
# that is not listed here.
valid_transitions = {
(States.LOST, States.CONNECTED),
(States.LOST, States.READ_ONLY),
(States.CONNECTED, States.SUSPENDED),
(States.CONNECTED, States.LOST),
(States.READ_ONLY, States.CONNECTED),
(States.READ_ONLY, States.SUSPENDED),
(States.READ_ONLY, States.LOST),
(States.SUSPENDED, States.CONNECTED),
(States.SUSPENDED, States.READ_ONLY),
(States.SUSPENDED, States.LOST),
}
def __init__(self, session):
self.session = session
# A new machine always starts in LOST.
self.current_state = States.LOST
# Missing keys yield an empty set; presumably keyed by state -- TODO confirm.
self.futures = collections.defaultdict(set)
def transition_to(self, state):
# Refuse any move whose (current, target) pair is not whitelisted above.
if (self.current_state, state) not in self.valid_transitions:
raise exc.InvalidStateTransition(
# NOTE(review): this excerpt is cut off here; the rest of the call is not visible.
# NOTE(review): fragment -- the enclosing function header and the condition
# guarding the first three lines are not visible in this excerpt.
# Schedule the callback on the event loop with the event's path, then stop.
self.loop.call_soon(callback, event.path)
# ioloop.IOLoop.current().add_callback(callback, event.path)
return
# Map the event's state onto a session state transition.
# DISCONNECTED, SESSION_EXPIRED and AUTH_FAILED all move the session to LOST.
if event.state == protocol.WatchEvent.DISCONNECTED:
log.error("Got 'disconnected' watch event.")
self.state.transition_to(States.LOST)
elif event.state == protocol.WatchEvent.SESSION_EXPIRED:
log.error("Got 'session expired' watch event.")
self.state.transition_to(States.LOST)
elif event.state == protocol.WatchEvent.AUTH_FAILED:
log.error("Got 'auth failed' watch event.")
self.state.transition_to(States.LOST)
elif event.state == protocol.WatchEvent.CONNECTED_READ_ONLY:
log.warning("Got 'connected read only' watch event.")
self.state.transition_to(States.READ_ONLY)
# SASL_AUTHENTICATED is only logged; it triggers no state transition.
elif event.state == protocol.WatchEvent.SASL_AUTHENTICATED:
log.info("Authentication successful.")
elif event.state == protocol.WatchEvent.CONNECTED:
log.info("Got 'connected' watch event.")
self.state.transition_to(States.CONNECTED)
async def ensure_safe_state(self, writing=False):
    """Return once the session is in a state considered safe to proceed.

    ``States.CONNECTED`` is always acceptable. ``States.READ_ONLY`` is
    accepted as well when ``self.allow_read_only`` is truthy and
    ``writing`` is false.

    Args:
        writing: When true, ``READ_ONLY`` is never treated as a safe state.
    """
    safe_states = [States.CONNECTED]
    if self.allow_read_only and not writing:
        safe_states.append(States.READ_ONLY)

    # Already in an acceptable state: nothing to wait for.
    if self.state in safe_states:
        return

    # Otherwise suspend until the state machine reaches one of them.
    await self.state.wait_for(*safe_states)
SUSPENDED = "suspended"
READ_ONLY = "read_only"
LOST = "lost"
# Tracks a session's current state and validates moves between states.
class SessionStateMachine:
# Allowed (current_state, new_state) pairs; transition_to() rejects any pair
# that is not listed here.
valid_transitions = {
(States.LOST, States.CONNECTED),
(States.LOST, States.READ_ONLY),
(States.CONNECTED, States.SUSPENDED),
(States.CONNECTED, States.LOST),
(States.READ_ONLY, States.CONNECTED),
(States.READ_ONLY, States.SUSPENDED),
(States.READ_ONLY, States.LOST),
(States.SUSPENDED, States.CONNECTED),
(States.SUSPENDED, States.READ_ONLY),
(States.SUSPENDED, States.LOST),
}
def __init__(self, session):
self.session = session
# A new machine always starts in LOST.
self.current_state = States.LOST
# Missing keys yield an empty set; presumably keyed by state -- TODO confirm.
self.futures = collections.defaultdict(set)
def transition_to(self, state):
# Refuse any move whose (current, target) pair is not whitelisted above;
# the error message names both the current and the requested state.
if (self.current_state, state) not in self.valid_transitions:
raise exc.InvalidStateTransition(
"Invalid session state transition: %s -> %s" % (
self.current_state, state
)
)
# NOTE(review): this excerpt ends after the validity check.
CONNECTED = "connected"
SUSPENDED = "suspended"
READ_ONLY = "read_only"
LOST = "lost"
# Tracks a session's current state and validates moves between states.
class SessionStateMachine:
# Allowed (current_state, new_state) pairs; transition_to() rejects any pair
# that is not listed here.
valid_transitions = {
(States.LOST, States.CONNECTED),
(States.LOST, States.READ_ONLY),
(States.CONNECTED, States.SUSPENDED),
(States.CONNECTED, States.LOST),
(States.READ_ONLY, States.CONNECTED),
(States.READ_ONLY, States.SUSPENDED),
(States.READ_ONLY, States.LOST),
(States.SUSPENDED, States.CONNECTED),
(States.SUSPENDED, States.READ_ONLY),
(States.SUSPENDED, States.LOST),
}
def __init__(self, session):
self.session = session
# A new machine always starts in LOST.
self.current_state = States.LOST
# Missing keys yield an empty set; presumably keyed by state -- TODO confirm.
self.futures = collections.defaultdict(set)
def transition_to(self, state):
# Refuse any move whose (current, target) pair is not whitelisted above;
# the error message names both the current and the requested state.
if (self.current_state, state) not in self.valid_transitions:
raise exc.InvalidStateTransition(
"Invalid session state transition: %s -> %s" % (
self.current_state, state
)
# NOTE(review): this excerpt is cut off before the call's closing parenthesis.
async def delete_garbage_znodes(self, znode_label):
    """Delete leftover sibling znodes for ``znode_label`` that are not owned.

    A sibling is deleted when it contains ``self.guid``, its label (per
    ``self.determine_znode_label``) equals ``znode_label``, and its path
    differs from the path recorded in ``self.owned_paths`` for that label.

    The whole pass is retried until it completes without raising. Each
    attempt first waits for the session to be ``CONNECTED`` and for the
    exponential-backoff retry policy to allow another try.

    Args:
        znode_label: Label of the znodes to clean up.
    """
    MAXIMUM_WAIT = 60
    retry_policy = RetryPolicy.exponential_backoff(maximum=MAXIMUM_WAIT)

    while True:
        await self.client.session.state.wait_for(states.States.CONNECTED)
        await retry_policy.enforce()

        try:
            siblings = await self.get_siblings()
            for sibling in siblings:
                if self.guid in sibling and self.determine_znode_label(
                        sibling) == znode_label:
                    path = self.sibling_path(sibling)
                    # Never delete the znode currently owned for this label.
                    if path != self.owned_paths.get(znode_label, ''):
                        await self.client.delete(path)

            # The pass finished without an error: stop retrying.
            # NOTE(review): placed at try level, since this is the only
            # position where the retry loop ends on success -- confirm
            # against the upstream source.
            break
        except Exception:
            # Best-effort cleanup: log the failure and go around again.
            log.exception('Exception in delete_garbage_znodes:')
LOST = "lost"
# Tracks a session's current state and validates moves between states.
class SessionStateMachine:
# Allowed (current_state, new_state) pairs; transition_to() rejects any pair
# that is not listed here.
valid_transitions = {
(States.LOST, States.CONNECTED),
(States.LOST, States.READ_ONLY),
(States.CONNECTED, States.SUSPENDED),
(States.CONNECTED, States.LOST),
(States.READ_ONLY, States.CONNECTED),
(States.READ_ONLY, States.SUSPENDED),
(States.READ_ONLY, States.LOST),
(States.SUSPENDED, States.CONNECTED),
(States.SUSPENDED, States.READ_ONLY),
(States.SUSPENDED, States.LOST),
}
def __init__(self, session):
self.session = session
# A new machine always starts in LOST.
self.current_state = States.LOST
# Missing keys yield an empty set; presumably keyed by state -- TODO confirm.
self.futures = collections.defaultdict(set)
def transition_to(self, state):
# Refuse any move whose (current, target) pair is not whitelisted above;
# the error message names both the current and the requested state.
if (self.current_state, state) not in self.valid_transitions:
raise exc.InvalidStateTransition(
"Invalid session state transition: %s -> %s" % (
self.current_state, state
)
)
# Valid move: record it at debug level.
log.debug("Session transition: %s -> %s", self.current_state, state)
# NOTE(review): the excerpt ends here; the rest of the method is not visible.