Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_assert_hostname(tmpdir):
    """Check that ``--x-assert-hostname`` yields an HTTPS adapter whose
    ``assert_hostname`` attribute is falsy."""
    repo_path = str(tmpdir.join('no-manifest'))
    create_repo(repo_path)

    options = get_defaults()
    options['--x-assert-hostname'] = True

    client_cfg = {
        'base_url': 'https://example.com:443/api/v1/',
        'tls': tls.TLSConfig(),
    }
    # Only the sixth element of the returned tuple (the client) is of interest.
    _, _, _, _, _, api_client = cli.process_arguments(
        repo_path,
        options,
        client_cfg=client_cfg,
        environ={'SW_NAMESPACE': 'eg'},
    )

    https_adapter = api_client.adapters['https://']
    assert not https_adapter.assert_hostname
*,
job_id: str,
job_model: str,
exec_image: File,
exec_image_sha256: str,
):
super().__init__()
self._job_id = job_id
self._job_label = f"{job_model}-{job_id}"
self._exec_image = exec_image
self._exec_image_sha256 = exec_image_sha256
client_kwargs = {"base_url": settings.CONTAINER_EXEC_DOCKER_BASE_URL}
if settings.CONTAINER_EXEC_DOCKER_TLSVERIFY:
tlsconfig = TLSConfig(
verify=True,
client_cert=(
settings.CONTAINER_EXEC_DOCKER_TLSCERT,
settings.CONTAINER_EXEC_DOCKER_TLSKEY,
),
ca_cert=settings.CONTAINER_EXEC_DOCKER_TLSCACERT,
)
client_kwargs.update({"tls": tlsconfig})
self._client = docker.DockerClient(**client_kwargs)
self._labels = {"job": f"{self._job_label}", "traefik.enable": "false"}
self._run_kwargs = {
"init": True,
"network_disabled": True,
self._backend_url = '{:s}://localhost:{:d}'.format(
proto, self._tunnel.bind_port)
# Apparently bgtunnel isn't always ready right away and this
# drastically cuts down on the timeouts
time.sleep(1)
elif self._socket_path is not None:
self._backend_url = 'unix://{:s}'.format(self._socket_path)
else:
proto = "https" if (tls or tls_verify) else "http"
self._backend_url = '{:s}://{:s}:{:d}'.format(
proto, self._endpoint, self._docker_port)
self._tls = docker.tls.TLSConfig(
verify=tls_verify,
client_cert=(tls_cert, tls_key),
ca_cert=tls_ca_cert,
ssl_version=ssl_version) if tls else None
self._backend = docker.Client(
base_url=self._backend_url,
version=str(api_version or Ship.DEFAULT_API_VERSION),
timeout=timeout or Ship.DEFAULT_DOCKER_TIMEOUT,
tls=self._tls)
def _client(self):
    """
    Return the cached ``DockerAPIClient``, creating it on first use.

    Client certificates are looked up in ``$DOCKER_CERT_PATH`` or, when that
    variable is unset or empty, in ``~/.docker``.  A TLS configuration is
    only built when ``force_tlsverify`` or ``tls`` is set on ``self._conf``;
    otherwise ``False`` is passed as the ``tls`` argument.
    """
    # TODO: add cert options to containers conf
    cert_dir = (os.environ.get('DOCKER_CERT_PATH')
                or os.path.join(os.path.expanduser('~'), '.docker'))

    tls_config = False
    if self._conf.force_tlsverify or self._conf.tls:
        client_pair = (
            os.path.join(cert_dir, 'cert.pem'),
            os.path.join(cert_dir, 'key.pem'),
        )
        tls_config = docker.tls.TLSConfig(
            client_cert=client_pair,
            ca_cert=os.path.join(cert_dir, 'ca.pem'),
            verify=self._conf.force_tlsverify,
        )

    if not self.__client:
        self.__client = DockerAPIClient(host_iter=self.host_iter, tls=tls_config)
    return self.__client
def _start_recording(self, queue):
    """
    Forward everything yielded by ``attach`` for ``self.id`` into ``queue``.

    ``self._handler`` is installed for SIGTERM, SIGINT, SIGHUP and SIGQUIT.
    A separate ``docker.Client`` is then opened with the base URL, timeout,
    API version and TLS settings of ``self.client``, and every item produced
    by its ``attach`` call is put on ``queue``.  The client is closed on the
    way out, whether or not an error occurred.
    """
    for signum in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP, signal.SIGQUIT):
        signal.signal(signum, self._handler)

    client = None
    try:
        source = self.client
        tls_kwargs = {'client_cert': source.cert}
        # A boolean ``verify`` is passed through as-is; any other value is
        # handed to TLSConfig as the CA certificate instead.
        if isinstance(source.verify, bool):
            tls_kwargs['verify'] = source.verify
        else:
            tls_kwargs['ca_cert'] = source.verify
        tls_config = docker.tls.TLSConfig(**tls_kwargs)

        client = docker.Client(
            source.base_url,
            tls=tls_config,
            timeout=source.timeout,
            version=source.api_version,
        )
        for chunk in client.attach(self.id, True, True, True, False):
            queue.put(chunk)
    finally:
        if isinstance(client, docker.Client):
            client.close()
