Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
payload = self.payload(name=name, description=description, organization=organization, **kwargs)
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
return payload
def create(self, name='', description='', organization=None, **kwargs):
    """Create the resource by POSTing a freshly built payload.

    The payload comes from ``self.create_payload`` (all arguments are
    forwarded unchanged), is posted through a ``WorkflowJobTemplates``
    page bound to this object's connection, and the response is handed
    to ``self.update_identity``, whose result is returned.
    """
    body = self.create_payload(
        name=name,
        description=description,
        organization=organization,
        **kwargs
    )
    collection = WorkflowJobTemplates(self.connection)
    created = collection.post(body)
    return self.update_identity(created)
def add_label(self, label):
    """POST ``label`` to this object's related ``labels`` endpoint.

    ``label`` may be a ``page.Page`` (its ``.json`` is sent) or any other
    value, which is sent as-is. An ``exc.NoContent`` raised by the POST
    is suppressed; nothing is returned.
    """
    body = label.json if isinstance(label, page.Page) else label
    with suppress(exc.NoContent):
        self.related.labels.post(body)
# Register WorkflowJobTemplate with page.register_page for the listed
# resources. Tuple entries pair a resource with the string 'post'
# (presumably the HTTP method -- confirm against page.register_page).
page.register_page([resources.workflow_job_template,
(resources.workflow_job_templates, 'post'),
(resources.workflow_job_template_copy, 'post')], WorkflowJobTemplate)
class WorkflowJobTemplates(page.PageList, WorkflowJobTemplate):
    """Combine ``page.PageList`` with ``WorkflowJobTemplate``; adds no members of its own."""
# Register WorkflowJobTemplates with page.register_page for the
# workflow_job_templates resource.
page.register_page([resources.workflow_job_templates], WorkflowJobTemplates)
class WorkflowJobTemplateLaunch(base.Base):
    """Plain ``base.Base`` subclass that defines no members of its own."""
from awxkit.api.resources import resources
from . import base
from . import page
class Metrics(base.Base):
    """``base.Base`` page whose ``get`` asks for a JSON response."""

    def get(self, **query_parameters):
        """GET ``self.endpoint`` with an ``Accept: application/json`` header.

        Keyword arguments are passed to the connection as the query
        parameters. The raw response is run through ``self.page_identity``
        and that result is returned.
        """
        json_headers = {'Accept': 'application/json'}
        response = self.connection.get(
            self.endpoint, query_parameters, headers=json_headers
        )
        return self.page_identity(response)
# Register Metrics with page.register_page for resources.metrics, both bare
# and paired with the string 'get' (presumably the HTTP method -- confirm
# against page.register_page).
page.register_page([resources.metrics,
(resources.metrics, 'get')], Metrics)
except (yaml.parser.ParserError, yaml.scanner.ScannerError):
if arg[0] == '@': # extra var file reference
args.append(attempt_yaml_load(arg))
elif args[-1] == '-c': # this arg is likely sh arg string
args.extend([attempt_yaml_load(item) for item in args_string_to_list(arg)])
else:
raise
return args
class UnifiedJobs(page.PageList, UnifiedJob):
    """Combine ``page.PageList`` with ``UnifiedJob``; adds no members of its own."""
# Register UnifiedJobs with page.register_page for the unified-jobs resource
# and the job resources related to instances, instance groups, and schedules.
page.register_page([resources.unified_jobs,
resources.instance_related_jobs,
resources.instance_group_related_jobs,
resources.schedules_jobs], UnifiedJobs)
update_payload(payload, fields, kwargs)
return payload
def create_payload(self, kind='cloud', **kwargs):
    """Build a payload via ``self.payload`` and attach a ``DSAdapter`` to it.

    ``kind`` (default ``'cloud'``) and any extra keyword arguments are
    forwarded to ``self.payload``. The returned payload has its ``ds``
    attribute set to a ``DSAdapter`` constructed from this object's class
    name and ``self._dependency_store``.
    """
    body = self.payload(kind=kind, **kwargs)
    adapter = DSAdapter(self.__class__.__name__, self._dependency_store)
    body.ds = adapter
    return body
