Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if reset is True:
compressor.reset()
compressed = do_compress()
get_frame_info_check(
compressed,
len(data),
store_size,
block_size,
block_linked,
content_checksum,
block_checksum,
)
with lz4frame.LZ4FrameDecompressor() as decompressor:
decompressed = b''
for chunk in get_chunked(compressed, chunks):
b = decompressor.decompress(chunk)
decompressed += b
assert data == decompressed
def test_roundtrip_multiframe_4(data):
    """Round-trip several concatenated LZ4 frames through one decompressor.

    ``data`` is compressed ``nframes`` times with a single
    ``LZ4FrameCompressor`` (one ``begin``/``compress``/``flush`` cycle per
    frame) and the frames are concatenated.  A single
    ``LZ4FrameDecompressor`` then decodes one frame per ``decompress`` call:
    the first call receives the whole stream, each later call receives the
    ``unused_data`` left over from the previous one.

    Args:
        data: Bytes payload to compress; supplied by the test's caller.
    """
    nframes = 4

    # Collect the pieces and join once instead of growing a bytes object.
    frame_parts = []
    with lz4frame.LZ4FrameCompressor() as compressor:
        for _ in range(nframes):
            frame_parts.append(compressor.begin())
            frame_parts.append(compressor.compress(data))
            frame_parts.append(compressor.flush())
    compressed = b''.join(frame_parts)

    decoded_parts = []
    with lz4frame.LZ4FrameDecompressor() as decompressor:
        for i in range(nframes):
            if i == 0:
                d = compressed
            else:
                d = decompressor.unused_data
            decoded_parts.append(decompressor.decompress(d))

            # Each call must stop exactly at a frame boundary.
            assert decompressor.eof
            assert decompressor.needs_input

            frames_left = nframes - i - 1
            if frames_left == 0:
                assert decompressor.unused_data is None
            else:
                # Cross-multiplied so the check stays in exact integer
                # arithmetic rather than comparing an int against a float.
                assert (len(decompressor.unused_data) * nframes
                        == len(compressed) * frames_left)
    decompressed = b''.join(decoded_parts)

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
def get_approx_index_chunks(self):
    """Yield the decompressed approximate-index chunks stored in the database.

    Opens a fresh database handle via ``self._db(force_new=True)``, selects
    the ``magnitude_approx`` rows whose ``trees`` column equals
    ``self.approx_trees`` and feeds each row's ``index_file`` value through
    one shared LZ4 frame decompressor, yielding the output of every
    ``decompress`` call.

    Iteration stops after a yield once ``self.closed`` is set.  Any
    ``Exception`` raised while ``self.closed`` is set is swallowed;
    otherwise it is re-raised.
    """
    try:
        connection = self._db(force_new=True)
        with lz4.frame.LZ4FrameDecompressor() as decompressor:
            query = (
                "\n"
                "SELECT rowid,index_file\n"
                "FROM `magnitude_approx`\n"
                "WHERE trees = ?\n"
            )
            rows = connection.execute(query, (self.approx_trees,))
            for row in rows:
                # Column 1 is index_file; column 0 (rowid) is not used.
                yield decompressor.decompress(row[1])
                if self.closed:
                    return
    except Exception as e:
        # Errors raised after the owner was closed are deliberately ignored.
        if not self.closed:
            raise e
def get_meta_chunks(self, meta_index):
    """Yield the decompressed chunks of one meta file stored in the database.

    Opens a fresh database handle via ``self._db(force_new=True)``, reads
    every row of the table ``magnitude_meta_<meta_index>`` and feeds each
    row's ``meta_file`` value through one shared LZ4 frame decompressor,
    yielding the output of every ``decompress`` call.

    Iteration stops after a yield once ``self.closed`` is set.  Any
    ``Exception`` raised while ``self.closed`` is set is swallowed;
    otherwise it is re-raised.

    Args:
        meta_index: Value whose ``str()`` form is appended to the
            ``magnitude_meta_`` table-name prefix.
    """
    try:
        connection = self._db(force_new=True)
        with lz4.frame.LZ4FrameDecompressor() as decompressor:
            # NOTE(review): the table name is spliced into the SQL text, so
            # meta_index must come from a trusted source -- confirm callers.
            table_prefix = "\nSELECT rowid,meta_file\nFROM `magnitude_meta_"
            query = table_prefix + str(meta_index) + "`\n"
            rows = connection.execute(query)
            for row in rows:
                # Column 1 is meta_file; column 0 (rowid) is not used.
                yield decompressor.decompress(row[1])
                if self.closed:
                    return
    except Exception as e:
        # Errors raised after the owner was closed are deliberately ignored.
        if not self.closed:
            raise e
@staticmethod
def decompress(data): # pragma: no cover
    """Return ``data`` unchanged; no decompression is applied."""
    return data
# Optional LZ4 support: when the package cannot be imported every lz4_* name
# is bound to None.
try:
    import lz4.frame
except ImportError: # pragma: no cover
    lz4_open = None
    lz4_compress = None
    lz4_compressobj = None
    lz4_decompress = None
    lz4_decompressobj = None
else:
    # Override the module's internal buffer size when that private attribute
    # exists; its absence is tolerated.
    try:
        lz4.frame._compression.BUFFER_SIZE = BUFFER_SIZE
    except AttributeError: # pragma: no cover
        pass

    # open/compress are pre-bound to the 1 MB maximum block size.
    lz4_open = functools.partial(
        lz4.frame.open, block_size=lz4.frame.BLOCKSIZE_MAX1MB
    )
    lz4_compress = functools.partial(
        lz4.frame.compress, block_size=lz4.frame.BLOCKSIZE_MAX1MB
    )
    lz4_decompress = lz4.frame.decompress
    lz4_compressobj = lz4.frame.LZ4FrameCompressor
    lz4_decompressobj = lz4.frame.LZ4FrameDecompressor

# Gzip helpers built on the standard library.
gz_open = gzip.open
gz_compress = gzip.compress
gz_decompress = gzip.decompress

# wbits = 16 + zlib.MAX_WBITS makes zlib emit / expect the gzip container.
# NOTE(review): the partial() wrappers bind no arguments; presumably they
# keep these callables from turning into bound methods when stored as class
# attributes -- confirm before simplifying.
gz_compressobj = functools.partial(
    lambda level=-1: zlib.compressobj(
        level, zlib.DEFLATED, 16 + zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0
    )
)
gz_decompressobj = functools.partial(
    lambda: zlib.decompressobj(16 + zlib.MAX_WBITS)
)

SERIAL_VERSION = 0