Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return Transition.objects.filter(workflow=self.workflow, workflow_object=self.workflow_object, iteration__lte=iteration)
try:
recent_iteration = self.recent_approval.iteration if self.recent_approval else 0
jumped_transition = getattr(self.workflow_object, self.field_name + "_transitions").filter(
iteration__gte=recent_iteration, destination_state=state, status=PENDING
).earliest("iteration")
jumped_transitions = _transitions_before(jumped_transition.iteration).filter(status=PENDING)
TransitionApproval.objects.filter(pk__in=jumped_transitions.values_list("transition_approvals__pk", flat=True)).update(status=JUMPED)
jumped_transitions.update(status=JUMPED)
self.set_state(state)
self.workflow_object.save()
except Transition.DoesNotExist:
raise RiverException(ErrorCode.STATE_IS_NOT_AVAILABLE_TO_BE_JUMPED, "This state is not available to be jumped in the future of this object")
def process(workflow_object, user, action, next_state=None, god_mod=False):
proceedings = ProceedingService.get_available_proceedings(workflow_object, [workflow_object.get_state()], user=user, god_mod=god_mod)
c = proceedings.count()
if c == 0:
raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available state for destination for the user.")
if c > 1:
if next_state:
proceedings = proceedings.filter(meta__transition__destination_state=next_state)
if proceedings.count() == 0:
available_states = StateService.get_available_states(workflow_object, user)
raise RiverException(ErrorCode.INVALID_NEXT_STATE_FOR_USER, "Invalid state is given(%s). Valid states is(are) %s" % (
next_state.__str__(), ','.join([ast.__str__() for ast in available_states])))
else:
raise RiverException(ErrorCode.NEXT_STATE_IS_REQUIRED,
"State must be given when there are multiple states for destination")
proceeding = proceedings[0]
proceeding.status = action
proceeding.transactioner = user
proceeding.transaction_date = datetime.now()
if workflow_object.proceeding:
proceeding.previous = workflow_object.proceeding
proceeding.save()
return proceeding
def process(workflow_object, user, action, next_state=None, god_mod=False):
proceedings = ProceedingService.get_available_proceedings(workflow_object, [workflow_object.get_state()], user=user, god_mod=god_mod)
c = proceedings.count()
if c == 0:
raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available state for destination for the user.")
if c > 1:
if next_state:
proceedings = proceedings.filter(meta__transition__destination_state=next_state)
if proceedings.count() == 0:
available_states = StateService.get_available_states(workflow_object, user)
raise RiverException(ErrorCode.INVALID_NEXT_STATE_FOR_USER, "Invalid state is given(%s). Valid states is(are) %s" % (
next_state.__str__(), ','.join([ast.__str__() for ast in available_states])))
else:
raise RiverException(ErrorCode.NEXT_STATE_IS_REQUIRED,
"State must be given when there are multiple states for destination")
proceeding = proceedings[0]
proceeding.status = action
proceeding.transactioner = user
proceeding.transaction_date = datetime.now()
if workflow_object.proceeding:
proceeding.previous = workflow_object.proceeding
def get_field(workflow_class):
from river.models.fields.state import StateField
field = next((f for f in workflow_class._meta.fields if type(f) is StateField), None)
if not field:
raise RiverException(ErrorCode.NO_STATE_FIELD, "There is no state field in model class of given instance")
return field
def approve(self, as_user, next_state=None):
available_approvals = self.get_available_approvals(as_user=as_user)
number_of_available_approvals = available_approvals.count()
if number_of_available_approvals == 0:
raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available approval for the user.")
elif next_state:
available_approvals = available_approvals.filter(transition__destination_state=next_state)
if available_approvals.count() == 0:
available_states = self.get_available_states(as_user)
raise RiverException(ErrorCode.INVALID_NEXT_STATE_FOR_USER, "Invalid state is given(%s). Valid states is(are) %s" % (
next_state.__str__(), ','.join([ast.__str__() for ast in available_states])))
elif number_of_available_approvals > 1 and not next_state:
raise RiverException(ErrorCode.NEXT_STATE_IS_REQUIRED, "State must be given when there are multiple states for destination")
approval = available_approvals.first()
approval.status = APPROVED
approval.transactioner = as_user
approval.transaction_date = timezone.now()
approval.previous = self.recent_approval
approval.save()
if next_state:
self.cancel_impossible_future(approval)
has_transit = False
if approval.peers.filter(status=PENDING).count() == 0:
def __init__(self, error_code, *args, **kwargs):
super(RiverException, self).__init__(*args, **kwargs)
self.code = error_code
def process(workflow_object, user, action, next_state=None, god_mod=False):
proceedings = ProceedingService.get_available_proceedings(workflow_object, [workflow_object.get_state()], user=user, god_mod=god_mod)
c = proceedings.count()
if c == 0:
raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available state for destination for the user.")
if c > 1:
if next_state:
proceedings = proceedings.filter(meta__transition__destination_state=next_state)
if proceedings.count() == 0:
available_states = StateService.get_available_states(workflow_object, user)
raise RiverException(ErrorCode.INVALID_NEXT_STATE_FOR_USER, "Invalid state is given(%s). Valid states is(are) %s" % (
next_state.__str__(), ','.join([ast.__str__() for ast in available_states])))
else:
raise RiverException(ErrorCode.NEXT_STATE_IS_REQUIRED,
"State must be given when there are multiple states for destination")
proceeding = proceedings[0]
proceeding.status = action
proceeding.transactioner = user
proceeding.transaction_date = datetime.now()
if workflow_object.proceeding:
proceeding.previous = workflow_object.proceeding
proceeding.save()
return proceeding
def approve(self, as_user, next_state=None):
available_approvals = self.get_available_approvals(as_user=as_user)
number_of_available_approvals = available_approvals.count()
if number_of_available_approvals == 0:
raise RiverException(ErrorCode.NO_AVAILABLE_NEXT_STATE_FOR_USER, "There is no available approval for the user.")
elif next_state:
available_approvals = available_approvals.filter(transition__destination_state=next_state)
if available_approvals.count() == 0:
available_states = self.get_available_states(as_user)
raise RiverException(ErrorCode.INVALID_NEXT_STATE_FOR_USER, "Invalid state is given(%s). Valid states is(are) %s" % (
next_state.__str__(), ','.join([ast.__str__() for ast in available_states])))
elif number_of_available_approvals > 1 and not next_state:
raise RiverException(ErrorCode.NEXT_STATE_IS_REQUIRED, "State must be given when there are multiple states for destination")
approval = available_approvals.first()
approval.status = APPROVED
approval.transactioner = as_user
approval.transaction_date = timezone.now()
approval.previous = self.recent_approval
approval.save()
if next_state:
self.cancel_impossible_future(approval)
has_transit = False
if approval.peers.filter(status=PENDING).count() == 0:
approval.transition.status = DONE
approval.transition.save()
previous_state = self.get_state()