Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.get_config('format') == 'human' and
_filter == '.' and
self.resource in UNIQUENESS_RULES
):
_filter = ', '.join(UNIQUENESS_RULES[self.resource])
formatted = format_response(
response,
fmt=self.get_config('format'),
filter=_filter,
changed=self.original_action in (
'modify', 'create', 'associate', 'disassociate'
)
)
if formatted:
print(utils.to_str(formatted), file=self.stdout)
else:
if six.PY3:
self.parser.print_help()
elif six.PY2 and not self.help:
# Unfortunately, argparse behavior between py2 and py3
# changed in a notable way when required subparsers
# have invalid (or missing) arguments specified
# see: https://github.com/python/cpython/commit/f97c59aaba2d93e48cbc6d25f7ff9f9c87f8d0b2
print('\nargument resource: invalid choice')
raise SystemExit(2)
# Entry point (excerpt: the listing stops right after the final `else:`).
# Parses the command line, fills in the shared `config` object, creates the
# API root and, when the connection uses sessions, loads a session.
def main():
exc = None
try:
# The parsed arguments are stored in a module-level global.
global akit_args
akit_args = parse_args()
config.base_url = akit_args.base_url
# Credentials come from the credential file when one was given ...
if akit_args.credential_file != utils.not_provided:
config.credentials = utils.load_credentials(
akit_args.credential_file)
else:
# ... otherwise from the AWXKIT_USER / AWXKIT_USER_PASSWORD environment
# variables, falling back to the literal defaults 'admin' / 'password'.
config.credentials = utils.PseudoNamespace({
'default': {
'username': os.getenv('AWXKIT_USER', 'admin'),
'password': os.getenv('AWXKIT_USER_PASSWORD', 'password')
}
})
# NOTE(review): parse_args() defaults project_file to
# os.getenv('AWXKIT_PROJECT_FILE') with no fallback value, so this comparison
# is also true when the variable is unset -- confirm that
# utils.load_projects tolerates None.
if akit_args.project_file != utils.not_provided:
config.project_urls = utils.load_projects(
akit_args.project_file)
# The API root object is likewise stored in a module-level global.
global root
root = api.Api()
if uses_sessions(root.connection):
config.use_sessions = True
root.load_session().get()
else:
# Excerpt: the listing stops inside the `try:` block, so whatever handles a
# failure of fetch_version_root() is not visible here.
def connect(self):
"""Fetch top-level resources from /api/v2"""
# Copy the CLI configuration into the shared `config` object.
config.base_url = self.get_config('host')
config.client_connection_attempts = 1
# assume_untrusted starts out False and is switched on only when the
# 'insecure' option is set.
config.assume_untrusted = False
if self.get_config('insecure'):
config.assume_untrusted = True
config.credentials = utils.PseudoNamespace({
'default': {
'username': self.get_config('username'),
'password': self.get_config('password'),
}
})
# parse_known_args() returns (namespace, leftover argv); only the leftover
# arguments are inspected here.
_, remainder = self.parser.parse_known_args()
if remainder and remainder[0] == 'config':
# the config command is special; it doesn't require
# API connectivity
return
# ...otherwise, set up a awxkit connection because we're
# likely about to do some requests to /api/v2/
self.root = api.Api()
try:
self.fetch_version_root()
# Builds the command-line parser (excerpt: the listing stops inside the
# add_argument call for -x/--non-interactive, so the remaining options and
# the return value are not visible here).
def parse_args():
parser = ArgumentParser()
# --base-url: defaults to $AWXKIT_BASE_URL, else http://127.0.0.1:8013.
parser.add_argument(
'--base-url',
dest='base_url',
default=os.getenv(
'AWXKIT_BASE_URL',
'http://127.0.0.1:8013'),
help='URL for AWX. Defaults to env var AWXKIT_BASE_URL or http://127.0.0.1:8013')
# -c/--credential-file: defaults to $AWXKIT_CREDENTIAL_FILE, else the
# utils.not_provided sentinel.
parser.add_argument(
'-c',
'--credential-file',
dest='credential_file',
default=os.getenv(
'AWXKIT_CREDENTIAL_FILE',
utils.not_provided),
help='Path for yml credential file. If not provided or set by AWXKIT_CREDENTIAL_FILE, set '
'AWXKIT_USER and AWXKIT_USER_PASSWORD env vars for awx user credentials.')
# -p/--project-file: defaults to $AWXKIT_PROJECT_FILE. Unlike
# --credential-file there is no fallback value, so the default is None when
# the variable is unset.
parser.add_argument(
'-p',
'--project-file',
dest='project_file',
default=os.getenv(
'AWXKIT_PROJECT_FILE'),
# NOTE(review): the two adjacent string literals below are concatenated
# without a separating space, so the help text reads "...config file.If not
# provided...".
help='Path for yml project config file.'
'If not provided or set by AWXKIT_PROJECT_FILE, projects will not have default SCM_URL')
# -f/--file: stored as akit_script; defaults to False.
parser.add_argument('-f', '--file', dest='akit_script', default=False,
help='akit script file to run in interactive session.')
parser.add_argument(
'-x',
'--non-interactive',
action='store_true',
# Entry point (excerpt: the listing stops right after
# `if akit_args.akit_script:`). Parses the command line, fills in the shared
# `config` object, creates the API root, authenticates via a session or an
# auth token, and fetches the v2 API root when it is available.
def main():
exc = None
try:
# The parsed arguments are stored in a module-level global.
global akit_args
akit_args = parse_args()
config.base_url = akit_args.base_url
# Credentials come from the credential file when one was given ...
if akit_args.credential_file != utils.not_provided:
config.credentials = utils.load_credentials(
akit_args.credential_file)
else:
# ... otherwise from the AWXKIT_USER / AWXKIT_USER_PASSWORD environment
# variables, falling back to the literal defaults 'admin' / 'password'.
config.credentials = utils.PseudoNamespace({
'default': {
'username': os.getenv('AWXKIT_USER', 'admin'),
'password': os.getenv('AWXKIT_USER_PASSWORD', 'password')
}
})
if akit_args.project_file != utils.not_provided:
config.project_urls = utils.load_projects(
akit_args.project_file)
# The API root object is likewise stored in a module-level global.
global root
root = api.Api()
if uses_sessions(root.connection):
# NOTE(review): from here down to the second
# `if uses_sessions(root.connection):` the statements repeat the ones above
# verbatim -- this looks like two overlapping excerpts of the same function
# spliced together rather than intentional code; confirm against the
# upstream file.
akit_args = parse_args()
config.base_url = akit_args.base_url
if akit_args.credential_file != utils.not_provided:
config.credentials = utils.load_credentials(
akit_args.credential_file)
else:
config.credentials = utils.PseudoNamespace({
'default': {
'username': os.getenv('AWXKIT_USER', 'admin'),
'password': os.getenv('AWXKIT_USER_PASSWORD', 'password')
}
})
if akit_args.project_file != utils.not_provided:
config.project_urls = utils.load_projects(
akit_args.project_file)
global root
root = api.Api()
# Session-capable connections load a session; all others load an auth token.
if uses_sessions(root.connection):
config.use_sessions = True
root.load_session().get()
else:
root.load_authtoken().get()
# Publish the v2 API root as a module-level global when the server lists it.
if 'v2' in root.available_versions:
global v2
v2 = root.available_versions.v2.get()
rc = 0
if akit_args.akit_script:
def main():
exc = None
try:
global akit_args
akit_args = parse_args()
config.base_url = akit_args.base_url
if akit_args.credential_file != utils.not_provided:
config.credentials = utils.load_credentials(
akit_args.credential_file)
else:
config.credentials = utils.PseudoNamespace({
'default': {
'username': os.getenv('AWXKIT_USER', 'admin'),
'password': os.getenv('AWXKIT_USER_PASSWORD', 'password')
}
})
if akit_args.project_file != utils.not_provided:
config.project_urls = utils.load_projects(
akit_args.project_file)
global root
root = api.Api()