Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_flush_block_repeated(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
self.assertEqual(cobj.compress(b"foo"), b"")
self.assertEqual(
cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK),
b"\x28\xb5\x2f\xfd\x00\x48\x18\x00\x00foo",
)
self.assertEqual(cobj.compress(b"bar"), b"")
# 3 byte header plus content.
self.assertEqual(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK), b"\x18\x00\x00bar")
self.assertEqual(cobj.flush(), b"\x01\x00\x00")
while True:
input_size = chunk_sizes.draw(strategies.integers(1, 4096))
source = original[i : i + input_size]
if not source:
break
i += input_size
chunk = cobj.compress(source)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
if not flushes.draw(strategies.booleans()):
continue
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
self.assertEqual(b"".join(decompressed_chunks), original[0:i])
chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
compressed_chunks.append(chunk)
decompressed_chunks.append(dobj.decompress(chunk))
self.assertEqual(
dctx.decompress(b"".join(compressed_chunks), max_output_size=len(original)),
original,
)
self.assertEqual(b"".join(decompressed_chunks), original)
compressed_size = sum(map(len, compressed_stream_zlib))
ratio = float(compressed_size) / float(orig_size) * 100.0
print(
"stream zlib compressed size (l=%d): %d (%.2f%%)"
% (args.zlib_level, compressed_size, ratio)
)
if args.stream:
zctx = zstd.ZstdCompressor(compression_params=zparams)
compressed_stream = []
ratios = []
compressor = zctx.compressobj()
for chunk in chunks:
output = compressor.compress(chunk)
output += compressor.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
compressed_stream.append(output)
compressed_size = sum(map(len, compressed_stream))
ratio = float(compressed_size) / float(orig_size) * 100.0
print(
"stream compressed size (l=%d): %d (%.2f%%)"
% (zparams.compression_level, compressed_size, ratio)
)
if args.content_dict:
compressed_content_dict = []
ratios = []
# First chunk is compressed like normal.
c = zstd.ZstdCompressor(compression_params=zparams).compress(chunks[0])
compressed_content_dict.append(c)
ratios.append(float(len(c)) / float(len(chunks[0])))
def compress_stream_compressobj(chunks, zparams):
zctx = zstd.ZstdCompressor(compression_params=zparams)
compressor = zctx.compressobj()
flush = zstd.COMPRESSOBJ_FLUSH_BLOCK
for chunk in chunks:
compressor.compress(chunk)
compressor.flush(flush)