Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_equal_canonical_sha(canonical_length, match, sha1):
    """Tell whether a partial binary sha is a prefix of a full binary sha.

    The comparison honours the length of the hexadecimal (canonical)
    representation the partial sha was created from: the first
    ``canonical_length // 2`` bytes must be identical, and if
    ``canonical_length`` is odd only the upper 4 bits (the high nibble) of the
    last byte of ``match`` are compared against the byte at the same position
    in ``sha1``.

    :param canonical_length: number of hexadecimal digits of the partial sha
    :param match: less than 20 byte sha
    :param sha1: 20 byte sha
    :return: True if ``match`` equals ``sha1`` up to ``canonical_length``
        hexadecimal digits, False otherwise"""
    binary_length = canonical_length // 2
    if match[:binary_length] != sha1[:binary_length]:
        return False

    # Only an odd number of hex digits leaves a dangling half byte. The former
    # test ``canonical_length - binary_length`` was truthy for every positive
    # length, so the nibble comparison also ran for even lengths.
    if canonical_length % 2 and \
            (byte_ord(match[-1]) ^ byte_ord(sha1[len(match) - 1])) & 0xf0:
        return False
    # END handle uneven canonical length
    return True
def sha_to_index(self, sha):
    """Locate a full binary sha in this pack index by bisection.

    The first byte of the sha selects a slice of the index through the
    fanout table; that slice is then bisected using ``self.sha(i)``.

    :param sha: 20 byte sha to lookup
    :return: index usable with the ``offset`` or ``entry`` method, or None
        if the sha was not found in this pack index"""
    bucket = byte_ord(sha[0])
    sha_at = self.sha
    fanout = self._fanout_table

    # the previous fanout entry is the left bound, this bucket's entry the
    # (exclusive) right bound; bucket 0 has no predecessor and starts at 0
    left = fanout[bucket - 1] if bucket != 0 else 0
    right = fanout[bucket]

    while left < right:
        pivot = (left + right) // 2
        candidate = sha_at(pivot)
        if sha < candidate:
            right = pivot
            continue
        if sha == candidate:
            return pivot
        left = pivot + 1
    # END bisect

    # range exhausted without a hit
    return None