def create(self, kind='cloud', **kwargs):
    """Create the resource by POSTing a freshly built payload.

    The payload comes from ``self.create_payload`` (arguments forwarded
    unchanged), is posted through a ``CredentialTypes`` page bound to
    this object's connection, and the response is handed to
    ``self.update_identity``, whose result is returned.
    """
    body = self.create_payload(kind=kind, **kwargs)
    response = CredentialTypes(self.connection).post(body)
    return self.update_identity(response)
# Register CredentialType with page.register_page for the single-item
# resource and for credential_types paired with the string 'post'
# (presumably the HTTP method -- confirm against page.register_page).
page.register_page([resources.credential_type,
(resources.credential_types, 'post')], CredentialType)
class CredentialTypes(page.PageList, CredentialType):
    """Combine ``page.PageList`` with ``CredentialType``; adds no members of its own."""
# Register CredentialTypes with page.register_page for the credential_types
# resource (passed as a bare resource, not a list).
page.register_page(resources.credential_types, CredentialTypes)
class Credential(HasCopy, HasCreate, base.Base):
dependencies = [CredentialType]
optional_dependencies = [Organization, User, Team]
result_attr = 'notification_templates_{0}'.format(job_result)
if result_attr not in resource.related:
raise ValueError(
'Unsupported resource "{0}". Does not have a related {1} field.' .format(
resource, result_attr))
payload = dict(id=self.id)
if disassociate:
payload['disassociate'] = True
with suppress(exc.NoContent):
getattr(resource.related, result_attr).post(payload)
# Register NotificationTemplate with page.register_page for the single-item
# resources (plain, any, error, success) and for the list/copy resources
# paired with the string 'post' (presumably the HTTP method -- confirm
# against page.register_page).
page.register_page([resources.notification_template,
(resources.notification_templates, 'post'),
(resources.notification_template_copy, 'post'),
resources.notification_template_any,
resources.notification_template_error,
resources.notification_template_success], NotificationTemplate)
class NotificationTemplates(page.PageList, NotificationTemplate):
    """Combine ``page.PageList`` with ``NotificationTemplate``; adds no members of its own."""
page.register_page([resources.notification_templates,
resources.notification_templates_any,
resources.notification_templates_error,
resources.notification_templates_success],
try:
return self.job
except AttributeError:
return False
# Register WorkflowJobNode with page.register_page for the workflow_job_node
# resource (passed as a bare resource, not a list).
page.register_page(resources.workflow_job_node, WorkflowJobNode)
class WorkflowJobNodes(page.PageList, WorkflowJobNode):
    """Combine ``page.PageList`` with ``WorkflowJobNode``; adds no members of its own."""
# Register WorkflowJobNodes with page.register_page for the node-list
# resources: the top-level list, a workflow job's nodes, and a node's
# always/failure/success node lists.
page.register_page([resources.workflow_job_nodes,
resources.workflow_job_workflow_nodes,
resources.workflow_job_node_always_nodes,
resources.workflow_job_node_failure_nodes,
resources.workflow_job_node_success_nodes], WorkflowJobNodes)
payload = self.create_payload(kind=kind, **kwargs)
return self.update_identity(
CredentialTypes(
self.connection).post(payload))
# Register CredentialType with page.register_page for the single-item
# resource and for credential_types paired with the string 'post'
# (presumably the HTTP method -- confirm against page.register_page).
page.register_page([resources.credential_type,
(resources.credential_types, 'post')], CredentialType)
class CredentialTypes(page.PageList, CredentialType):
    """Combine ``page.PageList`` with ``CredentialType``; adds no members of its own."""
# Register CredentialTypes with page.register_page for the credential_types
# resource (passed as a bare resource, not a list).
page.register_page(resources.credential_types, CredentialTypes)
class Credential(HasCopy, HasCreate, base.Base):
dependencies = [CredentialType]
optional_dependencies = [Organization, User, Team]
def payload(
self,
credential_type,
user=None,
team=None,
organization=None,
inputs=None,
**kwargs):
if not any((user, team, organization)):
for cred in self.related.credentials.get().results:
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=cred.id, disassociate=True))
def make_approval_node(self, **kwargs):
    """POST an approval-template definition and return a refreshed page.

    The keyword arguments form the request body. When no ``name`` is
    supplied, one is generated as ``'approval node <random_title()>'``.
    The body is posted to the related ``create_approval_template``
    endpoint, and the result of ``self.get()`` is returned.
    """
    body = dict(kwargs)
    if 'name' not in body:
        # Only generate a title when the caller did not supply a name.
        body['name'] = 'approval node {}'.format(random_title())
    self.related.create_approval_template.post(body)
    return self.get()
