Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_initial_state(content_type):
    """
    Return the single initial state of the workflow for ``content_type``.

    An initial state is a state which is not a destination of a transition but
    can be source of a transition OR not (a destination of a transition and
    this transition direction is FORWARD). The candidates are the source states
    of the proceedings returned by
    ``ProceedingService.get_initial_proceedings(content_type)``.

    :param content_type: content type whose initial state is looked up.
    :return: the only matching ``State``.
    :raises RiverException: with ``ErrorCode.NO_AVAILABLE_INITIAL_STATE`` when
        no candidate exists, or ``ErrorCode.MULTIPLE_INITIAL_STATE`` when more
        than one candidate exists.
    """
    initial_proceedings = ProceedingService.get_initial_proceedings(content_type)
    states = State.objects.filter(pk__in=initial_proceedings.values_list('meta__transition__source_state', flat=True))

    # Count once and reuse the number: each ``count()`` on an unevaluated
    # queryset is a separate database round trip.
    state_count = states.count()
    if state_count == 0:
        raise RiverException(ErrorCode.NO_AVAILABLE_INITIAL_STATE, 'There is no available initial state for the content type %s. ' % content_type)
    if state_count > 1:
        raise RiverException(ErrorCode.MULTIPLE_INITIAL_STATE,
                             'There are multiple initial state for the content type %s. Have only one initial state' % content_type)
    return states[0]
def post_group_change(sender, instance, *args, **kwargs):
    """
    Push the current groups of ``instance`` onto its pending proceedings.

    Every proceeding in ``instance.proceedings`` whose status is ``PENDING`` is
    passed to ``ProceedingService.override_groups`` together with
    ``instance.groups.all()``.

    :param sender: not used by this handler.
    :param instance: object exposing ``proceedings`` and ``groups``.
    :param args: ignored.
    :param kwargs: ignored.
    """
    # NOTE(review): imported at call time, presumably to dodge a circular
    # import between the models and the service module -- confirm.
    from river.services.proceeding import ProceedingService
    from river.models.proceeding import PENDING

    still_pending = instance.proceedings.filter(status=PENDING)
    for pending_item in still_pending:
        ProceedingService.override_groups(pending_item, instance.groups.all())
return proceeding
proceeding = process(workflow_object, user, APPROVED, next_state, god_mod)
# Any other proceeding is left?
required_proceedings = ProceedingService.get_available_proceedings(workflow_object, [workflow_object.get_state()], destination_state=next_state,
god_mod=god_mod)
transition_status = False
if required_proceedings.count() == 0:
workflow_object.set_state(proceeding.meta.transition.destination_state)
transition_status = True
# Next states should be PENDING back again if there is circle.
ProceedingService.cycle_proceedings(workflow_object)
# ProceedingService.get_next_proceedings(workflow_object).update(status=PENDING)
with ProceedingSignal(workflow_object, proceeding), TransitionSignal(transition_status, workflow_object, proceeding), FinalSignal(workflow_object):
workflow_object.save()
LOGGER.debug("Workflow object %s is proceeded for next transition. Transition: %s -> %s" % (
workflow_object, workflow_object.get_state().label, workflow_object.get_state()))
def post_permissions_change(sender, instance, *args, **kwargs):
    """
    Push the current permissions of ``instance`` onto its pending proceedings.

    Every proceeding in ``instance.proceedings`` whose status is ``PENDING`` is
    passed to ``ProceedingService.override_permissions`` together with
    ``instance.permissions.all()``.

    :param sender: not used by this handler.
    :param instance: object exposing ``proceedings`` and ``permissions``.
    :param args: ignored.
    :param kwargs: ignored.
    """
    # NOTE(review): imported at call time, presumably to dodge a circular
    # import between the models and the service module -- confirm.
    from river.models.proceeding import PENDING
    from river.services.proceeding import ProceedingService

    still_pending = instance.proceedings.filter(status=PENDING)
    for pending_item in still_pending:
        ProceedingService.override_permissions(pending_item, instance.permissions.all())
def register_object(workflow_object):
    """
    Initialise the proceedings of ``workflow_object`` if it has none yet.

    When no ``Proceeding`` row references ``workflow_object``,
    ``ProceedingService.init_proceedings`` is called for it; otherwise nothing
    happens.

    :param workflow_object: the object whose proceedings are checked.
    """
    # ``exists()`` answers "is there at least one row?" without counting every
    # matching row, and matches the emptiness checks used elsewhere here.
    if not Proceeding.objects.filter(workflow_object=workflow_object).exists():
        ProceedingService.init_proceedings(workflow_object)
def get_final_states(content_type):
    """
    Return the final states of the workflow for ``content_type``.

    A final state is a state which is not a source of a transition but can be
    destination of a transition OR not (a source of a transition and this
    transition direction is FORWARD). The result is built from the destination
    states of the proceedings returned by
    ``ProceedingService.get_final_proceedings(content_type)``.

    :param content_type: content type whose final states are looked up.
    :return: queryset of ``State`` objects.
    :raises RiverException: with ``ErrorCode.NO_AVAILABLE_FINAL_STATE`` when
        there is no final proceeding for the content type.
    """
    final_proceedings = ProceedingService.get_final_proceedings(content_type)
    # Only emptiness matters, so ask the database whether a row exists rather
    # than counting all of them.
    if not final_proceedings.exists():
        raise RiverException(ErrorCode.NO_AVAILABLE_FINAL_STATE, 'There is no available final state for the content type %s.' % content_type)
    return State.objects.filter(pk__in=final_proceedings.values_list('meta__transition__destination_state', flat=True))
def get_next_proceedings(workflow_object, proceeding_pks=None, current_states=None, index=0, limit=None):
    """
    Collect the proceedings reachable by walking transitions forward.

    Each call looks up the proceedings of ``workflow_object`` whose transition
    source state is one of the current states (``workflow_object.proceeding``
    is excluded when set). If there are any, none of them has been collected
    already, and the depth limit is not reached, their pks are appended and the
    walk recurses from their destination states. Otherwise the walk stops and
    the proceedings collected so far are returned; the proceedings found at the
    stopping level are not added.

    :param workflow_object: object whose proceedings are walked.
    :param proceeding_pks: pks collected by previous levels; callers normally
        leave this unset.
    :param current_states: queryset of states to start this level from;
        defaults to the object's current state.
    :param index: depth of the previous level; incremented on entry.
    :param limit: optional maximum depth; a falsy value means no limit.
    :return: queryset of ``Proceeding`` restricted to the collected pks.
    """
    collected_pks = proceeding_pks or []
    depth = index + 1

    if current_states:
        source_states = list(current_states.values_list('pk', flat=True))
    else:
        source_states = [workflow_object.get_state()]

    candidates = Proceeding.objects.filter(workflow_object=workflow_object, meta__transition__source_state__in=source_states)
    if workflow_object.proceeding:
        candidates = candidates.exclude(pk=workflow_object.proceeding.pk)

    # Keep walking only while there is something new: a candidate that was
    # already collected means the walk has looped back on itself.
    keep_walking = (
        candidates.exists()
        and not candidates.filter(pk__in=collected_pks).exists()
        and (not limit or depth < limit)
    )
    if not keep_walking:
        return Proceeding.objects.filter(pk__in=collected_pks)

    return ProceedingService.get_next_proceedings(
        workflow_object,
        proceeding_pks=collected_pks + list(candidates.values_list('pk', flat=True)),
        current_states=State.objects.filter(
            pk__in=candidates.values_list('meta__transition__destination_state', flat=True)),
        index=depth,
        limit=limit
    )