Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def contribute_to_class(self, cls, name):
    """Attach this state field and its workflow helpers to ``cls``.

    Stores ``name`` as ``self.field_name``, adds two generic relations to
    ``cls`` (``<name>_transition_approvals`` and ``<name>_transitions``),
    delegates to the parent field's ``contribute_to_class`` and finally adds
    the field to ``workflow_registry``.

    While ``id(cls)`` is not yet a key of ``workflow_registry.workflows`` the
    class additionally receives a ``river`` class property and the
    post-save / post-delete handlers are connected for ``self.model``.
    """

    @classproperty
    def river(_self):
        return RiverObject(_self)

    self.field_name = name

    relation_targets = (
        ("_transition_approvals", TransitionApproval),
        ("_transitions", Transition),
    )
    for suffix, related_model in relation_targets:
        meta = related_model._meta
        target = '%s.%s' % (meta.app_label, meta.object_name)
        self._add_to_class(cls, self.field_name + suffix, GenericRelation(target))

    if id(cls) not in workflow_registry.workflows:
        self._add_to_class(cls, "river", river)

    super(StateField, self).contribute_to_class(cls, name)

    # The registry is consulted again on purpose: the check is evaluated
    # after the parent class has finished contributing the field.
    if id(cls) not in workflow_registry.workflows:
        sender = self.model
        # Both signals are connected with the same uid string.
        uid = '%s_%s_riverstatefield_post' % (sender, name)
        # Third positional argument of ``connect`` is passed as False
        # (presumably Django's ``weak`` flag -- confirm against Signal.connect).
        post_save.connect(_on_workflow_object_saved, sender, False, dispatch_uid=uid)
        post_delete.connect(_on_workflow_object_deleted, sender, False, dispatch_uid=uid)

    workflow_registry.add(self.field_name, cls)
def _register_hook_inlines(self, model): # pylint: disable=no-self-use
    """Ensure the admin registered for ``model`` carries the hook inlines.

    If ``model`` has no admin registered on ``admin.site`` it is registered
    with ``DefaultWorkflowModelAdmin``. Otherwise, unless
    ``OnApprovedHookInline`` is already among the admin's inlines, the three
    hook inlines are appended to ``inlines`` and the model's workflow fields
    (``workflow_registry.get_class_fields(model)``) are appended to
    ``readonly_fields``.

    Both attributes are rebuilt as duplicate-free lists that keep their
    existing order, and ``inlines`` may be declared as either a list or a
    tuple on the registered admin.

    Args:
        model: the model class whose admin registration is inspected.
    """
    from django.contrib import admin
    from river.core.workflowregistry import workflow_registry
    from river.admin import OnApprovedHookInline, OnTransitHookInline, OnCompleteHookInline, DefaultWorkflowModelAdmin

    def _merge_unique(existing, extra):
        # Ordered, duplicate-free concatenation. ``list(set(...))`` would
        # shuffle the entries, and ``tuple + list`` raises TypeError, so both
        # operands are coerced to lists first.
        merged = []
        for item in list(existing) + list(extra):
            if item not in merged:
                merged.append(item)
        return merged

    registered_admin = admin.site._registry.get(model, None)
    if not registered_admin:
        admin.site.register(model, DefaultWorkflowModelAdmin)
        return

    if OnApprovedHookInline in registered_admin.inlines:
        # Hook inlines were already added; nothing left to do.
        return

    registered_admin.inlines = _merge_unique(
        registered_admin.inlines,
        [OnApprovedHookInline, OnTransitHookInline, OnCompleteHookInline],
    )
    registered_admin.readonly_fields = _merge_unique(
        registered_admin.readonly_fields,
        workflow_registry.get_class_fields(model),
    )
    admin.site._registry[model] = registered_admin
def _get_all_workflow_fields(cls):
    """Return one flat list of every entry held in ``workflow_registry.workflows``.

    Each value of the registry mapping is converted to a list and the lists
    are concatenated in the mapping's iteration order. An empty registry
    yields an empty list.
    """
    from river.core.workflowregistry import workflow_registry

    flattened = []
    for fields in workflow_registry.workflows.values():
        flattened = flattened + list(fields)
    return flattened
