Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
tarinfo.mtime = time.mktime(datetime.datetime.now().timetuple())
file = io.StringIO()
file.write(TEXT)
file.seek(0)
tarinfo.size = len(TEXT)
tar.addfile(tarinfo, file)
tar.close()
"""
result = list(extract.extract(test_file, recurse=False))
check_no_error(result)
expected = (
'somefilename-0.txt',
'somefilename-1.txt',
)
test_dir = extractcode.get_extraction_path(test_file)
check_files(test_dir, expected)
expected = ['a/b/a.txt', 'a/b/b.txt', 'a/c/c.txt']
cleaned_test_file = test_file.replace(base, '')
expected_event = [
extract.ExtractEvent(
source=cleaned_test_file,
target=extractcode.get_extraction_path(cleaned_test_file),
done=False, warnings=[], errors=[]
),
extract.ExtractEvent(
source=cleaned_test_file,
target=extractcode.get_extraction_path(cleaned_test_file),
done=True, warnings=[], errors=[]
)
]
target = extractcode.get_extraction_path(test_file)
result = list(extract.extract_file(test_file, target))
result = [r._replace(
source=cleaned_test_file,
target=extractcode.get_extraction_path(cleaned_test_file))
for r in result]
assert expected_event == result
check_files(target, expected)
def test_extract_file_function(self):
    """
    Check extract.extract_file() on a non-nested tarball.

    The call must yield exactly two events, the first with done=False and
    the second with done=True, both with empty warnings and errors, and
    the extraction target must contain the three expected files.
    """
    test_file = self.get_test_loc('extract/basic_non_nested.tar.gz', copy=True)
    base = fileutils.parent_directory(test_file)
    expected = ['a/b/a.txt', 'a/b/b.txt', 'a/c/c.txt']
    # Remove the `base` prefix so that the events compared below do not
    # carry the location of the copied test file.
    cleaned_test_file = test_file.replace(base, '')

    # One event per `done` state, in the order they are expected to arrive.
    expected_event = [
        extract.ExtractEvent(
            source=cleaned_test_file,
            target=extractcode.get_extraction_path(cleaned_test_file),
            done=is_done,
            warnings=[],
            errors=[]
        )
        for is_done in (False, True)
    ]

    target = extractcode.get_extraction_path(test_file)
    events = list(extract.extract_file(test_file, target))

    # Overwrite source and target of each actual event with the cleaned
    # values so that only the remaining fields drive the comparison.
    result = []
    for event in events:
        normalized = event._replace(
            source=cleaned_test_file,
            target=extractcode.get_extraction_path(cleaned_test_file)
        )
        result.append(normalized)

    assert expected_event == result
    check_files(target, expected)
def test_extract_archive_non_nested(self):
    """
    Extract a non-nested tarball without and then with recursion.

    Both runs must report no error and leave the same three files in the
    extraction path of the archive.
    """
    test_file = self.get_test_loc('extract/basic_non_nested.tar.gz', copy=True)
    expected = (
        'a/b/a.txt',
        'a/b/b.txt',
        'a/c/c.txt',
    )
    for recurse in (False, True):
        # Materialize the events before checking anything: the same call is
        # wrapped in list() elsewhere in this file, which suggests that
        # extract.extract() returns a lazy iterable. Draining it here makes
        # sure extraction has completed before check_files() runs, whatever
        # check_no_error() does with its argument.
        result = list(extract.extract(test_file, recurse=recurse))
        check_no_error(result)
        check_files(extractcode.get_extraction_path(test_file), expected)
def test_extract_file_function(self):
    """
    Check extract.extract_file() on a non-nested tarball.

    Verifies both the sequence of events (one with done=False, then one
    with done=True, each without warnings or errors) and the files found
    in the extraction target.

    NOTE(review): another method with this same name is defined earlier in
    this file; if both live in the same class, only the later definition
    is kept by Python -- confirm and rename one of them if so.
    """
    test_file = self.get_test_loc('extract/basic_non_nested.tar.gz', copy=True)
    base = fileutils.parent_directory(test_file)
    expected = ['a/b/a.txt', 'a/b/b.txt', 'a/c/c.txt']
    # Drop the `base` prefix so the compared events do not include the
    # location of the copied test file.
    cleaned_test_file = test_file.replace(base, '')
    expected_event = [
        extract.ExtractEvent(
            source=cleaned_test_file,
            target=extractcode.get_extraction_path(cleaned_test_file),
            done=False, warnings=[], errors=[]
        ),
        extract.ExtractEvent(
            source=cleaned_test_file,
            target=extractcode.get_extraction_path(cleaned_test_file),
            done=True, warnings=[], errors=[]
        )
    ]

    target = extractcode.get_extraction_path(test_file)
    result = list(extract.extract_file(test_file, target))
    # Replace source and target of the actual events with the cleaned
    # values before comparing them with the expected events.
    result = [
        r._replace(
            source=cleaned_test_file,
            target=extractcode.get_extraction_path(cleaned_test_file))
        for r in result
    ]
    assert expected_event == result
    # Also verify the extracted files: without this check `expected` was
    # computed but never used and the extraction output went untested.
    check_files(target, expected)
dirs.remove(d)
if TRACE:
logger.debug('extract:walk: not recurse: removed dirs:' + repr(drs.symmetric_difference(set(dirs))))
for f in files:
loc = join(top, f)
if not recurse and extractcode.is_extraction_path(loc):
if TRACE:
logger.debug('extract:walk not recurse: skipped file: %(loc)r' % locals())
continue
if not archive.should_extract(loc, kinds):
if TRACE:
logger.debug('extract:walk: skipped file: not should_extract: %(loc)r' % locals())
continue
target = join(abspath(top), extractcode.get_extraction_path(loc))
if TRACE:
logger.debug('extract:target: %(target)r' % locals())
for xevent in extract_file(loc, target, kinds):
if TRACE:
logger.debug('extract:walk:extraction event: %(xevent)r' % locals())
yield xevent
if recurse:
if TRACE:
logger.debug('extract:walk: recursing on target: %(target)r' % locals())
for xevent in extract(target, kinds, recurse):
if TRACE:
logger.debug('extract:walk:recurse:extraction event: %(xevent)r' % locals())
yield xevent