Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def pack_object_at(cursor, offset, as_stream):
"""
:return: Tuple(abs_data_offset, PackInfo|PackStream)
an object of the correct type according to the type_id of the object.
If as_stream is True, the object will contain a stream, allowing the
data to be read decompressed.
:param data: random accessible data containing all required information
:parma offset: offset in to the data at which the object information is located
:param as_stream: if True, a stream object will be returned that can read
the data, otherwise you receive an info object only"""
data = cursor.use_region(offset).buffer()
type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
total_rela_offset = None # set later, actual offset until data stream begins
delta_info = None
# OFFSET DELTA
if type_id == OFS_DELTA:
i = data_rela_offset
c = byte_ord(data[i])
i += 1
delta_offset = c & 0x7f
while c & 0x80:
c = byte_ord(data[i])
i += 1
delta_offset += 1
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
def _object(self, sha, as_stream, index=-1):
""":return: OInfo or OStream object providing information about the given sha
:param index: if not -1, its assumed to be the sha's index in the IndexFile"""
# its a little bit redundant here, but it needs to be efficient
if index < 0:
index = self._sha_to_index(sha)
if sha is None:
sha = self._index.sha(index)
# END assure sha is present ( in output )
offset = self._index.offset(index)
type_id, uncomp_size, data_rela_offset = pack_object_header_info(self._pack._cursor.use_region(offset).buffer())
if as_stream:
if type_id not in delta_types:
packstream = self._pack.stream(offset)
return OStream(sha, packstream.type, packstream.size, packstream.stream)
# END handle non-deltas
# produce a delta stream containing all info
# To prevent it from applying the deltas when querying the size,
# we extract it from the delta stream ourselves
streams = self.collect_streams_at_offset(offset)
dstream = DeltaApplyReader.new(streams)
return ODeltaStream(sha, dstream.type, None, dstream)
else:
if type_id not in delta_types:
return OInfo(sha, type_id_to_type_map[type_id], uncomp_size)
def pack_object_at(cursor, offset, as_stream):
"""
:return: Tuple(abs_data_offset, PackInfo|PackStream)
an object of the correct type according to the type_id of the object.
If as_stream is True, the object will contain a stream, allowing the
data to be read decompressed.
:param data: random accessible data containing all required information
:parma offset: offset in to the data at which the object information is located
:param as_stream: if True, a stream object will be returned that can read
the data, otherwise you receive an info object only"""
data = cursor.use_region(offset).buffer()
type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
total_rela_offset = None # set later, actual offset until data stream begins
delta_info = None
# OFFSET DELTA
if type_id == OFS_DELTA:
i = data_rela_offset
c = byte_ord(data[i])
i += 1
delta_offset = c & 0x7f
while c & 0x80:
c = byte_ord(data[i])
i += 1
delta_offset += 1
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset