Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
total_rela_offset = i
# REF DELTA
elif type_id == REF_DELTA:
total_rela_offset = data_rela_offset + 20
delta_info = data[data_rela_offset:total_rela_offset]
# BASE OBJECT
else:
# assume its a base object
total_rela_offset = data_rela_offset
# END handle type id
abs_data_offset = offset + total_rela_offset
if as_stream:
stream = DecompressMemMapReader(buffer(data, total_rela_offset), False, uncomp_size)
if delta_info is None:
return abs_data_offset, OPackStream(offset, type_id, uncomp_size, stream)
else:
return abs_data_offset, ODeltaPackStream(offset, type_id, uncomp_size, delta_info, stream)
else:
if delta_info is None:
return abs_data_offset, OPackInfo(offset, type_id, uncomp_size)
else:
return abs_data_offset, ODeltaPackInfo(offset, type_id, uncomp_size, delta_info)
# END handle info
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
total_rela_offset = i
# REF DELTA
elif type_id == REF_DELTA:
total_rela_offset = data_rela_offset + 20
delta_info = data[data_rela_offset:total_rela_offset]
# BASE OBJECT
else:
# assume its a base object
total_rela_offset = data_rela_offset
# END handle type id
abs_data_offset = offset + total_rela_offset
if as_stream:
stream = DecompressMemMapReader(buffer(data, total_rela_offset), False, uncomp_size)
if delta_info is None:
return abs_data_offset, OPackStream(offset, type_id, uncomp_size, stream)
else:
return abs_data_offset, ODeltaPackStream(offset, type_id, uncomp_size, delta_info, stream)
else:
if delta_info is None:
return abs_data_offset, OPackInfo(offset, type_id, uncomp_size)
else:
return abs_data_offset, ODeltaPackInfo(offset, type_id, uncomp_size, delta_info)
# END handle info
def new(self, m, close_on_deletion=False):
"""Create a new DecompressMemMapReader instance for acting as a read-only stream
This method parses the object header from m and returns the parsed
type and size, as well as the created stream instance.
:param m: memory map on which to operate. It must be object data ( header + contents )
:param close_on_deletion: if True, the memory map will be closed once we are
being deleted"""
inst = DecompressMemMapReader(m, close_on_deletion, 0)
typ, size = inst._parse_header_info()
return typ, size, inst