Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _transition() -> bool:
    """
    Run a single processing step for the label's current state.

    `app`, `label` and `state_machine` are not parameters; they are resolved
    from the surrounding scope. The whole step runs inside the context
    manager returned by `app.session.begin_nested()`, and the label is locked
    before its current state is looked up.

    Returns the result of `process_action` when the current state is an
    `Action`, or of `process_gate` when it is a `Gate`. A `Gate` whose
    `trigger_on_entry` is falsy is not processed and yields `False`.

    Raises `RuntimeError` if the current state is neither an `Action` nor a
    `Gate`.
    """
    with app.session.begin_nested():
        lock_label(app, label)
        state = get_current_state(app, label, state_machine)

        # Select the handler for this kind of state; both handlers take the
        # same keyword arguments, so the call itself is shared below.
        if isinstance(state, Action):
            handler = process_action
        elif isinstance(state, Gate):  # pragma: no branch
            if not state.trigger_on_entry:
                # Gates that do not trigger on entry are left alone here.
                return False
            handler = process_gate
        else:
            raise RuntimeError(  # pragma: no cover
                "Unsupported state type {0}".format(state),
            )

        return handler(
            app=app,
            state=state,
            state_machine=state_machine,
            label=label,
        )
app: App,
label: LabelRef,
state_machine: StateMachine,
state_pending_update: State,
):
with app.session.begin_nested():
lock_label(app, label)
current_state = get_current_state(app, label, state_machine)
if state_pending_update != current_state:
# We have raced with another update, and are no longer in
# the state for which we needed an update, so we should
# stop.
return
if not isinstance(current_state, Gate): # pragma: no branch
# Cannot be hit because of the semantics of
# `needs_gate_evaluation_for_metadata_change`. Here to
# appease mypy.
raise RuntimeError( # pragma: no cover
"Label not in a gate",
)
could_progress = process_gate(
app=app,
state=current_state,
state_machine=state_machine,
label=label,
)
if could_progress:
process_transitions(app, label)
def _configure_schedule_for_state(
scheduler: schedule.Scheduler,
processor: StateSpecificCronProcessor,
state: State,
) -> None:
if isinstance(state, Action):
scheduler.every().minute.do(
processor,
fn=process_action,
label_provider=labels_in_state,
)
elif isinstance(state, Gate):
for trigger in state.triggers:
if isinstance(trigger, TimeTrigger):
scheduler.every().day.at(
f"{trigger.time.hour:02d}:{trigger.time.minute:02d}",
).do(
processor,
fn=process_gate,
label_provider=labels_in_state,
)
elif isinstance(trigger, IntervalTrigger):
scheduler.every(
trigger.interval.total_seconds(),
).seconds.do(
processor,
fn=process_gate,
label_provider=labels_in_state,
*,
app: App,
state: State,
state_machine: StateMachine,
label: LabelRef,
) -> bool:
"""
Process a label in a gate, continuing if necessary.
Assumes that `gate` is the current state of the label, and that the label
has been locked.
Returns whether the label progressed in the state machine, for which `True`
implies further progression should be attempted.
"""
if not isinstance(state, Gate): # pragma: no branch
raise ValueError( # pragma: no cover
f"process_gate called with {state.name} which is not an Gate",
)
gate = state
state_machine = get_state_machine(app, label)
metadata, deleted = get_label_metadata(app, label, state_machine)
if deleted:
raise DeletedLabel(label)
history_entry = get_current_history(app, label)
context = context_for_label(
label,
metadata,
def context_for_label(
label: LabelRef,
metadata: Metadata,
state_machine: StateMachine,
state: State,
history_entry: Any,
logger: BaseLogger,
) -> Context:
"""Util to build the context for a label in a state."""
feeds = feeds_for_state_machine(state_machine)
accessed_variables: List[str] = []
if isinstance(state, Gate):
accessed_variables.extend(state.exit_condition.accessed_variables())
if isinstance(state.next_states, ContextNextStates):
accessed_variables.append(state.next_states.path)
@contextlib.contextmanager
def feed_logging_context(feed_url):
with logger.process_feed(state_machine, state, feed_url):
yield functools.partial(
logger.feed_response,
state_machine,
state,
feed_url,
)
return Context(
label=label.name,
label: LabelRef,
update: Metadata,
) -> Tuple[bool, State]:
"""
Given a change to the metadata, should the gate evaluation be triggered.
"""
current_state = get_current_state(app, label, state_machine)
if current_state is None:
raise ValueError(
f"Cannot determine gate evaluation for deleted label {label} "
"(deleted labels have no current state)",
)
if not isinstance(current_state, Gate):
# Label is not a gate state so there's no trigger to resolve.
return False, current_state
if any(
trigger.should_trigger_for_update(update)
for trigger in current_state.metadata_triggers
):
return True, current_state
return False, current_state