Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class ConditionTests(TestModelViewsetMixin, ConditionsTestCase):
    """Viewset test case wired to the ``internal-conditions:condition`` URL name."""

    url_names = {'viewset': 'internal-conditions:condition'}
    instances = Condition.objects.all()

    def prepare_create_instance(self, instance):
        """Suffix ``instance.key`` with ``'_new'`` and hand the instance back.

        Presumably this keeps the key distinct from the stored object when
        the instance is re-submitted for creation -- confirm against the
        mixin that calls this hook.
        """
        instance.key += '_new'
        return instance
class AttributeTests(TestListViewsetMixin, ConditionsTestCase):
    """List-viewset test case wired to the ``internal-conditions:attribute`` URL name."""

    url_names = {'viewset': 'internal-conditions:attribute'}
    instances = Attribute.objects.all()
class RelationTests(TestListViewsetMixin, ConditionsTestCase):
    """List-viewset test case wired to the ``internal-conditions:relation`` URL name."""

    url_names = {'viewset': 'internal-conditions:relation'}

    # Status code per role name for the list view: every listed role gets
    # 200 except 'anonymous', which gets 403.
    status_map = {
        'list_viewset': {
            'editor': 200,
            'reviewer': 200,
            'api': 200,
            'user': 200,
            'anonymous': 403,
        },
    }
class ConditionExportTests(TestExportViewMixin, ConditionsTestCase):
# NOTE(review): fragment -- the enclosing definition (which must supply
# task_node, nsmap and log) is not visible here, and block indentation has
# been lost.
# Resolve the task's URI and load the matching Task, or start a fresh one.
task_uri = get_uri(task_node, nsmap)
try:
task = Task.objects.get(uri=task_uri)
except Task.DoesNotExist:
task = Task()
log.info('Task not in db. Created with uri ' + task_uri)
else:
log.info('Task does exist. Loaded from uri ' + task_uri)
# Prefix is everything before '/tasks/'; key is the last path segment.
task.uri_prefix = task_uri.split('/tasks/')[0]
task.key = task_uri.split('/')[-1]
# The linked attribute is optional: any of the listed lookup failures
# leaves task.attribute set to None.
try:
attribute_uri = get_uri(task_node, nsmap, 'attrib')
task.attribute = Attribute.objects.get(uri=attribute_uri)
except (AttributeError, Attribute.DoesNotExist, KeyError):
task.attribute = None
# Copy each <title>/<text> child onto title_<lang> / text_<lang>, using the
# element's 'lang' attribute as the suffix.
for element in task_node.findall('title'):
setattr(task, 'title_' + element.attrib['lang'], element.text)
for element in task_node.findall('text'):
setattr(task, 'text_' + element.attrib['lang'], element.text)
log.info('Task saving to "' + str(task_uri) + '"')
task.save()
# TODO: finalize timeframe
# NOTE(review): the TimeFrame below is populated, but no save() and no link
# back to the task is visible in this fragment. If find('timeframe') returns
# None, the following lines would raise AttributeError -- confirm against the
# full source.
timeframe = TimeFrame()
timeframe_node = task_node.find('timeframe')
timeframe.start_attribute = timeframe_node.find('start_attribute').get(get_ns_tag('dc:uri', nsmap))
timeframe.end_attribute = timeframe_node.find('end_attribute').get(get_ns_tag('dc:uri', nsmap))
# Create or update an Attribute from `attribute_node`, attaching it to
# `parent` (None by default).
# NOTE(review): block indentation has been lost and the definition is cut off
# after the final `if`, whose body is not visible.
def import_attribute(attribute_node, nsmap, parent=None):
# The URI is read from the text of the namespaced dc:uri child.
uri = attribute_node[get_ns_tag('dc:uri', nsmap)].text
try:
attribute = Attribute.objects.get(uri=uri)
except Attribute.DoesNotExist:
attribute = Attribute()
attribute.parent = parent
# Prefix is everything before '/domain/'; key is the last path segment.
attribute.uri_prefix = uri.split('/domain/')[0]
attribute.key = uri.split('/')[-1]
# NOTE(review): unlike the dc:uri lookup above, the values below are taken
# from the indexed children directly, without `.text` -- confirm that the
# node type yields plain values here.
attribute.comment = attribute_node[get_ns_tag('dc:comment', nsmap)]
attribute.is_collection = attribute_node['is_collection'] == 'True'
attribute.value_type = attribute_node['value_type']
attribute.unit = attribute_node['unit']
attribute.save()
# NOTE(review): a 'range' child is handed to import_verbose_name -- confirm
# this is intended rather than a dedicated range importer.
if hasattr(attribute_node, 'range'):
import_verbose_name(attribute_node.range, attribute)
if hasattr(attribute_node, 'verbosename'):
def import_attribute(element):
    """Create or update the Attribute described by ``element``.

    ``element`` is indexed by the string keys ``'uri'``, ``'parent'``,
    ``'uri_prefix'``, ``'key'`` and ``'comment'``. An existing Attribute with
    the same uri is updated; otherwise a new one is built. The attribute is
    saved only if ``AttributeUniquePathValidator`` accepts it; a
    ``ValidationError`` is logged and the attribute is left unsaved.

    Returns ``None``.
    """
    try:
        attribute = Attribute.objects.get(uri=element['uri'])
    except Attribute.DoesNotExist:
        log.info('Attribute not in db. Created with uri %s.', element['uri'])
        attribute = Attribute()

    # The parent is optional; an unknown parent uri leaves the attribute
    # without a parent rather than aborting the import.
    attribute.parent = None
    if element['parent']:
        try:
            attribute.parent = Attribute.objects.get(uri=element['parent'])
        except Attribute.DoesNotExist:
            # Nothing is created here, and the uri that failed to resolve is
            # the parent's, so that is what gets reported.
            log.info('Parent not in db. Skipping parent with uri %s.', element['parent'])

    # Missing (falsy) text values are normalised to empty strings.
    attribute.uri_prefix = element['uri_prefix'] or ''
    attribute.key = element['key'] or ''
    attribute.comment = element['comment'] or ''

    try:
        AttributeUniquePathValidator(attribute).validate()
    except ValidationError as e:
        log.info('Attribute not saving "%s" due to validation error (%s).', element['uri'], e)
    else:
        log.info('Attribute saving to "%s", parent "%s".', element['uri'], element['parent'])
        attribute.save()
def filter_queryset(self, request, queryset, view):
    """Restrict ``queryset`` to the subtree of the requested attribute.

    Reads the ``set_attribute`` GET parameter. When it is absent or empty,
    ``queryset`` is returned untouched. Otherwise the Attribute with that
    primary key is looked up and the queryset is narrowed to rows whose
    ``attribute`` is that Attribute or one of its descendants. If the
    lookup fails, an empty queryset is returned.
    """
    set_attribute = request.GET.get('set_attribute')
    if not set_attribute:
        return queryset

    try:
        attribute = Attribute.objects.get(pk=set_attribute)
    except (Attribute.DoesNotExist, ValueError):
        # ValueError covers a malformed primary key (e.g. non-numeric text
        # for an integer key). The value comes straight from the query
        # string, so treat it like an unknown id instead of erroring out.
        return queryset.none()

    attributes = attribute.get_descendants(include_self=True)
    return queryset.filter(attribute__in=attributes)
# Create or update an Attribute from `element`, which is indexed by the
# string keys 'uri', 'parent', 'uri_prefix', 'key' and 'comment'.
# NOTE(review): block indentation has been lost and this definition is cut
# off after the dangling `try:` on its last line.
def import_attribute(element):
try:
attribute = Attribute.objects.get(uri=element['uri'])
except Attribute.DoesNotExist:
log.info('Attribute not in db. Created with uri %s.', element['uri'])
attribute = Attribute()
# The parent is optional; an unknown parent uri leaves parent as None.
attribute.parent = None
if element['parent']:
try:
attribute.parent = Attribute.objects.get(uri=element['parent'])
except Attribute.DoesNotExist:
# NOTE(review): nothing is created in this branch and the logged value is
# the attribute's own uri, not element['parent'] -- message looks misleading.
log.info('Parent not in db. Created with uri %s.', element['uri'])
# Falsy text values are normalised to empty strings.
attribute.uri_prefix = element['uri_prefix'] or ''
attribute.key = element['key'] or ''
attribute.comment = element['comment'] or ''
try:
# NOTE(review): fragment -- the enclosing definition (which must supply task,
# element and log) is not visible here, and block indentation has been lost.
# Falsy text values are normalised to empty strings.
task.key = element['key'] or ''
task.comment = element['comment'] or ''
# Fill the per-language 'title' and 'text' fields for every entry yielded by
# get_languages().
for lang_code, lang_string, lang_field in get_languages():
set_lang_field(task, 'title', element, lang_code, lang_field)
set_lang_field(task, 'text', element, lang_code, lang_field)
# Start/end attributes are optional: a uri that does not resolve is ignored
# silently and the corresponding field is not assigned.
if element['start_attribute']:
try:
task.start_attribute = Attribute.objects.get(uri=element['start_attribute'])
except Attribute.DoesNotExist:
pass
if element['end_attribute']:
try:
task.end_attribute = Attribute.objects.get(uri=element['end_attribute'])
except Attribute.DoesNotExist:
pass
task.days_before = element['days_before']
task.days_after = element['days_after']
# Save only when the unique-key validator accepts the task; a validation
# error is logged and the task is left unsaved.
try:
TaskUniqueKeyValidator(task).validate()
except ValidationError as e:
log.info('Task not saving "%s" due to validation error (%s).', element['uri'], e)
pass
else:
log.info('Task saving to "%s".', element['uri'])
task.save()
# Attach the saved task to the current Site.
task.sites.add(Site.objects.get_current())
from rest_framework import serializers
from rdmo.domain.models import Attribute
from rdmo.options.models import Option
from ..models import Condition
from ..validators import ConditionUniqueKeyValidator
class ConditionSerializer(serializers.ModelSerializer):
    """Model serializer for ``Condition``.

    ``key`` and ``source`` are declared explicitly so that both are required;
    ``source`` is a primary-key reference to an ``Attribute``.
    """

    key = serializers.CharField(required=True)
    source = serializers.PrimaryKeyRelatedField(
        queryset=Attribute.objects.all(),
        required=True,
    )

    class Meta:
        model = Condition
        fields = (
            'id', 'uri_prefix', 'key', 'comment',
            'source', 'relation', 'target_text', 'target_option',
        )
        validators = (ConditionUniqueKeyValidator(),)
