Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
source = io.BytesIO()
source.write(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
source.write(b"o")
source.seek(0)
# Creating an iterator should not perform any compression until
# first read.
it = cctx.read_to_iter(source, size=len(source.getvalue()))
self.assertEqual(source.tell(), 0)
# We should have exactly 2 output chunks.
chunks = []
chunk = next(it)
self.assertIsNotNone(chunk)
self.assertEqual(source.tell(), zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
chunks.append(chunk)
chunk = next(it)
self.assertIsNotNone(chunk)
chunks.append(chunk)
self.assertEqual(source.tell(), len(source.getvalue()))
with self.assertRaises(StopIteration):
next(it)
# And again for good measure.
with self.assertRaises(StopIteration):
next(it)
# We should get the same output as the one-shot compression mechanism.
self.assertEqual(b"".join(chunks), cctx.compress(source.getvalue()))
def test_read_large(self):
cctx = zstd.ZstdCompressor(level=1, write_content_size=False)
source = io.BytesIO()
source.write(b"f" * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
source.write(b"o")
source.seek(0)
# Creating an iterator should not perform any compression until
# first read.
it = cctx.read_to_iter(source, size=len(source.getvalue()))
self.assertEqual(source.tell(), 0)
# We should have exactly 2 output chunks.
chunks = []
chunk = next(it)
self.assertIsNotNone(chunk)
self.assertEqual(source.tell(), zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
chunks.append(chunk)
chunk = next(it)
self.assertIsNotNone(chunk)
def add(self, src, name=None, stop_event=None):
name = name or os.path.basename(src)
self.log(logging.DEBUG, "Add %s into %s", src, self.archive_path(name))
cctx = zstd.ZstdCompressor(compression_params=self.zstd_params)
try:
with open(src, "rb") as ifh, open(self.archive_path(name), "wb") as ofh:
with cctx.stream_writer(ofh) as writer:
while True:
if stop_event and stop_event.is_set():
raise CancelledError()
data = ifh.read(zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE)
if not data:
break
if stop_event and stop_event.is_set():
raise CancelledError()
writer.write(data)
except:
if os.path.exists(self.archive_path(name)):
os.remove(self.archive_path(name))
raise
return self.archive_path(name)