Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_handler_backend():
    """Instantiate the handler backend named in ``app_config``.

    ``app_config.HANDLER_BACKEND_CLASS`` is read as a dotted path
    (``package.module.ClassName``). Everything before the last dot is
    imported as a module, the last component is looked up on that module,
    and the result is called with ``app_config.HANDLER_BACKEND_CONFIG``
    expanded as keyword arguments.

    Returns:
        The object produced by calling the configured class.

    Raises:
        ValueError: if the configured path has no module part.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module does not define the named attribute.
    """
    # Imported locally so the function does not depend on a file-level import.
    import importlib

    dotted_path = app_config.HANDLER_BACKEND_CLASS
    module_path, _, class_name = dotted_path.rpartition('.')
    if not module_path:
        # Without a dot there is no module to import; say so explicitly
        # instead of failing with an unpacking error.
        raise ValueError(
            'HANDLER_BACKEND_CLASS must be a dotted path such as '
            '"package.module.ClassName", got %r' % (dotted_path,)
        )

    backend_cls = getattr(importlib.import_module(module_path), class_name)
    return backend_cls(**app_config.HANDLER_BACKEND_CONFIG)
from river.config import app_config
__author__ = 'ahmetdal'
# Build the hooking backend once, when this module is imported.
# ``app_config.HOOKING_BACKEND_CLASS`` is split at its last dot into a module
# path and a class name.
handler_module, hooking_cls = app_config.HOOKING_BACKEND_CLASS.rsplit('.', 1)
# A non-empty ``fromlist`` makes ``__import__`` return the named (leaf) module
# rather than the top-level package; the class is then looked up on it and
# called with ``app_config.HOOKING_BACKEND_CONFIG`` as keyword arguments.
callback_backend = getattr(__import__(handler_module, fromlist=[hooking_cls]), hooking_cls)(**app_config.HOOKING_BACKEND_CONFIG)
def load_handler_backend():
    """Instantiate the handler backend named in ``app_config``.

    ``app_config.HANDLER_BACKEND_CLASS`` is read as a dotted path
    (``package.module.ClassName``). Everything before the last dot is
    imported as a module, the last component is looked up on that module,
    and the result is called with ``app_config.HANDLER_BACKEND_CONFIG``
    expanded as keyword arguments.

    Returns:
        The object produced by calling the configured class.

    Raises:
        ValueError: if the configured path has no module part.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module does not define the named attribute.
    """
    # Imported locally so the function does not depend on a file-level import.
    import importlib

    dotted_path = app_config.HANDLER_BACKEND_CLASS
    module_path, _, class_name = dotted_path.rpartition('.')
    if not module_path:
        # Without a dot there is no module to import; say so explicitly
        # instead of failing with an unpacking error.
        raise ValueError(
            'HANDLER_BACKEND_CLASS must be a dotted path such as '
            '"package.module.ClassName", got %r' % (dotted_path,)
        )

    backend_cls = getattr(importlib.import_module(module_path), class_name)
    return backend_cls(**app_config.HANDLER_BACKEND_CONFIG)