# Register WorkflowJobTemplateNode with page.register_page for the
# single-item resource and for the list resource paired with the string
# 'post' (presumably the HTTP method -- confirm against page.register_page).
page.register_page([resources.workflow_job_template_node,
(resources.workflow_job_template_nodes,
'post')],
WorkflowJobTemplateNode)
class WorkflowJobTemplateNodes(page.PageList, WorkflowJobTemplateNode):
    """Combine ``page.PageList`` with ``WorkflowJobTemplateNode``; adds no members of its own."""
# Register WorkflowJobTemplateNodes with page.register_page for the
# node-list resources: the top-level list, a template's workflow nodes, and
# a node's always/failure/success node lists.
page.register_page([resources.workflow_job_template_nodes,
resources.workflow_job_template_workflow_nodes,
resources.workflow_job_template_node_always_nodes,
resources.workflow_job_template_node_failure_nodes,
resources.workflow_job_template_node_success_nodes],
WorkflowJobTemplateNodes)
elif args[-1] == '-c': # this arg is likely sh arg string
args.extend([attempt_yaml_load(item) for item in args_string_to_list(arg)])
else:
raise
return args
class UnifiedJobs(page.PageList, UnifiedJob):
    """Combine ``page.PageList`` with ``UnifiedJob``; adds no members of its own."""
# Register UnifiedJobs with page.register_page for the unified-jobs resource
# and the job resources related to instances, instance groups, and schedules.
page.register_page([resources.unified_jobs,
resources.instance_related_jobs,
resources.instance_group_related_jobs,
resources.schedules_jobs], UnifiedJobs)
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
return payload
def create(self, name='', description='', organization=None, **kwargs):
    """Create the resource by POSTing a freshly built payload.

    The payload comes from ``self.create_payload`` (all arguments are
    forwarded unchanged), is posted through a ``WorkflowJobTemplates``
    page bound to this object's connection, and the response is handed
    to ``self.update_identity``, whose result is returned.
    """
    body = self.create_payload(
        name=name,
        description=description,
        organization=organization,
        **kwargs
    )
    collection = WorkflowJobTemplates(self.connection)
    created = collection.post(body)
    return self.update_identity(created)
def add_label(self, label):
    """POST ``label`` to this object's related ``labels`` endpoint.

    ``label`` may be a ``page.Page`` (its ``.json`` is sent) or any other
    value, which is sent as-is. An ``exc.NoContent`` raised by the POST
    is suppressed; nothing is returned.
    """
    body = label.json if isinstance(label, page.Page) else label
    with suppress(exc.NoContent):
        self.related.labels.post(body)
# Register WorkflowJobTemplate with page.register_page for the listed
# resources. Tuple entries pair a resource with the string 'post'
# (presumably the HTTP method -- confirm against page.register_page).
page.register_page([resources.workflow_job_template,
(resources.workflow_job_templates, 'post'),
(resources.workflow_job_template_copy, 'post')], WorkflowJobTemplate)
class WorkflowJobTemplates(page.PageList, WorkflowJobTemplate):
    """Combine ``page.PageList`` with ``WorkflowJobTemplate``; adds no members of its own."""
# Register WorkflowJobTemplates with page.register_page for the
# workflow_job_templates resource.
page.register_page([resources.workflow_job_templates], WorkflowJobTemplates)
class WorkflowJobTemplateLaunch(base.Base):
    """Plain ``base.Base`` subclass that defines no members of its own."""