if self.url.scheme == 'https':
# TODO: Need to allow for ca to be passed if not disable warnings.
urllib3.disable_warnings()
for cert_name_type in ('ca', 'cert', 'key'):
cert_path = utils.validate_path(os.path.join(kwargs['ssl_cert_path'], "{0}.pem".format(cert_name_type))) \
if 'ssl_cert_path' in kwargs and kwargs['ssl_cert_path'] else None
setattr(self, 'SSL_{0}_PATH'.format(cert_name_type.upper()), cert_path)
self.SSL_VERIFY = kwargs['verify'] if 'verify' in kwargs and isinstance(kwargs['verify'], bool) else True
if not self.SSL_VERIFY:
self.SSL_CA_PATH = None
client_certs = (self.SSL_CERT_PATH, self.SSL_KEY_PATH) if self.SSL_KEY_PATH and self.SSL_CERT_PATH else None
tls_config = docker.tls.TLSConfig(client_cert=client_certs, ca_cert=self.SSL_CA_PATH, verify=self.SSL_VERIFY)
self._client_session = docker.Client(self.url.geturl(), tls=tls_config, timeout=DOCKER_DEFAULT_TIMEOUT, version=self.API_VERSION)
else:
self._client_session = docker.Client(self.url.geturl(), timeout=DOCKER_DEFAULT_TIMEOUT, version=self.API_VERSION)
self._docker_info = self._client_session.version()
self._injector = None
def generate_tls(self):
    """
    Build the ``docker.tls.TLSConfig`` described by ``self.params``.

    Reads ``tls_verify``, ``tls_cert``, ``tls_key`` and ``tls_cacert``.
    Certificate settings are applied only when ``tls_verify`` is truthy:

    * if ``tls_cert`` is set, the certificate and key are passed through
      ``self.check_file`` and supplied as the ``client_cert`` pair;
    * if ``tls_cacert`` is set, it is passed through ``self.check_file``
      and replaces the ``verify`` value.

    Returns:
        The ``docker.tls.TLSConfig`` built from those keyword arguments.
    """
    tls = {'verify': self.params.get('tls_verify')}
    # These must be plain values.  A trailing comma after the call would wrap
    # each one in a one-element tuple, which is always truthy and is not a
    # usable file path.
    tls_cert = self.params.get('tls_cert')
    tls_key = self.params.get('tls_key')
    tls_cacert = self.params.get('tls_cacert')

    if tls['verify']:
        if tls_cert:
            self.check_file(tls_cert)
            self.check_file(tls_key)
            tls['client_cert'] = (tls_cert, tls_key)
        if tls_cacert:
            self.check_file(tls_cacert)
            tls['verify'] = tls_cacert

    return docker.tls.TLSConfig(**tls)
def __get_tls_config(self):
    """
    Return a TLS configuration when a full certificate set is configured.

    A ``tls.TLSConfig`` is built only when ``tls_ca_cert``,
    ``tls_client_cert`` and ``tls_client_key`` are all truthy.  In that case
    ``tcp://`` in ``self.docker_url`` is also rewritten to ``https://``.
    Otherwise ``None`` is returned.
    """
    have_all_certs = (
        self.tls_ca_cert and self.tls_client_cert and self.tls_client_key
    )
    if not have_all_certs:
        return None

    client_pair = (self.tls_client_cert, self.tls_client_key)
    config = tls.TLSConfig(
        ca_cert=self.tls_ca_cert,
        client_cert=client_pair,
        verify=True,
        ssl_version=self.tls_ssl_version,
        assert_hostname=self.tls_hostname,
    )
    self.docker_url = self.docker_url.replace("tcp://", "https://")
    return config
Returns the docker configuration for the given machine.
Args:
machine: The machine name
Returns:
dict: base_url, tls
"""
cmd = ["config", machine]
regexp = """(--tlsverify\n)?--tlscacert="(.+)"\n--tlscert="(.+)"\n--tlskey="(.+)"\n-H=(.+)"""
match = self._match(cmd, regexp)
tlsverify, tlscacert, tlscert, tlskey, host = match.group(1, 2, 3, 4, 5)
tlsverify = bool(tlsverify)
params = {
'base_url': host.replace('tcp://', 'https://') if tlsverify else host,
'tls': TLSConfig(
client_cert=(tlscert, tlskey),
ca_cert=tlscacert,
verify=True
)
}
return params
cert_paths['cert_files'] = {
'client_cert': join(cert_paths['cert_dir'], 'client.cert'),
'client_key': join(cert_paths['cert_dir'], 'client.key'),
'ca_crt': join(cert_paths['cert_dir'], 'ca.crt')
}
if not isdir(cert_paths['cert_dir']):
self.logger.error('%s is not a valid cert folder', cert_paths['cert_dir'])
raise ValueError
for cert_file in cert_paths['cert_files'].values():
if not isfile(cert_file):
self.logger.error('%s does not exist', cert_file)
raise ValueError
tls_config = tls.TLSConfig(
ca_cert=cert_paths['cert_files']['ca_crt'],
verify=cert_paths['cert_files']['ca_crt'] if self.config.docker_tls_verify else False,
client_cert=(cert_paths['cert_files']['client_cert'], cert_paths['cert_files']['client_key'])
)
client = DockerClient(base_url=self.socket, tls=tls_config)
except ValueError:
self.logger.error('Invalid Docker TLS config for %s, reverting to unsecured', self.socket)
client = DockerClient(base_url=self.socket)
else:
client = DockerClient(base_url=self.socket)
return client