permission_q = Q()
for p in permissions:
label, codename = p.split('.')
permission_q = permission_q | Q(permissions__content_type__app_label=label,
permissions__codename=codename)
return proceedings.filter(
(
(Q(transactioner__isnull=True) | Q(transactioner=user)) &
(Q(permissions__isnull=True) | permission_q) &
(Q(groups__isnull=True) | group_q)
)
)
proceedings = Proceeding.objects.filter(
workflow_object=workflow_object,
meta__transition__source_state__in=source_states,
status=PENDING,
enabled=True
)
suitable_proceedings = get_proceeding(proceedings.filter(skip=False))
if user and not god_mod:
suitable_proceedings = authorize_proceedings(suitable_proceedings)
skipped_proceedings = get_proceeding(proceedings.filter(skip=True))
if skipped_proceedings:
source_state_pks = list(skipped_proceedings.values_list('meta__transition__destination_state', flat=True))
suitable_proceedings = suitable_proceedings | ProceedingService.get_available_proceedings(workflow_object,
State.objects.filter(
def _authorized_approvals(self, as_user):
    """Return the pending approvals of ``self.workflow`` open to ``as_user``.

    An approval is selected when all three conditions hold:

    * its ``transactioner`` is null or is ``as_user``;
    * its ``permissions`` relation is null, or it matches one of the
      permissions the authentication backends report for ``as_user``;
    * its ``groups`` relation is null, or it matches one of
      ``as_user.groups``.

    Args:
        as_user: the user whose groups and permissions are checked.

    Returns:
        The result of ``TransitionApproval.objects.filter(...)`` restricted
        to ``workflow=self.workflow`` and ``status=PENDING``.
    """
    group_q = Q()
    for group in as_user.groups.all():
        group_q = group_q | Q(groups__in=[group])

    # Several backends can report the same permission string; a set keeps a
    # single OR clause per distinct permission.
    permissions = set()
    for backend in auth.get_backends():
        permissions.update(backend.get_all_permissions(as_user))

    permission_q = Q()
    for permission in sorted(permissions):
        # Split on the first dot only, so a codename that itself contains a
        # dot does not break the two-name unpacking.
        label, codename = permission.split('.', 1)
        permission_q = permission_q | Q(permissions__content_type__app_label=label,
                                        permissions__codename=codename)

    return TransitionApproval.objects.filter(
        Q(workflow=self.workflow, status=PENDING) &
        (
            (Q(transactioner__isnull=True) | Q(transactioner=as_user)) &
            (Q(permissions__isnull=True) | permission_q) &
            (Q(groups__isnull=True) | group_q)
        )
    )
def _transitions_before(iteration):
    """Select transitions whose iteration is at most ``iteration``.

    The selection is limited to ``self.workflow`` and
    ``self.workflow_object``; ``self`` is not a parameter here and is
    resolved from the surrounding scope.
    """
    criteria = {
        'workflow': self.workflow,
        'workflow_object': self.workflow_object,
        'iteration__lte': iteration,
    }
    return Transition.objects.filter(**criteria)
)
transitions = Transition.objects.filter(qs)
iteration = transition.iteration + 1
while transitions:
uncancelled_transitions_qs = uncancelled_transitions_qs | qs
qs = Q(
workflow=self.workflow,
object_id=self.workflow_object.pk,
iteration=iteration,
source_state__pk__in=transitions.values_list("destination_state__pk", flat=True),
status=PENDING
)
transitions = Transition.objects.filter(qs)
iteration += 1
actual_cancelled_transitions = Transition.objects.filter(cancelled_transitions_qs & ~uncancelled_transitions_qs)
actual_cancelled_transitions.update(status=CANCELLED)
TransitionApproval.objects.filter(transition__in=actual_cancelled_transitions).update(status=CANCELLED)
def _get_transition_images(self, source_states):
    """Return the highest-iteration transitions for each meta.

    First pairs every ``meta`` with the maximum ``iteration`` found among
    this workflow object's transitions whose source state pk is in
    ``source_states``. Then selects the transitions of this workflow and
    object that match one of those ``(meta, iteration)`` pairs.
    """
    latest_per_meta = Transition.objects.filter(
        workflow=self.workflow,
        workflow_object=self.workflow_object,
        source_state__pk__in=source_states,
    ).values_list("meta").annotate(max_iteration=Max("iteration"))

    # Q(pk=-1) is the starting operand of the OR chain; assuming no row has
    # pk -1, an empty ``latest_per_meta`` selects nothing.
    any_latest = Q(pk=-1)
    for meta_id, max_iteration in latest_per_meta:
        any_latest = Q(meta__id=meta_id, iteration=max_iteration) | any_latest

    same_object = Q(workflow=self.workflow, object_id=self.workflow_object.pk)
    return Transition.objects.filter(same_object & any_latest)
iteration=transition.iteration,
source_state=transition.source_state,
destination_state=transition.destination_state
)
transitions = Transition.objects.filter(qs)
iteration = transition.iteration + 1
while transitions:
uncancelled_transitions_qs = uncancelled_transitions_qs | qs
qs = Q(
workflow=self.workflow,
object_id=self.workflow_object.pk,
iteration=iteration,
source_state__pk__in=transitions.values_list("destination_state__pk", flat=True),
status=PENDING
)
transitions = Transition.objects.filter(qs)
iteration += 1
actual_cancelled_transitions = Transition.objects.filter(cancelled_transitions_qs & ~uncancelled_transitions_qs)
actual_cancelled_transitions.update(status=CANCELLED)
TransitionApproval.objects.filter(transition__in=actual_cancelled_transitions).update(status=CANCELLED)
def _check_if_it_cycled(self, done_transition):
    """Check the transitions leaving ``done_transition``'s destination state.

    Considers the transitions of ``self.workflow_object`` in
    ``self.class_workflow.workflow`` whose source state is
    ``done_transition.destination_state``.

    Returns:
        ``True`` when at least one of them has status ``DONE`` and none has
        status ``PENDING``; ``False`` otherwise.
    """
    outgoing = Transition.objects.filter(
        workflow_object=self.workflow_object,
        workflow=self.class_workflow.workflow,
        source_state=done_transition.destination_state
    )
    # Only presence matters here, so ask the database whether a row exists
    # instead of counting every matching row.
    has_done = outgoing.filter(status=DONE).exists()
    return has_done and not outgoing.filter(status=PENDING).exists()