Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
base_url, timeout, pool_connections=num_pools
)
self.mount('http+docker://', self._custom_adapter)
self._unmount('http://', 'https://')
self.base_url = 'http+docker://localunixsocket'
elif base_url.startswith('npipe://'):
if not IS_WINDOWS_PLATFORM:
raise DockerException(
'The npipe:// protocol is only supported on Windows'
)
try:
self._custom_adapter = NpipeAdapter(
base_url, timeout, pool_connections=num_pools
)
except NameError:
raise DockerException(
'Install pypiwin32 package to enable npipe:// support'
)
self.mount('http+docker://', self._custom_adapter)
self.base_url = 'http+docker://localnpipe'
else:
# Use SSLAdapter for the ability to specify SSL version
if isinstance(tls, TLSConfig):
tls.configure_client(self)
elif tls:
self._custom_adapter = SSLAdapter(pool_connections=num_pools)
self.mount('https://', self._custom_adapter)
self.base_url = base_url
# version detection needs to be after unix adapter mounting
if version is None:
self._version = DEFAULT_DOCKER_API_VERSION
def main(**kwargs):
if not os.path.exists(kwargs['test_results_path']):
os.makedirs(kwargs['test_results_path'])
if kwargs['linux_app_url']:
linux_app_url = kwargs['linux_app_url']
else:
linux_app_url = re.findall('https\S*AppImage', requests.get('https://status.im/nightly/').text)[0]
app_version = ([i for i in [i for i in linux_app_url.split('/') if '.AppImage' in i]])[0]
urlretrieve(linux_app_url, 'nightly.AppImage')
client = docker.from_env()
client.images.build(tag='status_desktop', path='.')
pytest.main(['--collect-only', '-m %s' % kwargs['mark']])
from tests import test_data
if kwargs['testrail_report']:
testrail_report = TestrailReportDesktop(kwargs['test_results_path'])
testrail_report.add_run(app_version if app_version else '%s' % datetime.now().strftime("%Y-%m-%d %H:%M"))
for test in test_data.tests_to_run:
print(test)
try:
container = client.containers.run("status_desktop",
detach=True, tty=True, ports={'5900/tcp': 5900},
volumes={kwargs['test_results_path']: {'bind': TEST_REPORT_DIR_CONTAINER,
'mode': 'rw'}})
outcome = container.exec_run(['/jython-2.7.1/bin/jython', '-m', 'pytest', '/home/tests/' + test],
stdout=True, stderr=False, stdin=True)
except Exception:
try:
self.docker_client.swarm.init()
except docker.errors.APIError:
pass
# docker network
self.network_name = "test-network-dfple"
self.network = self.docker_client.networks.create(name=self.network_name, driver='overlay')
# docker-flow-proxy service
# dfp_image = self.docker_client.images.pull('vfarcic/docker-flow-proxy')
dfp_service = {
'name': 'proxy_{}'.format(self.test_name),
'image': 'vfarcic/docker-flow-proxy',
'constraints': [],
'endpoint_spec': docker.types.EndpointSpec(
ports={80: 80, 443: 443, 8080: 8080}),
'env': [
"LISTENER_ADDRESS=swarm_listener_{}".format(self.test_name),
"MODE=swarm",
"DEBUG=true",
"SERVICE_NAME=proxy_{}".format(self.test_name) ],
'networks': [self.network_name]
}
# docker-flow-swarm-listener service
# dfsl_image = self.docker_client.images.pull('vfarcic/docker-flow-swarm-listener')
dfsl_service = {
'name': 'swarm_listener_{}'.format(self.test_name),
'image': 'vfarcic/docker-flow-swarm-listener',
'constraints': ["node.role == manager"],
'endpoint_spec': docker.types.EndpointSpec(
def _create_tagged_image(base_image_tag, new_image_tag, app_or_lib_name):
    """Run the test-image setup command in a container and tag the result.

    A container is created from ``base_image_tag`` with the volumes and binds
    derived from the mounts of ``app_or_lib_name``, started, and its combined
    stdout/stderr is streamed to ``log_to_client``. When the container exits
    with a falsy status it is committed, any existing image tagged
    ``new_image_tag`` is removed (best effort), the committed image is tagged
    ``new_image_tag`` and the container is deleted together with its volumes.

    Args:
        base_image_tag: Image the setup container is created from.
        new_image_tag: Repository tag applied to the committed image.
        app_or_lib_name: Name used to look up the setup command and the
            volume mounts.

    Raises:
        ImageCreationError: If the container's wait() result is truthy.
    """
    docker_client = get_docker_client()
    command = _get_test_image_setup_command(app_or_lib_name)
    split_volumes = _get_split_volumes(
        get_volume_mounts(app_or_lib_name, get_expanded_libs_specs(), test=True))
    create_container_volumes = _get_create_container_volumes(split_volumes)
    create_container_binds = _get_create_container_binds(split_volumes)

    container = docker_client.create_container(
        image=base_image_tag,
        command=command,
        volumes=create_container_volumes,
        host_config=docker.utils.create_host_config(binds=create_container_binds))
    docker_client.start(container=container['Id'])

    log_to_client('Running commands to create new image:')
    for line in docker_client.logs(container['Id'], stdout=True, stderr=True, stream=True):
        log_to_client(line.strip())

    # NOTE(review): assumes wait() returns the integer exit status (older
    # docker-py); newer releases return a dict, which is always truthy -- confirm
    # against the pinned docker-py version.
    exit_code = docker_client.wait(container['Id'])
    if exit_code:
        # NOTE(review): the container is not removed on this path -- confirm
        # whether it is intentionally kept for debugging.
        raise ImageCreationError(exit_code)

    new_image = docker_client.commit(container=container['Id'])
    try:
        docker_client.remove_image(image=new_image_tag)
    except Exception:
        # Best effort only: a failed removal must not abort tagging. Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # propagate instead of being swallowed here.
        log_to_client('Not able to remove image {}'.format(new_image_tag))
    docker_client.tag(image=new_image['Id'], repository=new_image_tag, force=True)
    docker_client.remove_container(container=container['Id'], v=True)
def step_impl(context, image_name, basedir):
    """Build the image located at ``IMAGE_DIR/basedir`` and tag it ``image_name``.

    The build output stream is drained into a list (one formatted entry per
    item yielded by the client) and printed afterwards. Always returns True.

    Args:
        context: Step context object; not used by this step.
        image_name: Tag passed to the Docker build.
        basedir: Directory name, relative to ``IMAGE_DIR``, holding the build
            context.
    """
    docker_api = docker.from_env(assert_hostname=False)
    build_dir = os.path.join(IMAGE_DIR, basedir)
    build_stream = docker_api.build(build_dir, rm=True, tag=image_name)

    # Consume the whole stream before reporting anything.
    build_log = []
    for entry in build_stream:
        build_log.append(" %s" % (entry,))

    # NOTE(review): the message prints the build directory as the "image" and
    # never mentions image_name -- confirm whether that is intended.
    print("Building image %s from %s" % (build_dir, basedir))
    print(build_log)
    return True
def exec_command(cls, command, project, service, timeout_seconds=60):
    """Run ``command`` inside the docker-compose container of a project service.

    The container is located through ``cls._get_container_id``; an exec
    instance is then created for ``command`` and started. Nothing is returned.

    Args:
        command: The command to be executed in the container.
        project: Name of the project the container is hosting.
        service: Name of the service that the container is hosting.
        timeout_seconds: Retry time limit, in seconds, to wait for containers
            to start. Defaults to 60.

    Raises:
        ContainerUnavailableError: If the retry limit is reached (the raise
            site is not in this method; presumably ``_get_container_id``).
    """
    api = Client(version='auto')
    target_container = cls._get_container_id(project, service, api, timeout_seconds)
    exec_instance = api.exec_create(target_container, command)
    api.exec_start(exec_instance['Id'])
def addif(self, ctn, ip_addr=''):
    """Connect container ``ctn`` to this Docker network and record its address.

    Runs ``docker network connect`` for the container (with ``--ip`` or, on an
    IPv6 subnet, ``--ip6`` when ``ip_addr`` is given), then inspects the
    network to read back the address assigned to the container and appends an
    ``(interface name, address, network name)`` tuple to ``ctn.ip_addrs`` or
    ``ctn.ip6_addrs`` depending on the subnet version.

    Args:
        ctn: Container object to attach.
        ip_addr: Optional address to request; empty string means no explicit
            address option is passed.

    Raises:
        IndexError: If the inspected network lists no container whose name
            matches ``ctn.docker_name()``.
    """
    # The result is unused here; the call is kept as-is (presumably it advances
    # the container's interface counter -- TODO confirm).
    _name = ctn.next_if_name()
    self.ctns.append(ctn)

    if ip_addr == '':
        ip_option = ''
    elif self.subnet.version == 6:
        ip_option = '--ip6 {0}'.format(ip_addr)
    else:
        ip_option = '--ip {0}'.format(ip_addr)

    local("docker network connect {0} {1} {2}".format(ip_option, self.name, ctn.docker_name()))

    # Read back what Docker actually assigned to this container.
    network_info = Client(timeout=60, version='auto').inspect_network(self.id)
    attached = list(network_info['Containers'].values())
    endpoint = [entry for entry in attached if entry['Name'] == ctn.docker_name()][0]

    if self.subnet.version == 4:
        known_addrs, addr_key = ctn.ip_addrs, 'IPv4Address'
    else:
        known_addrs, addr_key = ctn.ip6_addrs, 'IPv6Address'

    # Interface index is the count of addresses already recorded for that family.
    eth = 'eth{0}'.format(len(known_addrs))
    addr = endpoint[addr_key]
    known_addrs.append((eth, addr, self.name))
def test_create_container_failed(self):
    # Checks that worker.create_container answers with code '0x3' when the
    # Docker client's create_container call raises docker.errors.APIError.
    user_fields = {
        'username': test_cfg.USER_NAME,
        'password': test_cfg.USER_PASSWORD,
        'email': test_cfg.USER_EMAIL,
    }
    create_user(user_fields)

    request_kwargs = {
        'image_id': test_cfg.CONTAINER_IMAGE_ID,
        'user_name': test_cfg.USER_NAME,
        'container_name': test_cfg.CONTAINER_NAME,
    }

    with mock.patch.object(self.cli, 'create_container') as patched_create:
        patched_create.side_effect = docker.errors.APIError(mock.Mock(), mock.Mock())
        raw_response = worker.create_container(self.cli, **request_kwargs)

    # Clean up before asserting. The default container_serial is ''.
    remove_instance_by_serial('')
    remove_user_by_username(test_cfg.USER_NAME)

    patched_create.assert_called_once()
    payload = json.loads(raw_response)
    self.assertEqual(payload.get('code'), '0x3')
def test_remove_volume_exception(self):
    # Checks remove_volume when the Docker client raises an APIError whose
    # response has status 409: the error propagates, the worker is flagged as
    # changed, and fail_json is called once with the "in-use" message.
    conflict_response = mock.MagicMock()
    conflict_response.status_code = 409
    api_error = docker_error.APIError('test error', conflict_response)

    worker_params = {'name': 'nova_compute', 'action': 'remove_volume'}
    self.dw = get_DockerWorker(worker_params)
    self.dw.dc.volumes.return_value = self.volumes
    self.dw.dc.remove_volume.side_effect = api_error

    with self.assertRaises(docker_error.APIError):
        self.dw.remove_volume()

    self.assertTrue(self.dw.changed)
    expected_message = "Volume named 'nova_compute' is currently in-use"
    self.dw.module.fail_json.assert_called_once_with(
        failed=True,
        msg=expected_message,
    )
def test_should_handle_cleanup_error_when_removing_image(self, v2_image):
    # Checks that Squash.run() with cleanup=True logs the removal attempt and
    # then a warning when the Docker client's remove_image raises APIError.
    removal_failure = docker.errors.APIError("Message")
    self.docker_client.inspect_image.return_value = {'Id': "abcdefgh"}
    self.docker_client.remove_image.side_effect = removal_failure

    Squash(self.log, 'image', self.docker_client, load_image=True, cleanup=True).run()

    expected_info = "Removing old image image..."
    expected_warning = "Could not remove image image: Message, skipping cleanup after squashing"
    self.log.info.assert_any_call(expected_info)
    self.log.warn.assert_any_call(expected_warning)