Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Metadata for version 0.1.0 of ansible_namespace.collection. The URL and
# hash given here are the values the download mock is checked against below.
# NOTE(review): the trailing {} is presumably the dependency map - confirm
# against CollectionVersionMetadata.
meta = api.CollectionVersionMetadata('ansible_namespace', 'collection', '0.1.0', 'https://downloadme.com',
'myhash', {})
# Requirement listing 0.1.0 as its only version and carrying the metadata above.
req = collection.CollectionRequirement('ansible_namespace', 'collection', None, galaxy_server,
['0.1.0'], '*', False, metadata=meta)
# Run the install; temp_path is the directory the download mock is expected
# to receive and that must be empty again afterwards.
req.install(to_text(output_path), temp_path)
# Ensure the temp directory is empty, nothing is left behind
assert os.listdir(temp_path) == []
# The installed collection must contain exactly these top-level entries.
# The names are compared as bytes, so collection_path is presumably a bytes
# path (os.listdir returns bytes entries for a bytes argument).
actual_files = os.listdir(collection_path)
actual_files.sort()
assert actual_files == [b'FILES.json', b'MANIFEST.json', b'README.md', b'docs', b'playbooks', b'plugins', b'roles']
# Exactly one display call, announcing the collection and its install path.
assert mock_display.call_count == 1
assert mock_display.mock_calls[0][1][0] == "Installing 'ansible_namespace.collection:0.1.0' to '%s'" \
% to_text(collection_path)
# Exactly one download call; its positional arguments are the metadata URL,
# the temp directory, the expected hash and True.
# NOTE(review): the fourth argument is presumably validate_certs - confirm
# against the function that mock_download replaces.
assert mock_download.call_count == 1
assert mock_download.mock_calls[0][1][0] == 'https://downloadme.com'
assert mock_download.mock_calls[0][1][1] == temp_path
assert mock_download.mock_calls[0][1][2] == 'myhash'
assert mock_download.mock_calls[0][1][3] is True
def _download_file(url, b_path, expected_hash, validate_certs, headers=None):
    """Stream the resource at ``url`` into a new temporary file under ``b_path``.

    The temporary file is named after the last path segment of ``url``: the
    stem becomes the file name prefix and the extension becomes the suffix.
    The response is copied to disk in ``bufsize`` sized chunks while a SHA-256
    digest of the content is accumulated.

    :param url: Address to download; it must contain at least one ``/``
        because the file name is taken from the text after the last slash.
    :param b_path: Directory in which the temporary file is created.
    :param expected_hash: Not consumed by the code visible in this block.
    :param validate_certs: Passed through to ``open_url``.
    :param headers: Optional request headers passed through to ``open_url``.
    """
    # NOTE(review): ``expected_hash`` and the finished ``digest`` are never
    # used and nothing is returned - the tail of this function appears to be
    # missing from this view; confirm against the full source.
    bufsize = 65536
    digest = sha256()

    file_stem, file_ext = os.path.splitext(to_text(url.rsplit('/', 1)[1]))
    b_file_name = to_bytes(file_stem, errors='surrogate_or_strict')
    b_file_ext = to_bytes(file_ext, errors='surrogate_or_strict')

    # Only the generated name is needed. Close the handle deterministically
    # instead of leaving it to the garbage collector; delete=False keeps the
    # file on disk so it can be reopened for writing below.
    with tempfile.NamedTemporaryFile(dir=b_path, prefix=b_file_name, suffix=b_file_ext, delete=False) as tmp_file:
        b_file_path = tmp_file.name

    display.vvv("Downloading %s to %s" % (url, to_text(b_path)))
    # Galaxy redirs downloads to S3 which reject the request if an Authorization header is attached so don't redir that
    resp = open_url(to_native(url, errors='surrogate_or_strict'), validate_certs=validate_certs, headers=headers,
                    unredirected_headers=['Authorization'], http_agent=user_agent())

    with open(b_file_path, 'wb') as download_file:
        data = resp.read(bufsize)
        while data:
            digest.update(data)
            download_file.write(data)
            data = resp.read(bufsize)
def _parse_vaulttext_envelope(b_vaulttext_envelope, default_vault_id=None):
    """Split a vault envelope into its payload and header fields.

    The first line of the envelope is a ``;`` separated header; every
    following line is joined (without separators) into the ciphertext.

    :param b_vaulttext_envelope: Bytes of the full envelope, header included.
    :param default_vault_id: Value reported as the vault id when the header
        has no fourth field.
    :returns: Tuple ``(b_ciphertext, b_version, cipher_name, vault_id)``.
    """
    b_lines = b_vaulttext_envelope.splitlines()
    b_header_fields = b_lines[0].strip().split(b';')

    b_version = b_header_fields[1].strip()
    cipher_name = to_text(b_header_fields[2].strip())

    # The vault id is optional: it is read from the fourth header field when
    # the header has one, whatever version string the header declares.
    if len(b_header_fields) >= 4:
        vault_id = to_text(b_header_fields[3].strip())
    else:
        vault_id = default_vault_id

    b_ciphertext = b''.join(b_lines[1:])
    return b_ciphertext, b_version, cipher_name, vault_id
