Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — the enclosing method (it reads self.mock_emr_output
# and self.mock_s3_fs, so presumably part of a mock EMR backend) begins above
# this chunk, and its indentation has been lost; nesting below is not
# recoverable from here.
# Failure path: record why the cluster shut down, then stop handling this step.
cluster.status.statechangereason.message = (
'Shut down as step failed')
return
# complete step
step.status.state = 'COMPLETED'
step.status.timeline.enddatetime = to_iso8601(now)
# create fake output if we're supposed to write to S3
output_uri = self._get_step_output_uri(step.config.args)
if output_uri and is_s3_uri(output_uri):
# canned output for this (cluster, step) pair; falls back to a single empty
# part when none was registered
mock_output = self.mock_emr_output.get(
(cluster_id, step_num)) or [b'']
bucket_name, key_name = parse_s3_uri(output_uri)
# write output to S3
# NOTE(review): key_name is concatenated directly with 'part-%05d', so this
# assumes the output URI ends in '/' — confirm against callers.
for i, part in enumerate(mock_output):
add_mock_s3_data(self.mock_s3_fs, {
bucket_name: {key_name + 'part-%05d' % i: part}})
# canned output was registered for a step that doesn't write to S3: that is a
# test-setup mistake, so fail loudly
elif (cluster_id, step_num) in self.mock_emr_output:
raise AssertionError(
"can't use output for cluster ID %s, step %d "
"(it doesn't output to S3)" %
(cluster_id, step_num))
# done!
# if this is the last step, continue to autotermination code, below
if step_num < len(cluster._steps) - 1:
return
# NOTE(review): fragment of a larger method whose start is not in this chunk,
# and whose indentation has been lost. Whatever condition leads to the
# cancellation branch (and whether the following `return` belongs to it) is
# presumably in the missing context — confirm.
# Steps still PENDING or RUNNING are marked CANCELLED.
if step['Status']['State'] in ('PENDING', 'RUNNING'):
step['Status']['State'] = 'CANCELLED'
return
# complete step
step['Status']['State'] = 'COMPLETED'
# `now` is stored as-is (no string conversion here); its type is set by the
# enclosing code
step['Status']['Timeline']['EndDateTime'] = now
# create fake output if we're supposed to write to S3
output_uri = self._get_step_output_uri(step['Config']['Args'])
if output_uri and is_s3_uri(output_uri):
# canned output for this (cluster, step) pair; falls back to a single empty
# part when none was registered
mock_output = self.mock_emr_output.get(
(cluster_id, step_num)) or [b'']
bucket_name, key_name = parse_s3_uri(output_uri)
# write output to S3
# NOTE(review): key_name is concatenated directly with 'part-%05d', so this
# assumes the output URI ends in '/' — confirm against callers.
for i, part in enumerate(mock_output):
add_mock_s3_data(self.mock_s3_fs, {
bucket_name: {key_name + 'part-%05d' % i: part}})
# canned output was registered for a step that doesn't write to S3: treat it
# as a setup error
elif (cluster_id, step_num) in self.mock_emr_output:
raise ValueError(
"can't use output for cluster ID %s, step %d "
"(it doesn't output to S3)" %
(cluster_id, step_num))
# done!
# if this is the last step, continue to autotermination code, below
if step_num < len(cluster['_Steps']) - 1:
return
# NOTE(review): orphaned test-helper body — its `def` line (which must supply
# `mode`, `tmp_len` and `log_len`) is not in this chunk, and indentation has
# been lost.
# Seed one pre-existing object under the log prefix.
self.add_mock_s3_data({'walrus': {'logs/j-MOCKCLUSTER0/1': b'1\n'}})
# The job's only input is stdin ('-' in the args below).
stdin = BytesIO(b'foo\nbar\n')
mr_job = MRTwoStepJob(['-r', 'emr', '-v',
'--s3-log-uri', 's3://walrus/logs',
'-', '--cleanup', mode])
mr_job.sandbox(stdin=stdin)
with mr_job.make_runner() as runner:
s3_tmp_dir = runner._opts['s3_tmp_dir']
tmp_bucket, _ = parse_s3_uri(s3_tmp_dir)
runner.run()
# this is set and unset before we can get at it unless we do this
log_bucket, _ = parse_s3_uri(runner._s3_log_dir())
# drain the job's output; the result is discarded
list(runner.stream_output())
# Count the keys left in the tmp and log buckets and compare them with the
# expected counts for this cleanup mode.
conn = runner.fs.make_s3_conn()
bucket = conn.get_bucket(tmp_bucket)
self.assertEqual(len(list(bucket.list())), tmp_len)
bucket = conn.get_bucket(log_bucket)
self.assertEqual(len(list(bucket.list())), log_len)
# NOTE(review): orphaned test-helper body — its `def` line (which must supply
# `mode`, `scratch_len` and `log_len`) is not in this chunk, and indentation
# has been lost.
# The job's only input is stdin ('-' in the args below).
stdin = StringIO('foo\nbar\n')
mr_job = MRTwoStepJob(['-r', 'emr', '-v',
'-c', self.mrjob_conf_path,
'--s3-log-uri', 's3://walrus/logs',
'-', '--cleanup', mode])
mr_job.sandbox(stdin=stdin)
with mr_job.make_runner() as runner:
s3_scratch_uri = runner._opts['s3_scratch_uri']
scratch_bucket, _ = parse_s3_uri(s3_scratch_uri)
runner.run()
# this is set and unset before we can get at it unless we do this
log_bucket, _ = parse_s3_uri(runner._s3_job_log_uri)
# drain the job's output; the result is discarded
list(runner.stream_output())
# Count the keys left in the scratch and log buckets and compare them with
# the expected counts for this cleanup mode.
conn = runner.make_s3_conn()
bucket = conn.get_bucket(scratch_bucket)
self.assertEqual(len(list(bucket.list())), scratch_len)
bucket = conn.get_bucket(log_bucket)
self.assertEqual(len(list(bucket.list())), log_len)
def _test_remote_scratch_cleanup(self, mode, scratch_len, log_len):
    """Run MRTwoStepJob on EMR with ``--cleanup <mode>`` and audit S3.

    Once the runner is done, the scratch bucket must contain exactly
    *scratch_len* keys and the log bucket exactly *log_len* keys.
    """
    # one pre-existing object under the log prefix
    self.add_mock_s3_data({'walrus': {'logs/j-MOCKJOBFLOW0/1': '1\n'}})

    # the job reads its input from stdin only ('-')
    job_args = [
        '-r', 'emr', '-v',
        '-c', self.mrjob_conf_path,
        '--s3-log-uri', 's3://walrus/logs',
        '-', '--cleanup', mode,
    ]
    job = MRTwoStepJob(job_args)
    job.sandbox(stdin=StringIO('foo\nbar\n'))

    with job.make_runner() as runner:
        scratch_bucket, _scratch_path = parse_s3_uri(
            runner._opts['s3_scratch_uri'])

        runner.run()

        # _s3_job_log_uri is set and unset during the run, so capture its
        # bucket while it is still available
        log_bucket, _log_path = parse_s3_uri(runner._s3_job_log_uri)

        list(runner.stream_output())

    # presumably cleanup runs as the runner's context exits, so S3 is
    # inspected afterwards
    conn = runner.make_s3_conn()
    expectations = ((scratch_bucket, scratch_len), (log_bucket, log_len))
    for bucket_name, expected_count in expectations:
        remaining = list(conn.get_bucket(bucket_name).list())
        self.assertEqual(len(remaining), expected_count)
def _get_s3_key(self, uri):
    """Get a boto3 ``s3.Object`` handle for the given S3 URI.

    uri is an S3 URI: ``s3://foo/bar``

    ``Bucket.Object()`` only builds a resource handle; nothing here checks
    whether the key exists. The handle is therefore returned even for a key
    that doesn't exist yet (so it can be used to create one), and this method
    never returns None. Call ``.load()`` on the result if you need an
    existence check (boto3 raises ``ClientError`` for a missing key).
    """
    bucket_name, key_name = parse_s3_uri(uri)
    # assumes get_bucket() returns a boto3 Bucket resource — its .Object()
    # is lazy and makes no request
    bucket = self.get_bucket(bucket_name)
    return bucket.Object(key_name)