Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_docker_output(arch_suffix: str, file_path: str, root_path: Path) -> dict:
'''
:return: in the case of no error, the output will have the form
{
'parameter 1': {'stdout': , 'stderr': , 'return_code': },
'parameter 2': {...},
'...',
'strace': {'stdout': , 'stderr': , 'return_code': },
}
in case of an error, there will be an entry 'error' instead of the entries stdout/stderr/return_code
'''
container = None
volume = Mount(CONTAINER_TARGET_PATH, str(root_path), read_only=True, type="bind")
try:
client = docker.from_env()
container = client.containers.run(
DOCKER_IMAGE, '{arch_suffix} {target}'.format(arch_suffix=arch_suffix, target=file_path),
network_disabled=True, mounts=[volume], detach=True
)
container.wait(timeout=TIMEOUT_IN_SECONDS)
return loads(container.logs().decode())
except (ImageNotFound, APIError, DockerException, RequestConnectionError):
return {'error': 'process error'}
except ReadTimeout:
return {'error': 'timeout'}
except JSONDecodeError:
return {'error': 'could not decode result'}
finally:
if container:
if key in ['target', 'dst', 'destination']:
target = val
elif key in ['source', 'src']:
source = val
elif key in ['readonly, ro']:
mount_kwargs['read_only'] = val
elif key in ['type', 'propagation']:
mount_kwargs[key] = val
elif key == 'volume-label':
k, v = val.strip("\"\'").split('=')
mount_kwargs.setdefault('labels', {}).update({k: v})
except Exception as e:
raise SyntaxError("Invalid mount format {0}\n{1}".format(string, e))
return docker.types.Mount(target, source, **mount_kwargs)
mount_data_dir = UserInputHelper.get_valid_choice_input(
prompt='Bind the GeoServer data directory to the host?',
choices=['y', 'n'],
default='y',
)
if mount_data_dir.lower() == 'y':
tethys_home = os.environ.get('TETHYS_HOME', os.path.expanduser('~/tethys/'))
default_mount_location = os.path.join(tethys_home, 'geoserver', 'data')
gs_data_volume = '/var/geoserver/data'
mount_location = UserInputHelper.get_valid_directory_input(
prompt='Specify location to bind data directory',
default=default_mount_location
)
mounts = [Mount(gs_data_volume, mount_location, type='bind')]
options['host_config'].update(mounts=mounts)
return options
def create_mounts(mount_list): # TODO: expand this for other storage_type's
res = []
for m in mount_list:
mnt = docker.types.Mount(type='bind', source=m.host_path+'/', target=m.mount_path)
res.append(mnt)
return res
def create_env(image, name, project_dir, deps, container_args):
workdir = os.path.abspath(project_dir)
mounts = [
Mount('/usr/src', workdir, type='bind')
]
kwargs = {
'command': '/bin/sh',
'stdin_open': True,
'labels': {
'workdir': workdir,
'env_name': name,
},
'name': definitions.CONTAINERS_PREFIX + name,
'mounts': mounts,
'network': definitions.CONTAINERS_PREFIX + name + '_network',
}
filtered_container_args = {k: v for k, v in container_args.items()
if k not in kwargs}
kwargs.update(filtered_container_args)
c.remove(force=True)
if not _is_running(master_name):
if not working_dir:
working_dir = os.getcwd()
print("Staring %s container in dir %s" % (master_name, working_dir))
# mount target dir needs to be absolute
target_dir = '/home/user'
user_dir = Mount(target=target_dir,
source=working_dir,
type='bind')
compss_log_dir = Mount(target='/root/.COMPSs',
source=os.environ['HOME'] + '/.COMPSs',
type='bind')
mounts = [user_dir, compss_log_dir]
client.containers.run(image='dislib', name=master_name,
mounts=mounts, detach=True)
'type': 'type',
'readonly': 'read_only',
'propagation': 'propagation',
'labels': 'labels',
'no_copy': 'no_copy',
'driver_config': 'driver_config',
'tmpfs_size': 'tmpfs_size',
'tmpfs_mode': 'tmpfs_mode'
}
mount_args = {}
for option, mount_arg in mount_options.items():
value = mount_config.get(option)
if value is not None:
mount_args[mount_arg] = value
mounts.append(types.Mount(**mount_args))
configs = None
if self.configs is not None:
configs = []
for config_config in self.configs:
config_args = {
'config_id': config_config['config_id'],
'config_name': config_config['config_name']
}
filename = config_config.get('filename')
if filename:
config_args['filename'] = filename
uid = config_config.get('uid')
if uid:
config_args['uid'] = uid
gid = config_config.get('gid')
self.docker_public_agent_labels = docker_public_agent_labels or {}
self.transport = transport
self.network = network
self.one_master_host_port_map = one_master_host_port_map or {}
self.container_name_prefix = container_name_prefix
# Deploying some applications, such as Kafka, read from the cgroups
# isolator to know their CPU quota.
# This is determined only by error messages when we attempt to deploy
# Kafka on a cluster without this mount.
# Therefore, we mount ``/sys/fs/cgroup`` from the host.
#
# This has a problem - some hosts do not have systemd, and therefore
# ``/sys/fs/cgroup`` is not available, and a mount error is shown.
# See https://jira.d2iq.com/browse/DCOS_OSS-4475 for details.
cgroup_mount = Mount(
source='/sys/fs/cgroup',
target='/sys/fs/cgroup',
read_only=True,
type='bind',
)
self.cgroup_mounts = [cgroup_mount] if mount_sys_fs_cgroup else []
def _setup_docker_uri_endpoint(self, ports_dict, volume_dict, mounts_list):
edge_config = self._config_obj
# if the uri has a port, create a port mapping else
# volume mount the end point (ex. unix socket file)
if edge_config.deployment_config.uri_port and \
edge_config.deployment_config.uri_port != '':
key = edge_config.deployment_config.uri_port + '/tcp'
val = int(edge_config.deployment_config.uri_port)
ports_dict[key] = val
else:
if self._client.get_os_type() == 'windows':
# Windows needs 'mounts' to mount named pipe to Agent
docker_pipe = edge_config.deployment_config.uri_endpoint
mounts_list.append(docker.types.Mount(target=docker_pipe,
source=docker_pipe,
type='npipe'))
else:
key = edge_config.deployment_config.uri_endpoint
volume_dict[key] = {'bind': key, 'mode': 'rw'}
if not src:
src = "/"
if os.path.isfile(src):
raise AliDockError("mount {src} is a file: only dirs allowed".format(src=src))
if not label:
label = "root" if src == "/" else os.path.basename(src)
elif "/" in label or label in [".", ".."]:
raise AliDockError("mount label {label} is invalid: label cannot contain a slash"
"and cannot be equal to \"..\" or \".\"".format(label=label))
mnt = posixpath.join("/", "mnt", label)
if not mode:
mode = "rw"
if mode not in ["rw", "ro"]:
raise AliDockError("supported modes for mounts are \"rw\" and \"ro\", "
"not {mode}".format(mode=mode))
dockMounts.append(Mount(mnt, src, type="bind", read_only=(mode == "ro"),
consistency="cached"))
return dockMounts