def test_create_container_failed(self):
    create_user(dict(
        username=test_cfg.USER_NAME,
        password=test_cfg.USER_PASSWORD,
        email=test_cfg.USER_EMAIL
    ))
    req = dict(
        image_id=test_cfg.CONTAINER_IMAGE_ID,
        user_name=test_cfg.USER_NAME,
        container_name=test_cfg.CONTAINER_NAME
    )
    with mock.patch.object(self.cli, 'create_container') as mock_cli_create:
        mock_cli_create.side_effect = docker.errors.APIError(mock.Mock(), mock.Mock())
        response = worker.create_container(self.cli, **req)
    # The default container_serial is ''
    remove_instance_by_serial('')
    remove_user_by_username(test_cfg.USER_NAME)
    mock_cli_create.assert_called_once()
    res_dict = json.loads(response)
    self.assertEqual(res_dict.get('code'), '0x3')
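# The test above passes mock objects straight into docker.errors.APIError; as a
# hypothetical, standalone sketch (not part of the snippet above), an APIError can
# also be constructed from a real requests.Response and then inspected:
import docker.errors
import requests

resp = requests.models.Response()
resp.status_code = 500
err = docker.errors.APIError("server error", response=resp, explanation="boom")
print(err.status_code)        # 500
print(err.is_server_error())  # True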
def test_remove_volume_exception(self):
    resp = mock.MagicMock()
    resp.status_code = 409
    docker_except = docker_error.APIError('test error', resp)
    self.dw = get_DockerWorker({'name': 'nova_compute',
                                'action': 'remove_volume'})
    self.dw.dc.volumes.return_value = self.volumes
    self.dw.dc.remove_volume.side_effect = docker_except

    self.assertRaises(docker_error.APIError, self.dw.remove_volume)
    self.assertTrue(self.dw.changed)
    self.dw.module.fail_json.assert_called_once_with(
        failed=True,
        msg="Volume named 'nova_compute' is currently in-use"
    )
def test_should_handle_cleanup_error_when_removing_image(self, v2_image):
    self.docker_client.inspect_image.return_value = {'Id': "abcdefgh"}
    self.docker_client.remove_image.side_effect = docker.errors.APIError("Message")
    squash = Squash(self.log, 'image', self.docker_client, load_image=True, cleanup=True)
    squash.run()
    self.log.info.assert_any_call("Removing old image image...")
    self.log.warn.assert_any_call("Could not remove image image: Message, skipping cleanup after squashing")
for key, ver in versions.items():
    updated = key in current_versions and ver != current_versions[key]
    new = key not in current_versions
    if new or updated:
        new_or_updated.append(ver)

if not new_or_updated:
    print("No new or updated versions")
    return

# Log in to Docker Hub
docker_client = docker.from_env()
dockerhub_username = os.getenv("DOCKERHUB_USERNAME")
try:
    docker_client.login(dockerhub_username, os.getenv("DOCKERHUB_PASSWORD"))
except docker.errors.APIError:
    print(f"Could not log in to Docker Hub with username '{dockerhub_username}'.")
    print("Are the env vars DOCKERHUB_USERNAME and DOCKERHUB_PASSWORD set correctly?")
    exit(1)

node_gpg_keys = _fetch_node_gpg_keys()

# Build, tag and push images
for version in new_or_updated:
    dockerfile = render_dockerfile(version, node_gpg_keys)
    # docker build wants bytes
    with BytesIO(dockerfile.encode()) as fileobj:
        tag = f"{DOCKER_IMAGE_NAME}:{version['key']}"
        nodejs_version = version["nodejs_canonical"]
        python_version = version["python_canonical"]
        print(
            f"Building image {version['key']} python: {python_version} nodejs: {nodejs_version} ...",
            end="",
        )
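# The loop above is truncated before the actual build; purely as an illustration
# (not the original code), a build and push with the docker SDK (>= 3.x) could
# look roughly like this, reusing the fileobj, tag, docker_client and
# DOCKER_IMAGE_NAME names assumed from the snippet:
image, build_logs = docker_client.images.build(fileobj=fileobj, tag=tag, rm=True)
for line in docker_client.images.push(DOCKER_IMAGE_NAME, tag=version["key"], stream=True, decode=True):
    print(line.get("status", ""))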
try:
    if self.conf['insecure']:
        out = self.insecure_run_program()
    else:
        out = self.docker_run_program()
    signal.alarm(0)
except TimedOutException:
    logging.info("Timed out writ %s", self.writ.run_id)
    self.submit_writ("Error: Timed out", RunState.TIMED_OUT)
except OutputLimitExceeded:
    logging.info("Output limit exceeded on writ %s", self.writ.run_id)
    self.submit_writ("Error: Output limit exceeded", RunState.OUTPUT_LIMIT_EXCEEDED)
except NoOutputException:
    logging.info("No output given from writ %s", self.writ.run_id)
    self.submit_writ("", RunState.NO_OUTPUT)
except docker.errors.APIError:
    self.return_writ_without_output()
    traceback.print_exc()
else:
    self.submit_writ(self.clean_output(out), RunState.EXECUTED)
finally:
    signal.alarm(0)
    if self.container:
        self.container.remove(force=True)
        self.container = None
    try:
        shutil.rmtree(self.writ.shared_data_dir)
    except FileNotFoundError:
        pass
    self.writ = None
def docker_connect():
    docker_cli = docker.from_env()
    try:
        docker_cli.ping()
    except docker.errors.APIError as e:
        _LOG.critical("Failed to connect to docker server\n {}".format(e))
        return None
    return docker_cli
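# Hypothetical usage of docker_connect() above, assuming a reachable local Docker
# daemon; the version() call is only there to show the returned client is usable.
client = docker_connect()
if client is None:
    raise SystemExit(1)
print(client.version().get("Version"))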
        self.docker.client.remove_container(
            self.docker.buffer()["container create"]["Id"], force=True)
    if (manage_flag == "container") and (command_flag is not None) and (command_flag == "run"):
        self.docker.set_buffer(
            self.docker.buffer()["container run"])
    if suppress is not True:
        self.buf = self.docker.buffer()
except (KeyboardInterrupt, SystemExit) as e:
    self.response.set_exception(type(e))
except ConnectionError as e:
    self.response.set_exception(type(e))
    self.response.message = e.message[0]
except (AttributeError, ValueError) as e:
    self.response.set_exception(type(e))
    self.response.message = str(e)
except (APIError, NotFound) as e:
    self.response.set_exception(type(e))
    try:
        self.response.message = e.response.json()["message"]
    except ValueError:
        self.response.message = str(e.response.text[:-1])
except RuntimeError as e:
    self.response.set_exception(type(e))
    self.response.message = str(e)
def section_node_disk_usage(client):
    '''docker system df'''
    section = Section('node_disk_usage')
    try:
        data = client.df()
    except docker.errors.APIError as exc:
        if DEBUG:
            raise
        section.write()
        LOGGER.exception(exc)
        return
    LOGGER.debug(data)

    def get_row(type_, instances, is_inactive, key='Size'):
        inactive = [i for i in instances if is_inactive(i)]
        item_data = {
            'type': type_,
            'size': sum(i.get(key, 0) for i in instances),
            'reclaimable': sum(i.get(key, 0) for i in inactive),
            'count': len(instances),
            'active': len(instances) - len(inactive),
        }
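# Hypothetical call of get_row() above against client.df() output; the field
# names ('Containers', 'State', 'SizeRw') follow the /system/df payload and are
# assumptions that can vary with the daemon/API version.
container_row = get_row('containers',
                        data.get('Containers') or [],
                        is_inactive=lambda c: c.get('State') != 'running',
                        key='SizeRw')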
def _setup_network(self, name):
    try:
        labels = {defaults.get('DEFAULT_DOCKER_LABEL_KEY'): get_clusterdock_label(self.name)}
        network = client.networks.create(name=name,
                                         driver=DEFAULT_NETWORK_TYPE,
                                         check_duplicate=True,
                                         labels=labels)
        logger.debug('Successfully created network (%s).', name)
    except docker.errors.APIError as api_error:
        if api_error.explanation == 'network with name {} already exists'.format(name):
            logger.warning('Network (%s) already exists. Continuing without creating ...',
                           name)
            network = client.networks.get(name)
        else:
            raise
    return network
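# A hypothetical companion to _setup_network() above: networks created with the
# clusterdock label can later be looked up (and removed) again through a label
# filter; client and the label key are assumed to match the snippet.
for network in client.networks.list(filters={'label': defaults.get('DEFAULT_DOCKER_LABEL_KEY')}):
    network.remove()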