src = self._task.args.get('src')
working_path = self._get_working_path()
if os.path.isabs(src) or urlsplit('src').scheme:
source = src
else:
source = self._loader.path_dwim_relative(working_path, 'templates', src)
if not source:
source = self._loader.path_dwim_relative(working_path, src)
if not os.path.exists(source):
raise ValueError('path specified in src not found')
try:
with open(source, 'r') as f:
template_data = to_text(f.read())
except IOError:
return dict(failed=True, msg='unable to load src file')
# Create a template search path in the following order:
# [working_path, self_role_path, dependent_role_paths, dirname(source)]
searchpath = [working_path]
if self._task._role is not None:
searchpath.append(self._task._role._role_path)
if hasattr(self._task, "_block:"):
dep_chain = self._task._block.get_dep_chain()
if dep_chain is not None:
for role in dep_chain:
searchpath.append(role._role_path)
searchpath.append(os.path.dirname(source))
self._templar.environment.loader.searchpath = searchpath
self._task.args['src'] = self._templar.template(template_data)
([^:\]\s]+) # group name (see groupname below)
(?::(\w+))? # optional : and tag name
\]
\s* # ignore trailing whitespace
(?:\#.*)? # and/or a comment till the
$ # end of the line
''', errors='surrogate_or_strict'), re.X
)
# FIXME: Decide what the real restrictions on group names should be. For
# now a group name is any non-empty run of characters that are neither
# whitespace nor ':' nor ']'; more precise rules would allow better
# diagnostics.
#
# The pattern is written in verbose mode: one captured group name, then
# optional trailing whitespace and an optional '#' comment up to the end of
# the line.
groupname_pattern = to_text(r'''^
([^:\]\s]+)
\s* # ignore trailing whitespace
(?:\#.*)? # and/or a comment till the
$ # end of the line
''', errors='surrogate_or_strict')
self.patterns['groupname'] = re.compile(groupname_pattern, re.X)
# Handle a successful (HTTP 200) response. For state "present" the session
# token is pulled out of the JSON body and packaged into the module result.
if response.getcode() == 200: # Success
if state == "present": # Logon Action
# The REST API returns the token under a different key depending on
# whether shared logon authentication is used.
token = None
try:
if use_shared_logon:
token = json.loads(response.read())["LogonResult"]
else:
token = json.loads(response.read())["CyberArkLogonResult"]
# Any failure while reading, parsing or indexing the response body is
# reported through fail_json together with the request payload and headers.
except Exception as e:
module.fail_json(
msg="Error obtaining token\n%s" % (to_text(e)),
payload=payload,
headers=headers,
status_code=-1,
)
# Preparing result of the module
# The session dictionary bundles the token with the connection settings
# that were used to obtain it.
result = {
"cyberark_session": {
"token": token,
"api_base_url": api_base_url,
"validate_certs": validate_certs,
"use_shared_logon_authentication": use_shared_logon,
}
}
# NOTE(review): the body of this branch is not part of the visible source.
if new_password is not None:
def get_capabilities(module):
    """Return the device capabilities for ``module``, fetching them once.

    The decoded capabilities are stored on the module object as
    ``_dellos9_capabilities``; later calls return that stored value without
    contacting the device again.

    :param module: Module object providing ``_socket_path`` and ``fail_json``.
    :returns: The JSON-decoded capabilities.
    """
    if not hasattr(module, '_dellos9_capabilities'):
        try:
            raw_capabilities = Connection(module._socket_path).get_capabilities()
        except ConnectionError as exc:
            module.fail_json(msg=to_text(exc, errors='surrogate_then_replace'))
        module._dellos9_capabilities = json.loads(raw_capabilities)
    return module._dellos9_capabilities
def get_text(self, ele, tag):
    """Return the stripped text of the ``tag`` match found under ``ele``.

    :param ele: Object whose ``find(tag)`` is used to locate the element.
    :param tag: Tag (or path) handed to ``ele.find``.
    :returns: The element text converted with ``to_text`` and stripped, or
        ``None`` when any step of the lookup raises ``AttributeError`` (for
        example when ``find`` returns ``None``).
    """
    try:
        raw_text = ele.find(tag).text
        return to_text(raw_text, errors='surrogate_then_replace').strip()
    except AttributeError:
        # Best-effort lookup: a missing element is reported as None.
        return None
# Transfer the local file by piping its contents into a remote `dd`.
# The whole file is read into memory as bytes first.
with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as f:
in_data = to_bytes(f.read(), nonstring='passthru')
# An empty payload adds ' count=0' to the dd command line.
if not in_data:
count = ' count=0'
else:
count = ''
# NOTE(review): out_path is interpolated into the command string as-is -
# confirm that it has already been shell-quoted before reaching this point.
(returncode, stdout, stderr) = self.exec_command('dd of=%s bs=%s%s' % (out_path, BUFSIZE, count), in_data=in_data, sudoable=False)
# Check the return code and rollover to next method if failed
if returncode == 0:
return (returncode, stdout, stderr)
else:
# If not in smart mode, the data will be printed by the raise below
# (`methods`, `method` and `host` are defined outside this fragment.)
if len(methods) > 1:
display.warning(u'%s transfer mechanism failed on %s. Use ANSIBLE_DEBUG=1 to see detailed information' % (method, host))
display.debug(u'%s' % to_text(stdout))
display.debug(u'%s' % to_text(stderr))
# Return code 255 is reported as a connection failure; any other outcome
# that reaches this point is reported as a failed file transfer.
if returncode == 255:
raise AnsibleConnectionFailure("Failed to connect to the host via %s: %s" % (method, to_native(stderr)))
else:
raise AnsibleError("failed to transfer file to %s %s:\n%s\n%s" %
(to_native(in_path), to_native(out_path), to_native(stdout), to_native(stderr)))
# Initialise the connection: all arguments are forwarded unchanged to the
# parent class, then the optional 'task_uuid' keyword is recorded.
def __init__(self, play_context, new_stdin, *args, **kwargs):
super(Connection, self).__init__(play_context, new_stdin, *args, **kwargs)
# 'task_uuid' falls back to an empty string when the keyword is absent; the
# value is normalised with to_text before being stored.
self._task_uuid = to_text(kwargs.get('task_uuid', ''))