def partial_sha_to_index(self, partial_bin_sha, canonical_length):
"""
Look up the index of the entry matching a partial binary sha.

:return: index as in `sha_to_index` or None if the sha was not found in this
index file
:param partial_bin_sha: at least two bytes of a partial binary sha, as bytes
:param canonical_length: length of the original hexadecimal representation of the
given partial binary sha
:raise ValueError: if fewer than two bytes are given
:raise AmbiguousObjectName: NOTE(review): the raising code is not visible in this
excerpt - presumably when several entries share the prefix; confirm"""
# reject inputs that are too short before anything else is looked at
if len(partial_bin_sha) < 2:
raise ValueError("Require at least 2 bytes of partial sha")
# NOTE: this type check is an assert and therefore disappears under ``python -O``
assert isinstance(partial_bin_sha, bytes), "partial_bin_sha must be bytes"
# the first byte selects the fanout bucket that bounds the bisection
first_byte = byte_ord(partial_bin_sha[0])
get_sha = self.sha
lo = 0 # lower index, the left bound of the bisection
if first_byte != 0:
lo = self._fanout_table[first_byte - 1]
hi = self._fanout_table[first_byte] # the upper, right bound of the bisection
# fill the partial to full 20 bytes by appending NULL_BYTE, so it can be
# compared against the shas returned by get_sha
filled_sha = partial_bin_sha + NULL_BYTE * (20 - len(partial_bin_sha))
# find lowest
# NOTE(review): the excerpt ends inside this loop; the branch that advances
# ``lo`` and everything after the bisection are not shown here
while lo < hi:
mid = (lo + hi) // 2
mid_sha = get_sha(mid)
if filled_sha < mid_sha:
hi = mid
def write(self, pack_sha, write):
"""Write the index file using the given write method
:param pack_sha: binary sha over the whole pack that we index
:param write: callable handed to ``FlexibleSha1Writer``; all index data is
emitted through the resulting writer
:return: sha1 binary sha over all index file contents"""
# sort for sha1 hash: entries are ordered by their first element, which is
# the value written out in the "sha1 ordered" section below
self._objs.sort(key=lambda o: o[0])
sha_writer = FlexibleSha1Writer(write)
sha_write = sha_writer.write
# header: signature followed by the version number
sha_write(PackIndexFile.index_v2_signature)
sha_write(pack(">L", PackIndexFile.index_version_default))
# fanout
tmplist = list((0,) * 256) # fanout or list with 64 bit offsets
# count the entries per first byte of their sha
for t in self._objs:
tmplist[byte_ord(t[0][0])] += 1
# END prepare fanout
# turn the per-byte counts into running totals while writing them: slot i
# is written first and then carried over into slot i + 1
for i in xrange(255):
v = tmplist[i]
sha_write(pack('>L', v))
tmplist[i + 1] += v
# END write each fanout entry
# the last slot now holds the total number of entries
sha_write(pack('>L', tmplist[255]))
# sha1 ordered
# save calls, that is push them into c
sha_write(b''.join(t[0] for t in self._objs))
# crc32: second element of each entry, masked to 32 bits
for t in self._objs:
sha_write(pack('>L', t[1] & 0xffffffff))
# END for each crc
# NOTE(review): the excerpt ends here; the remaining sections and the
# documented return value are not visible
def write(self, pack_sha, write):
"""Write the index file using the given write method
:param pack_sha: binary sha over the whole pack that we index
:param write: callable handed to ``FlexibleSha1Writer``; all index data is
emitted through the resulting writer
:return: sha1 binary sha over all index file contents"""
# sort for sha1 hash: entries are ordered by their first element, which is
# the value written out in the "sha1 ordered" section below
self._objs.sort(key=lambda o: o[0])
sha_writer = FlexibleSha1Writer(write)
sha_write = sha_writer.write
# header: signature followed by the version number
sha_write(PackIndexFile.index_v2_signature)
sha_write(pack(">L", PackIndexFile.index_version_default))
# fanout
tmplist = list((0,) * 256) # fanout or list with 64 bit offsets
# count the entries per first byte of their sha
for t in self._objs:
tmplist[byte_ord(t[0][0])] += 1
# END prepare fanout
# turn the per-byte counts into running totals while writing them: slot i
# is written first and then carried over into slot i + 1
for i in xrange(255):
v = tmplist[i]
sha_write(pack('>L', v))
tmplist[i + 1] += v
# END write each fanout entry
# the last slot now holds the total number of entries
sha_write(pack('>L', tmplist[255]))
# sha1 ordered
# save calls, that is push them into c
sha_write(b''.join(t[0] for t in self._objs))
# crc32: second element of each entry, masked to 32 bits
for t in self._objs:
sha_write(pack('>L', t[1] & 0xffffffff))
# END for each crc
# NOTE(review): the excerpt ends here; the remaining sections and the
# documented return value are not visible
an object of the correct type according to the type_id of the object.
If as_stream is True, the object will contain a stream, allowing the
data to be read decompressed.
:param data: random accessible data containing all required information
:param offset: offset into the data at which the object information is located
:param as_stream: if True, a stream object will be returned that can read
the data, otherwise you receive an info object only"""
data = cursor.use_region(offset).buffer()
type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
total_rela_offset = None # set later, actual offset until data stream begins
delta_info = None
# OFFSET DELTA
if type_id == OFS_DELTA:
i = data_rela_offset
c = byte_ord(data[i])
i += 1
delta_offset = c & 0x7f
while c & 0x80:
c = byte_ord(data[i])
i += 1
delta_offset += 1
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
total_rela_offset = i
# REF DELTA
elif type_id == REF_DELTA:
total_rela_offset = data_rela_offset + 20
delta_info = data[data_rela_offset:total_rela_offset]
# BASE OBJECT
else:
:param offset: offset into the data at which the object information is located
:param as_stream: if True, a stream object will be returned that can read
the data, otherwise you receive an info object only"""
data = cursor.use_region(offset).buffer()
type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
total_rela_offset = None # set later, actual offset until data stream begins
delta_info = None
# OFFSET DELTA
if type_id == OFS_DELTA:
i = data_rela_offset
c = byte_ord(data[i])
i += 1
delta_offset = c & 0x7f
while c & 0x80:
c = byte_ord(data[i])
i += 1
delta_offset += 1
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
total_rela_offset = i
# REF DELTA
elif type_id == REF_DELTA:
total_rela_offset = data_rela_offset + 20
delta_info = data[data_rela_offset:total_rela_offset]
# BASE OBJECT
else:
# assume its a base object
total_rela_offset = data_rela_offset
# END handle type id
abs_data_offset = offset + total_rela_offset
an object of the correct type according to the type_id of the object.
If as_stream is True, the object will contain a stream, allowing the
data to be read decompressed.
:param data: random accessible data containing all required information
:param offset: offset into the data at which the object information is located
:param as_stream: if True, a stream object will be returned that can read
the data, otherwise you receive an info object only"""
data = cursor.use_region(offset).buffer()
type_id, uncomp_size, data_rela_offset = pack_object_header_info(data)
total_rela_offset = None # set later, actual offset until data stream begins
delta_info = None
# OFFSET DELTA
if type_id == OFS_DELTA:
i = data_rela_offset
c = byte_ord(data[i])
i += 1
delta_offset = c & 0x7f
while c & 0x80:
c = byte_ord(data[i])
i += 1
delta_offset += 1
delta_offset = (delta_offset << 7) + (c & 0x7f)
# END character loop
delta_info = delta_offset
total_rela_offset = i
# REF DELTA
elif type_id == REF_DELTA:
total_rela_offset = data_rela_offset + 20
delta_info = data[data_rela_offset:total_rela_offset]
# BASE OBJECT
else: