Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _localize_inputs_recursive_command(self, task_dir, inputs):
  """Build the shell command that stages all recursive inputs.

  Args:
    task_dir: the task directory; recursive inputs land under its data subdir.
    inputs: the set of input file parameters to localize.

  Returns:
    A newline-joined string of one localize command per supported provider.
  """
  destination = os.path.join(task_dir, _DATA_SUBDIR)
  commands = []
  for provider in _SUPPORTED_INPUT_PROVIDERS:
    commands.append(
        providers_util.build_recursive_localize_command(
            destination, inputs, provider))
  return '\n'.join(commands)
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_LOCALIZATION_LOOP)
]),
google_v2_pipelines.build_action(
name='user-command',
image_uri=job_resources.image,
mounts=[mnt_datadisk] + persistent_disk_mounts,
environment=user_environment,
entrypoint='/usr/bin/env',
commands=[
'bash', '-c',
_USER_CMD.format(
tmp_dir=providers_util.TMP_DIR,
working_dir=providers_util.WORKING_DIR,
user_script=script_path)
]),
google_v2_pipelines.build_action(
name='delocalization',
image_uri=_CLOUD_SDK_IMAGE,
mounts=[mnt_datadisk],
environment=delocalization_env,
entrypoint='/bin/bash',
commands=[
'-c',
_LOCALIZATION_CMD.format(
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_DELOCALIZATION_LOOP)
]),
script_data = script_header.format(
volumes=volumes,
name=_format_task_name(
job_metadata.get('job-id'), task_metadata.get('task-id'),
task_metadata.get('task-attempt')),
image=job_resources.image,
script=providers_util.DATA_MOUNT_POINT + '/' + _SCRIPT_DIR + '/' +
job_metadata['script'].name,
env_file=task_dir + '/' + 'docker.env',
uid=os.getuid(),
data_mount_point=providers_util.DATA_MOUNT_POINT,
data_dir=task_dir + '/' + _DATA_SUBDIR,
date_format='+%Y-%m-%d %H:%M:%S',
workingdir=_WORKING_DIR,
export_input_dirs=providers_util.build_recursive_localize_env(
task_dir, job_params['inputs'] | task_params['inputs']),
recursive_localize_command=self._localize_inputs_recursive_command(
task_dir, job_params['inputs'] | task_params['inputs']),
localize_command=self._localize_inputs_command(
task_dir, job_params['inputs'] | task_params['inputs'],
job_metadata['user-project']),
export_output_dirs=providers_util.build_recursive_gcs_delocalize_env(
task_dir, job_params['outputs'] | task_params['outputs']),
recursive_delocalize_command=self._delocalize_outputs_recursive_command(
task_dir, job_params['outputs'] | task_params['outputs']),
delocalize_command=self._delocalize_outputs_commands(
task_dir, job_params['outputs'] | task_params['outputs'],
job_metadata['user-project']),
delocalize_logs_command=self._delocalize_logging_command(
task_resources.logging_path, job_metadata['user-project']),
export_mount_dirs=providers_util.build_mount_env(
name='mount-{}'.format(bucket),
flags=['ENABLE_FUSE', 'RUN_IN_BACKGROUND'],
image_uri=_GCSFUSE_IMAGE,
mounts=[mnt_datadisk],
commands=[
'--implicit-dirs', '--foreground', '-o ro', bucket,
os.path.join(providers_util.DATA_MOUNT_POINT, mount_path)
]),
google_v2_pipelines.build_action(
name='mount-wait-{}'.format(bucket),
flags=['ENABLE_FUSE'],
image_uri=_GCSFUSE_IMAGE,
mounts=[mnt_datadisk],
commands=[
'wait',
os.path.join(providers_util.DATA_MOUNT_POINT, mount_path)
])
])
return actions_to_add
_LOCALIZATION_CMD.format(
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_LOCALIZATION_LOOP)
]),
google_v2_pipelines.build_action(
name='user-command',
image_uri=job_resources.image,
mounts=[mnt_datadisk] + persistent_disk_mounts,
environment=user_environment,
entrypoint='/usr/bin/env',
commands=[
'bash', '-c',
_USER_CMD.format(
tmp_dir=providers_util.TMP_DIR,
working_dir=providers_util.WORKING_DIR,
user_script=script_path)
]),
google_v2_pipelines.build_action(
name='delocalization',
image_uri=_CLOUD_SDK_IMAGE,
mounts=[mnt_datadisk],
environment=delocalization_env,
entrypoint='/bin/bash',
commands=[
'-c',
_LOCALIZATION_CMD.format(
log_msg_fn=_LOG_MSG_FN,
recursive_cp_fn=_GSUTIL_RSYNC_FN,
cp_fn=_GSUTIL_CP_FN,
cp_loop=_DELOCALIZATION_LOOP)
# Old instances of the meta.yaml do not have a task create time.
if not value:
value = job_metadata.get(field)
elif field == 'start-time':
# There's no delay between creation and start since we launch docker
# immediately for local runs.
value = self.get_field('create-time', default)
elif field in ['task-id', 'task-attempt']:
value = task_metadata.get(field)
elif field == 'logging':
# The job_resources will contain the "--logging" value.
# The task_resources will contain the resolved logging path.
# get_field('logging') should currently return the resolved logging path.
value = task_resources.logging_path
elif field in ['labels', 'envs']:
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value = {item.name: item.value for item in items}
elif field in [
'inputs', 'outputs', 'input-recursives', 'output-recursives'
]:
value = {}
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value.update({item.name: item.value for item in items})
elif field == 'mounts':
items = providers_util.get_job_and_task_param(job_params, task_params,
field)
value = {item.name: item.value for item in items}
elif field == 'provider':
return _PROVIDER_NAME
elif field == 'provider-attributes':
'dsub-version'
]:
value = google_v2_operations.get_label(self._op, field)
elif field == 'task-status':
value = self._operation_status()
elif field == 'logging':
if self._job_descriptor:
# The job_resources will contain the "--logging" value.
# The task_resources will contain the resolved logging path.
# Return the resolved logging path.
task_resources = self._job_descriptor.task_descriptors[0].task_resources
value = task_resources.logging_path
elif field in ['envs', 'labels']:
if self._job_descriptor:
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value = {item.name: item.value for item in items}
elif field in [
'inputs', 'outputs', 'input-recursives', 'output-recursives'
]:
if self._job_descriptor:
value = {}
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
self._job_descriptor.task_descriptors[0].task_params, field)
value.update({item.name: item.value for item in items})
elif field == 'mounts':
if self._job_descriptor:
items = providers_util.get_job_and_task_param(
self._job_descriptor.job_params,
delocalize_data() {{
{delocalize_command}
recursive_delocalize_data
}}
delocalize_logs() {{
{delocalize_logs_command}
delocalize_logs_function "${{cp_cmd}}" "${{prefix}}"
}}
""")
# Build the local runner script
volumes = ('-v ' + task_dir + '/' + _DATA_SUBDIR + '/'
':' + providers_util.DATA_MOUNT_POINT)
for mount in param_util.get_local_mounts(job_params['mounts']):
volumes += '\n'
docker_path = os.path.join(providers_util.DATA_MOUNT_POINT,
mount.docker_path)
volumes += '-v {}:{}:ro'.format(mount.uri, docker_path)
script_data = script_header.format(
volumes=volumes,
name=_format_task_name(
job_metadata.get('job-id'), task_metadata.get('task-id'),
task_metadata.get('task-attempt')),
image=job_resources.image,
script=providers_util.DATA_MOUNT_POINT + '/' + _SCRIPT_DIR + '/' +
job_metadata['script'].name,
env_file=task_dir + '/' + 'docker.env',
uid=os.getuid(),