def prepare_job_metadata(script, job_name, user_id, create_time):
  """Returns a dictionary of metadata fields for the job."""

  # The name of the pipeline gets set into the ephemeralPipeline.name as-is.
  # The default name of the pipeline is the script name.
  # The name of the job is derived from the job_name and gets set as a
  # 'job-name' label (and so the value must be normalized).
  if job_name:
    pipeline_name = job_name
    job_name_value = job_model.convert_to_label_chars(job_name)
  else:
    pipeline_name = os.path.basename(script)
    job_name_value = job_model.convert_to_label_chars(
        pipeline_name.split('.', 1)[0])

  # The user-id will get set as a label.
  user_id = job_model.convert_to_label_chars(user_id)

  # Now build the job-id. We want the job-id to be expressive while also
  # having a low likelihood of collisions.
  #
  # For expressiveness, we:
  # * use the job name (truncated at 10 characters)
  # * insert the user-id
  # * add a datetime value
  # To have a high likelihood of uniqueness, the datetime value is out to
  # hundredths of a second.
  #
  # The full job-id is:
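# The snippet above is cut off before the job-id is actually assembled. Based
# only on the comment block (job name truncated to 10 characters, the user-id,
# and a timestamp down to hundredths of a second), a minimal sketch of such a
# job-id builder might look like the following; the separator and the exact
# strftime format are assumptions for illustration, not dsub's actual code.
import datetime


def _example_job_id(job_name_value, user_id, create_time):
  # e.g. 'my-long-jo--alice--190101-120000-00'
  timestamp = create_time.strftime('%y%m%d-%H%M%S-%f')[:16]
  return '%s--%s--%s' % (job_name_value[:10], user_id, timestamp)


print(_example_job_id('my-long-job-name', 'alice',
                      datetime.datetime(2019, 1, 1, 12, 0, 0)))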
def _read_task_metadata(self, task_dir):
  with open(os.path.join(task_dir, 'meta.yaml'), 'rt') as f:
    return job_model.JobDescriptor.from_yaml(f.read())
# Set local variables for the core pipeline values
script = task_view.job_metadata['script']
user_project = task_view.job_metadata['user-project'] or ''

envs = job_params['envs'] | task_params['envs']
inputs = job_params['inputs'] | task_params['inputs']
outputs = job_params['outputs'] | task_params['outputs']
mounts = job_params['mounts']
gcs_mounts = param_util.get_gcs_mounts(mounts)

persistent_disk_mount_params = param_util.get_persistent_disk_mounts(mounts)

persistent_disks = [
    google_v2_pipelines.build_disk(
        name=disk.name.replace('_', '-'),  # Underscores not allowed
        size_gb=disk.disk_size or job_model.DEFAULT_MOUNTED_DISK_SIZE,
        source_image=disk.value,
        disk_type=disk.disk_type or job_model.DEFAULT_DISK_TYPE)
    for disk in persistent_disk_mount_params
]
persistent_disk_mounts = [
    google_v2_pipelines.build_mount(
        disk=persistent_disk.get('name'),
        path=os.path.join(providers_util.DATA_MOUNT_POINT,
                          persistent_disk_mount_param.docker_path),
        read_only=True)
    for persistent_disk, persistent_disk_mount_param in zip(
        persistent_disks, persistent_disk_mount_params)
]

# The list of "actions" (1-based) will be:
# 1- continuous copy of log files off to Cloud Storage
"""Copy outputs from local disk to GCS."""
commands = []
for o in outputs:
if o.recursive or not o.value:
continue
# The destination path is o.uri.path, which is the target directory
# (rather than o.uri, which includes the filename or wildcard).
dest_path = o.uri.path
local_path = task_dir + '/' + _DATA_SUBDIR + '/' + o.docker_path
if o.file_provider == job_model.P_LOCAL:
commands.append('mkdir -p "%s"' % dest_path)
# Use gsutil even for local files (explained in _localize_inputs_command).
if o.file_provider in [job_model.P_LOCAL, job_model.P_GCS]:
if user_project:
command = 'gsutil -u %s -mq cp "%s" "%s"' % (user_project, local_path,
dest_path)
else:
command = 'gsutil -mq cp "%s" "%s"' % (local_path, dest_path)
commands.append(command)
return '\n'.join(commands)
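# For a single, non-recursive output parameter, the loop above emits one
# mkdir (local provider only) and one gsutil copy line. A self-contained
# illustration of the resulting command strings; the directory, bucket, and
# project names below are made up, and this only replays the string-building
# pattern from the snippet rather than calling dsub's own helper.
example_task_dir = '/tmp/dsub-local/job-1/task-1'             # hypothetical task directory
example_local_path = example_task_dir + '/data/output/*.bam'  # task_dir + data subdir + docker_path
example_dest_path = 'gs://example-bucket/results/'            # o.uri.path: a directory, no filename
example_user_project = 'example-project'                      # billing project; may be empty

# With a user project: gsutil -u example-project -mq cp "<local>" "gs://..."
print('gsutil -u %s -mq cp "%s" "%s"' %
      (example_user_project, example_local_path, example_dest_path))
# Without one:         gsutil -mq cp "<local>" "gs://..."
print('gsutil -mq cp "%s" "%s"' % (example_local_path, example_dest_path))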
def __init__(self, auto_prefix, relative_path):
  self.param_class = job_model.FileParam
  self._auto_prefix = auto_prefix
  self._auto_index = 0
  self._relative_path = relative_path
        _DATA_DISK_NAME,
        job_resources.disk_size,
        source_image=None,
        disk_type=job_resources.disk_type or job_model.DEFAULT_DISK_TYPE)
]
disks.extend(persistent_disks)
network = google_v2_pipelines.build_network(
    job_resources.network, job_resources.subnetwork,
    job_resources.use_private_address)

if job_resources.machine_type:
  machine_type = job_resources.machine_type
elif job_resources.min_cores or job_resources.min_ram:
  machine_type = GoogleV2CustomMachine.build_machine_type(
      job_resources.min_cores, job_resources.min_ram)
else:
  machine_type = job_model.DEFAULT_MACHINE_TYPE

accelerators = None
if job_resources.accelerator_type:
  accelerators = [
      google_v2_pipelines.build_accelerator(job_resources.accelerator_type,
                                            job_resources.accelerator_count)
  ]
service_account = google_v2_pipelines.build_service_account(
    job_resources.service_account or 'default', scopes)

resources = google_v2_pipelines.build_resources(
    self._project,
    job_resources.regions,
    google_base.get_zones(job_resources.zones),
    google_v2_pipelines.build_machine(
        network=network,
        machine_type=machine_type,
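# GoogleV2CustomMachine.build_machine_type (called above when only min_cores /
# min_ram are given) has to produce a Compute Engine custom machine type name.
# A minimal sketch of such a builder, assuming the standard
# 'custom-{vCPUs}-{memory_mb}' naming; the rounding rules below (even vCPU
# count, memory in 256 MB multiples) are illustrative assumptions, not dsub's
# actual implementation.
def _example_custom_machine_type(min_cores, min_ram_gb):
  cores = max(1, int(min_cores or 1))
  if cores > 1 and cores % 2:
    cores += 1  # custom machine types require 1 vCPU or an even count
  memory_mb = int(round((min_ram_gb or 3.75) * 1024))
  memory_mb += (-memory_mb) % 256  # round up to a 256 MB multiple
  return 'custom-%d-%d' % (cores, memory_mb)


print(_example_custom_machine_type(3, 10))  # 'custom-4-10240'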
"""Retry task_id (numeric id) assigning it task_attempt."""
td_orig = job_descriptor.find_task_descriptor(task_id)
new_task_descriptors = [
job_model.TaskDescriptor({
'task-id': task_id,
'task-attempt': task_attempt
}, td_orig.task_params, td_orig.task_resources)
]
# Update the logging path and preemptible field.
_resolve_task_resources(job_descriptor.job_metadata,
job_descriptor.job_resources, new_task_descriptors)
provider.submit_job(
job_model.JobDescriptor(
job_descriptor.job_metadata, job_descriptor.job_params,
job_descriptor.job_resources, new_task_descriptors), False)
def _try_op_to_job_descriptor(self):
  # The _META_YAML_REPR field in the 'prepare' action enables reconstructing
  # the original job descriptor.
  # Jobs run while the google-v2 provider was in development will not have
  # the _META_YAML.
  env = google_v2_operations.get_action_environment(self._op, 'prepare')
  if not env:
    return

  meta = env.get(_META_YAML_VARNAME)
  if not meta:
    return

  return job_model.JobDescriptor.from_yaml(ast.literal_eval(meta))
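# The value stored under _META_YAML_VARNAME is passed through
# ast.literal_eval, which implies the 'prepare' action carries the job
# descriptor's YAML as a Python string literal (for example, its repr()).
# A minimal, self-contained round-trip sketch; the YAML payload below is a
# made-up stand-in, not a real dsub job descriptor.
import ast

example_meta_yaml = 'job-id: my-job--alice--190101-120000-00\ntask-descriptors: []\n'

# Writing side: embed the YAML text as a Python string literal in the env.
env_value = repr(example_meta_yaml)

# Reading side, as in _try_op_to_job_descriptor above: recover the text.
assert ast.literal_eval(env_value) == example_meta_yaml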