Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return payload
def create(self, username='', password='', organization=None, **kwargs):
payload = self.create_payload(
username=username, password=password, **kwargs)
self.password = payload.password
self.update_identity(Users(self.connection).post(payload))
if organization:
organization.add_user(self)
return self
page.register_page([resources.user,
(resources.users, 'post')], User)
class Users(page.PageList, User):
pass
page.register_page([resources.users,
resources.organization_admins,
resources.related_users,
resources.user_admin_organizations], Users)
class Me(Users):
from awxkit.api.resources import resources
from . import page
class Subscriptions(page.Page):
def get_possible_licenses(self, **kwargs):
return self.post(json=kwargs).json
page.register_page(resources.subscriptions, Subscriptions)
self.get()
try:
return self.job
except AttributeError:
return False
page.register_page(resources.workflow_job_node, WorkflowJobNode)
class WorkflowJobNodes(page.PageList, WorkflowJobNode):
pass
page.register_page([resources.workflow_job_nodes,
resources.workflow_job_workflow_nodes,
resources.workflow_job_node_always_nodes,
resources.workflow_job_node_failure_nodes,
resources.workflow_job_node_success_nodes], WorkflowJobNodes)
"""Waits until node's job exists"""
adjusted_timeout = timeout - seconds_since_date_string(self.created)
poll_until(self.job_exists, interval=interval, timeout=adjusted_timeout, **kw)
return self
def job_exists(self):
self.get()
try:
return self.job
except AttributeError:
return False
page.register_page(resources.workflow_job_node, WorkflowJobNode)
class WorkflowJobNodes(page.PageList, WorkflowJobNode):
pass
page.register_page([resources.workflow_job_nodes,
resources.workflow_job_workflow_nodes,
resources.workflow_job_node_always_nodes,
resources.workflow_job_node_failure_nodes,
resources.workflow_job_node_success_nodes], WorkflowJobNodes)
"""Return a list of expected passwords needed to start a job using this credential."""
passwords = []
for field in (
'password',
'become_password',
'ssh_key_unlock',
'vault_password'):
if getattr(self.inputs, field, None) == 'ASK':
if field == 'password':
passwords.append('ssh_password')
else:
passwords.append(field)
return passwords
page.register_page([resources.credential,
(resources.credentials, 'post'),
(resources.credential_copy, 'post')], Credential)
class Credentials(page.PageList, Credential):
pass
page.register_page([resources.credentials,
resources.related_credentials,
resources.job_extra_credentials,
resources.job_template_extra_credentials],
Credentials)
def add_label(self, label):
if isinstance(label, page.Page):
label = label.json
with suppress(exc.NoContent):
self.related.labels.post(label)
class JobTemplates(page.PageList, JobTemplate):
pass
page.register_page([resources.job_templates,
resources.related_job_templates], JobTemplates)
class JobTemplateCallback(base.Base):
pass
page.register_page(resources.job_template_callback, JobTemplateCallback)
class JobTemplateLaunch(base.Base):
pass
page.register_page(resources.job_template_launch, JobTemplateLaunch)