# Build a Value for `project` (and optional `snapshot`) from `value_node`.
# NOTE(review): block indentation has been lost, and no save() or return of
# the built value is visible -- the definition appears to be cut off after the
# option lookup.
def import_value(value_node, ns_map, project, snapshot=None):
value = Value()
value.project = project
value.snapshot = snapshot
# Resolve the referenced attribute; if a uri is given but unknown, log it
# and return early.
attribute_uri = get_uri(value_node.find('attribute'), ns_map)
if attribute_uri is not None:
try:
value.attribute = Attribute.objects.get(uri=attribute_uri)
except Attribute.DoesNotExist:
log.info('Attribute %s not in db. Skipping.', attribute_uri)
return
# Values are taken from the text of the child elements; a missing text
# body becomes ''.
value.set_index = value_node.find('set_index').text
value.collection_index = value_node.find('collection_index').text
value.text = value_node.find('text').text or ''
# Same handling for the optional option reference: unknown uri -> log and
# return early.
option_uri = get_uri(value_node.find('option'), ns_map)
if option_uri is not None:
try:
value.option = Option.objects.get(uri=option_uri)
except Option.DoesNotExist:
log.info('Option %s not in db. Skipping.', option_uri)
return
# NOTE(review): fragment -- the enclosing definition (which must supply
# element and log) is not visible here, and block indentation has been lost.
# NOTE(review): the message below says 'QuestionSet' but is immediately
# followed by creating a Question -- looks like a copy-paste slip; confirm.
log.info('QuestionSet not in db. Created with uri %s.', element['uri'])
question = Question()
# A question must belong to an existing QuestionSet; otherwise the import
# of this element is abandoned.
try:
question.questionset = QuestionSet.objects.get(uri=element['questionset'])
except QuestionSet.DoesNotExist:
log.info('QuestionSet not in db. Skipping.')
return
# Falsy text values are normalised to empty strings.
question.uri_prefix = element['uri_prefix'] or ''
question.key = element['key'] or ''
question.comment = element['comment'] or ''
# The attribute is optional: a uri that does not resolve is ignored silently.
if element['attribute']:
try:
question.attribute = Attribute.objects.get(uri=element['attribute'])
except Attribute.DoesNotExist:
pass
question.is_collection = element['is_collection']
question.order = element['order']
# Fill the per-language fields for every entry yielded by get_languages().
for lang_code, lang_string, lang_field in get_languages():
set_lang_field(question, 'text', element, lang_code, lang_field)
set_lang_field(question, 'help', element, lang_code, lang_field)
set_lang_field(question, 'verbose_name', element, lang_code, lang_field)
set_lang_field(question, 'verbose_name_plural', element, lang_code, lang_field)
question.widget_type = element['widget_type'] or ''
question.value_type = element['value_type'] or ''
question.maximum = element['maximum']
question.minimum = element['minimum']