with open(input2_filename, 'wb') as input2_file:
    input2_file.write(input_file_bytes)

with open(manifest_filename, 'w') as manifest_file:
    manifest_file.writelines([
        '%s\n' % input1_filename, '%s\n' % input2_filename])

job = self._harness_job(
    MRNickNackWithHadoopInputFormat, runner_alias='local',
    spark_conf=self.SPARK_CONF, input_paths=[manifest_filename])

with job.make_runner() as runner:
    runner.run()

    output_counts = dict(
        line.strip().split(b'\t')
        for line in to_lines(runner.cat_output()))

    expected_output_counts = {b'"tomato"': b'2', b'"potato"': b'2'}
    self.assertEqual(expected_output_counts, output_counts)
job = self._harness_job(
    MRNickNack, input_bytes=input_bytes, runner_alias='local',
    spark_conf=self.SPARK_CONF, compression_codec=compression_codec)

with job.make_runner() as runner:
    runner.run()

    self.assertTrue(runner.fs.exists(
        join(runner.get_output_dir(), 'o', 'part*.gz')))
    self.assertTrue(runner.fs.exists(
        join(runner.get_output_dir(), 't', 'part*.gz')))

    output_counts = dict(
        line.strip().split(b'\t')
        for line in to_lines(runner.cat_output()))

    expected_output_counts = {b'"one"': b'2', b'"two"': b'2'}
    self.assertEqual(expected_output_counts, output_counts)
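The assertions above only check that part*.gz files exist in each output subdirectory. To spot-check the compressed contents directly, something like this could run inside the with block (the exact part-file name is hypothetical; it depends on the runner and partition count):

import gzip
from os.path import join

# hypothetical spot check; real part-file names vary by runner/partitions
with gzip.open(join(runner.get_output_dir(), 'o', 'part-00000.gz'),
               'rb') as f:
    print(f.read())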
def test_empty(self):
    job = MRSparkWordcount([])
    job.sandbox()

    with job.make_runner() as runner:
        runner.run()

        self.assertEqual(
            sorted(to_lines(runner.cat_output())),
            [])
def test_output_dir_not_considered_hidden(self):
    output_dir = os.path.join(self.tmp_dir, '_hidden', '_output_dir')

    self.makefile(os.path.join(output_dir, 'part-00000'),
                  b'cats\n')

    runner = InlineMRJobRunner(conf_paths=[], output_dir=output_dir)

    self.assertEqual(sorted(to_lines(runner.stream_output())),
                     [b'cats\n'])
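stream_output() is the older line-oriented API; the rest of this page reads job output through cat_output() plus to_lines(). A sketch of the equivalent assertion:

self.assertEqual(sorted(to_lines(runner.cat_output())),
                 [b'cats\n'])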
# Make sure that input.gz occurs in a single split that starts at
# its beginning and ends at its end
for split_info in file_splits.values():
    if split_info['orig_name'] == input_gz_path:
        self.assertEqual(split_info['start'], 0)
        self.assertEqual(split_info['length'],
                         os.stat(input_gz_path)[stat.ST_SIZE])

# make sure we get 3 files
self.assertEqual(len(file_splits), 3)

# make sure all the data is preserved
content = []

for file_name in file_splits:
    with open(file_name, 'rb') as f:
        lines = list(to_lines(decompress(f, file_name)))

        # make sure the input_gz split got its entire contents
        if file_name == input_gz_path:
            self.assertEqual(lines, contents_gz)

    content.extend(lines)

self.assertEqual(sorted(content),
                 all_contents_sorted)
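decompress() here chooses a decompressor from the file name, so gzipped and bzipped inputs read transparently. A minimal sketch of that idea using only the standard library (an illustration, not mrjob's actual implementation):

import bz2
import gzip

def decompress_sketch(readable, path):
    # wrap readable in a decompressor chosen by path's extension
    if path.endswith('.gz'):
        return gzip.GzipFile(fileobj=readable)
    elif path.endswith('.bz2'):
        return bz2.BZ2File(readable)
    else:
        return readable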
# expect python -v crud in stderr
with open(runner._task_stderr_path('mapper', 0, 0)) as lines:
    self.assertTrue(any(
        'import mrjob' in line or  # Python 2
        "import 'mrjob'" in line  # Python 3
        for line in lines))

with open(runner._task_stderr_path('mapper', 0, 0)) as lines:
    self.assertTrue(any(
        '#' in line for line in lines))

# should still get expected results
self.assertEqual(
    sorted(to_lines(runner.cat_output())),
    sorted([b'1\tnull\n', b'1\t"bar"\n']))
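For reference, python -v writes its import trace to stderr, which is what the first assertion matches; the lines look roughly like these (paths and addresses illustrative):

    import mrjob # from /usr/lib/python2.7/site-packages/mrjob/__init__.py
    import 'mrjob' # <_frozen_importlib_external.SourceFileLoader object at 0x7f...>

The '#' in those trace lines is also why the second assertion passes.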
self.makefile(os.path.join(self.output_dir, 'part-00001'),
              b'line1\n')

# hidden .crc file
self.makefile(os.path.join(self.output_dir, '.crc.part-00000'),
              b'42\n')

# hidden _SUCCESS file (ignored)
self.makefile(os.path.join(self.output_dir, '_SUCCESS'),
              b'such a relief!\n')

# hidden _logs dir
self.makefile(os.path.join(self.output_dir, '_logs', 'log.xml'),
              b'pretty much the usual\n')

self.assertEqual(sorted(to_lines(self.runner.cat_output())),
                 [b'line0\n', b'line1\n'])
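What makes these files "hidden" is the Hadoop naming convention: any base name starting with '.' or '_' marks a non-data file, so only the part-* files count as job output. A sketch of that filter, assuming just the convention (not mrjob's actual code):

import os.path

def _is_hidden_sketch(path):
    # Hadoop convention: '.' and '_' prefixes mark non-data files
    name = os.path.basename(path.rstrip('/'))
    return name.startswith('.') or name.startswith('_')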
def test_no_trailing_newline(self):
    self.assertEqual(
        list(to_lines(iter([
            b'Alouette,\ngentille',
            b' Alouette.',
        ]))),
        [b'Alouette,\n', b'gentille Alouette.'])
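This test pins down the to_lines() contract: an iterable of byte chunks is re-split on newlines, and a final chunk without a trailing newline still comes out as a last line. A minimal sketch with the same behavior (a simplification, not mrjob's actual implementation):

def to_lines_sketch(chunks):
    # re-split an iterable of byte chunks on newlines,
    # keeping the b'\n' terminator on each emitted line
    buf = b''
    for chunk in chunks:
        buf += chunk
        while b'\n' in buf:
            line, sep, buf = buf.partition(b'\n')
            yield line + sep
    if buf:
        yield buf  # final line may lack a trailing newline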
def test_mixed_job(self):
    # test a combination of streaming and spark steps
    job = MRStreamingAndSpark(['-r', 'spark'])
    job.sandbox(stdin=BytesIO(b'foo\nbar\n'))

    with job.make_runner() as runner:
        runner.run()

        # converts to 'null\t"foo"', 'null\t"bar"' and then counts chars
        self.assertEqual(
            sorted(to_lines(runner.cat_output())),
            [
                b'\t 2\n',
                b'" 4\n',
                b'a 1\n',
                b'b 1\n',
                b'f 1\n',
                b'l 4\n',
                b'n 2\n',
                b'o 2\n',
                b'r 1\n',
                b'u 2\n',
            ])
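The expected pairs can be verified by hand: counting characters across b'null\t"foo"\n' and b'null\t"bar"\n' (newlines excluded) yields exactly these tallies. A quick standalone check (illustrative, not part of the test suite):

from collections import Counter

lines = [b'null\t"foo"\n', b'null\t"bar"\n']
counts = Counter(c for line in lines
                 for c in line.rstrip(b'\n').decode())
# -> {'l': 4, '"': 4, '\t': 2, 'n': 2, 'u': 2, 'o': 2,
#     'f': 1, 'b': 1, 'a': 1, 'r': 1}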
Yield one line at a time.

- Resolve globs (``foo_*.gz``).
- Decompress ``.gz`` and ``.bz2`` files.
- If path is ``-``, read from STDIN.
- Recursively read all files in a directory.
"""
paths = self.options.args or ['-']

for path in paths:
    if path == '-':
        for line in self.stdin:
            yield line
    else:
        with open(path, 'rb') as f:
            for line in to_lines(decompress(f, path)):
                yield line
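As excerpted, the else branch only opens literal paths; the glob and directory bullets in the docstring imply an expansion step roughly like this sketch (hypothetical helper, not mrjob's actual implementation), whose results would then be fed through decompress() and to_lines() exactly as above:

import os
from glob import glob

def _expand_path_sketch(path):
    # resolve globs, falling back to the literal path
    for name in sorted(glob(path)) or [path]:
        if os.path.isdir(name):
            # recursively yield every file in the directory
            for dirpath, _, basenames in os.walk(name):
                for basename in sorted(basenames):
                    yield os.path.join(dirpath, basename)
        else:
            yield name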