# NOTE(review): this run of statements has no visible ``def`` header and its
# indentation was lost; it repeats the body of ``contribute_to_class`` shown
# earlier in this file (minus the nested ``river`` property) -- confirm where
# it belongs before relying on it.
# Remember the name under which the field is being attached.
self.field_name = name
# Add two generic relations to ``cls``, named after the field.
self._add_to_class(cls, self.field_name + "_transition_approvals", GenericRelation('%s.%s' % (TransitionApproval._meta.app_label, TransitionApproval._meta.object_name)))
self._add_to_class(cls, self.field_name + "_transitions", GenericRelation('%s.%s' % (Transition._meta.app_label, Transition._meta.object_name)))
# Only while ``id(cls)`` is not yet a key of the registry: add ``river`` to the class.
if id(cls) not in workflow_registry.workflows:
self._add_to_class(cls, "river", river)
# Delegate to the parent field implementation.
super(StateField, self).contribute_to_class(cls, name)
# Same registry check again: connect the save/delete handlers for ``self.model``.
# Both ``connect`` calls use the same dispatch_uid string.
if id(cls) not in workflow_registry.workflows:
post_save.connect(_on_workflow_object_saved, self.model, False, dispatch_uid='%s_%s_riverstatefield_post' % (self.model, name))
post_delete.connect(_on_workflow_object_deleted, self.model, False, dispatch_uid='%s_%s_riverstatefield_post' % (self.model, name))
# Finally add the field name to the registry for ``cls``.
workflow_registry.add(self.field_name, cls)
def __getattr__(self, field_name):
    """Resolve ``field_name`` to a workflow object for the owner.

    The owning class is ``self.owner`` itself when ``self.is_class`` is
    true, otherwise ``self.owner.__class__``. ``field_name`` must be listed
    in ``workflow_registry.workflows`` under ``id`` of that class.

    Args:
        field_name: name of the attribute being looked up.

    Returns:
        ``ClassWorkflowObject(self.owner, field_name)`` when ``self.is_class``
        is true, otherwise ``InstanceWorkflowObject(self.owner, field_name)``.

    Raises:
        AttributeError: if ``field_name`` is not registered for the class.
    """
    cls = self.owner if self.is_class else self.owner.__class__
    if field_name not in workflow_registry.workflows[id(cls)]:
        # ``__getattr__`` must signal a missing attribute with AttributeError;
        # ``hasattr()`` and ``getattr(obj, name, default)`` only swallow that
        # type. It is still an ``Exception`` subclass, so existing
        # ``except Exception`` handlers keep working.
        raise AttributeError("Workflow with name:%s doesn't exist for class:%s" % (field_name, cls.__name__))

    workflow_type = ClassWorkflowObject if self.is_class else InstanceWorkflowObject
    return workflow_type(self.owner, field_name)
def _get_workflow_class_fields(cls, model):
    """Return the registry entry stored for ``model``.

    The lookup key is ``id(model)`` in ``workflow_registry.workflows``; a
    missing key propagates whatever error that mapping raises.
    """
    from river.core.workflowregistry import workflow_registry

    fields_by_class = workflow_registry.workflows
    registry_key = id(model)
    return fields_by_class[registry_key]
def __init__(self, *args, **kwargs):
    """Initialise the admin and append the model's workflow fields to ``readonly_fields``.

    All arguments are forwarded unchanged to the parent initialiser. The
    fields come from ``workflow_registry.get_class_fields(self.model)``.
    """
    super(DefaultWorkflowModelAdmin, self).__init__(*args, **kwargs)
    existing = tuple(self.readonly_fields)
    workflow_fields = tuple(workflow_registry.get_class_fields(self.model))
    # Assign a fresh tuple instead of using ``+=``: when ``readonly_fields``
    # is declared as a list on the class, ``+=`` would extend that shared
    # list in place and grow it on every instantiation.
    self.readonly_fields = existing + workflow_fields
def all(self, cls):
    """Return a list of this object's attributes for every name registered for ``cls``.

    The names are read from ``workflow_registry.workflows[id(cls)]`` and each
    one is resolved with ``getattr(self, name)``, in registry order.
    """
    resolved = []
    for field_name in workflow_registry.workflows[id(cls)]:
        resolved.append(getattr(self, field_name))
    return resolved
def _get_all_workflow_classes(cls):
    """Return the values of ``workflow_registry.class_index`` as a new list."""
    from river.core.workflowregistry import workflow_registry

    class_index = workflow_registry.class_index
    workflow_classes = list(class_index.values())
    return workflow_classes