# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def multithreaded_chunk_size(level, source_size=0):
    """Return the per-job chunk size zstd uses for multithreaded compression.

    Resolves the compression parameters that zstd picks for ``level``
    (optionally tuned for ``source_size``) and returns four times the
    window size, i.e. ``2 ** (window_log + 2)``.
    """
    zparams = zstd.ZstdCompressionParameters.from_level(
        level, source_size=source_size
    )
    # 4 * window size == 1 << (window_log + 2)
    return 4 << zparams.window_log
def test_no_magic(self):
    """A FORMAT_ZSTD1_MAGICLESS frame is a normal frame minus the magic."""
    standard_params = zstd.ZstdCompressionParameters.from_level(
        1, format=zstd.FORMAT_ZSTD1
    )
    with_magic = zstd.ZstdCompressor(
        compression_params=standard_params
    ).compress(b"foobar")

    magicless_params = zstd.ZstdCompressionParameters.from_level(
        1, format=zstd.FORMAT_ZSTD1_MAGICLESS
    )
    without_magic = zstd.ZstdCompressor(
        compression_params=magicless_params
    ).compress(b"foobar")

    # The standard frame opens with the 4-byte zstd magic number...
    self.assertEqual(with_magic[0:4], b"\x28\xb5\x2f\xfd")
    # ...and the magicless frame is byte-identical after that header.
    self.assertEqual(with_magic[4:], without_magic)
def test_multithreaded_compression_params(self):
    """Compressing with threads enabled still emits the expected frame."""
    zparams = zstd.ZstdCompressionParameters.from_level(0, threads=2)
    compressed = zstd.ZstdCompressor(
        compression_params=zparams
    ).compress(b"foo")

    frame_params = zstd.get_frame_parameters(compressed)
    # Content size (3 bytes of input) must be recorded in the frame header.
    self.assertEqual(frame_params.content_size, 3)
    self.assertEqual(
        compressed, b"\x28\xb5\x2f\xfd\x20\x03\x19\x00\x00\x66\x6f\x6f"
    )
def __init__(self, name, path, name_prefix, compression_lvl=0, threads=0):
    """Initialize the archiver.

    :param name: passed through to the base class.
    :param path: directory the archives are written into.
    :param name_prefix: prefix applied to every per-file archive name.
    :param compression_lvl: zstd compression level (0 = zstd default).
    :param threads: number of compression threads (0 = single-threaded).
    """
    super().__init__(name)
    #: Directory path to store the archives in.
    self.path = path
    #: Each file from this package is stored as its own archive, whose
    #: name is prefixed by this value.
    self.name_prefix = name_prefix
    #: Compression parameters handed to the zstd compressor.
    self.zstd_params = zstd.ZstdCompressionParameters.from_level(
        compression_lvl, threads=threads
    )
# Translate CLI flags into keyword arguments for
# ZstdCompressionParameters.from_level().
params = {
    # Content size is written by default; --no-write-size disables it.
    "write_content_size": not args.no_write_size,
}
if args.write_checksum:
    params["write_checksum"] = True
if args.enable_ldm:
    params["enable_ldm"] = True
if args.compress_threads:
    params["threads"] = args.compress_threads
if args.ldm_hash_log:
    params["ldm_hash_log"] = args.ldm_hash_log

zparams = zstd.ZstdCompressionParameters.from_level(args.level, **params)
if args.compress_threads:
    # Separate parameter set for the multithreaded benchmark runs.
    threads_zparams = zstd.ZstdCompressionParameters.from_level(
        args.level, **params
    )

chunks = get_chunks(
    args.path,
    args.limit_count,
    args.chunk_encoding,
    chunk_size=args.split_input_size,
)
orig_size = sum(map(len, chunks))
print("%d chunks; %d bytes" % (len(chunks), orig_size))
if args.discrete_dict:
if args.dict_sample_limit:
"write_content_size": True,
}
if args.no_write_size:
params["write_content_size"] = False
if args.write_checksum:
params["write_checksum"] = True
if args.compress_threads:
params["threads"] = args.compress_threads
if args.enable_ldm:
params["enable_ldm"] = True
if args.ldm_hash_log:
params["ldm_hash_log"] = args.ldm_hash_log
zparams = zstd.ZstdCompressionParameters.from_level(args.level, **params)
if args.compress_threads:
threads_zparams = zstd.ZstdCompressionParameters.from_level(
args.level, **params
)
chunks = get_chunks(
args.path,
args.limit_count,
args.chunk_encoding,
chunk_size=args.split_input_size,
)
orig_size = sum(map(len, chunks))
print("%d chunks; %d bytes" % (len(chunks), orig_size))
if args.discrete_dict:
if args.dict_sample_limit:
training_chunks = random.sample(chunks, args.dict_sample_limit)
else: