Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_next_proceedings(workflow_object, proceeding_pks=None, current_states=None, index=0, limit=None):
if not proceeding_pks:
proceeding_pks = []
index += 1
current_states = list(current_states.values_list('pk', flat=True)) if current_states else [workflow_object.get_state()]
next_proceedings = Proceeding.objects.filter(workflow_object=workflow_object, meta__transition__source_state__in=current_states)
if workflow_object.proceeding:
next_proceedings = next_proceedings.exclude(pk=workflow_object.proceeding.pk)
if next_proceedings.exists() and not next_proceedings.filter(pk__in=proceeding_pks).exists() and (
not limit or index < limit):
proceedings = ProceedingService.get_next_proceedings(
workflow_object,
proceeding_pks=proceeding_pks + list(next_proceedings.values_list('pk', flat=True)),
current_states=State.objects.filter(
pk__in=next_proceedings.values_list('meta__transition__destination_state', flat=True)),
index=index,
limit=limit
)
else:
proceedings = Proceeding.objects.filter(pk__in=proceeding_pks)
return proceedings
def get_available_states(workflow_object, user, include_user=True):
proceedings = Proceeding.objects.filter(
workflow_object=workflow_object,
meta__transition__source_state=workflow_object.get_state(),
)
if include_user:
proceedings = proceedings.filter(
meta__permissions__in=user.user_permissions.all()
)
destination_states = proceedings.values_list('meta__transition__destination_state', flat=True)
return State.objects.filter(pk__in=destination_states)
workflow_object=workflow_object,
meta__transition__source_state__in=source_states,
status=PENDING,
enabled=True
)
suitable_proceedings = get_proceeding(proceedings.filter(skip=False))
if user and not god_mod:
suitable_proceedings = authorize_proceedings(suitable_proceedings)
skipped_proceedings = get_proceeding(proceedings.filter(skip=True))
if skipped_proceedings:
source_state_pks = list(skipped_proceedings.values_list('meta__transition__destination_state', flat=True))
suitable_proceedings = suitable_proceedings | ProceedingService.get_available_proceedings(workflow_object,
State.objects.filter(
pk__in=source_state_pks),
user=user,
destination_state=destination_state,
god_mod=god_mod)
return suitable_proceedings
def get_next_approvements(workflow_object, field, approvement_pks=None, current_states=None, index=0, limit=None):
if not approvement_pks:
approvement_pks = []
index += 1
current_states = list(current_states.values_list('pk', flat=True)) if current_states else [getattr(workflow_object, field)]
next_approvements = Approvement.objects.filter(workflow_object=workflow_object, field=field, meta__transition__source_state__in=current_states)
if next_approvements.exists() and not next_approvements.filter(pk__in=approvement_pks).exists() and (not limit or index < limit):
approvements = ApprovementService.get_next_approvements(
workflow_object,
field,
approvement_pks=approvement_pks + list(next_approvements.values_list('pk', flat=True)),
current_states=State.objects.filter(pk__in=next_approvements.values_list('meta__transition__destination_state', flat=True)),
index=index,
limit=limit
)
else:
approvements = Approvement.objects.filter(pk__in=approvement_pks)
return approvements
workflow_object=workflow_object,
field=field,
meta__transition__source_state__in=source_states,
status=PENDING,
enabled=True
)
suitable_approvements = get_approvement(approvements.filter(skip=False))
if user and not god_mod:
suitable_approvements = authorize_approvements(suitable_approvements)
skipped_approvements = get_approvement(approvements.filter(skip=True))
if skipped_approvements:
source_state_pks = list(skipped_approvements.values_list('meta__transition__destination_state', flat=True))
suitable_approvements = suitable_approvements | ApprovementService.get_approvements_object_waiting_for_approval(workflow_object, field, State.objects.filter(pk__in=source_state_pks),
user=user, destination_state=destination_state, god_mod=god_mod)
return suitable_approvements
def get_by_natural_key(self, source_state, destination_state):
return self.get(source_state=State.objects.get_by_natural_key(source_state), destination_state=State.objects.get_by_natural_key(destination_state))
def get_state_by(**kwargs):
if kwargs:
return State.objects.get(**kwargs)