Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _offset_v2(self, i):
    """Look up the pack file offset of the object at index ``i``.

    :param i: zero-based position of the entry in the offset table
    :return: integer offset into the pack file. It is taken from the 32 bit
        offset table, unless the entry has its high bit set, in which case it
        is taken from the table of 64 bit offsets"""
    value = unpack_from(">L", self._cursor.map(), self._pack_offset + i * 4)[0]

    # a cleared high bit means the 32 bit value already is the final offset
    if not value & 0x80000000:
        return value
    # END handle 32 bit offset

    # otherwise the lower 31 bits are the index into the 64 bit offset table
    large_index = value & 0x7FFFFFFF
    return unpack_from(">Q", self._cursor.map(), self._pack_64_offset + large_index * 8)[0]
def _set_cache_(self, attr):
    """Map the pack file and cache its header fields on this instance.

    The whole cache is filled at once, regardless of which attribute was
    queried first.

    :param attr: name of the requested attribute; it is not inspected here
    :raise ParseError: if the signature read from the header differs from
        ``pack_signature``"""
    self._cursor = mman.make_cursor(self._packpath).use_region()

    # the header is made of three big-endian unsigned 32 bit values
    header = unpack_from(">LLL", self._cursor.map(), 0)
    type_id, self._version, self._size = header

    # TODO: figure out whether we should better keep the lock, or maybe
    # add a .keep file instead ?
    if type_id == self.pack_signature:
        return
    # END signature matches
    raise ParseError("Invalid pack signature: %i" % type_id)
def _offset_v2(self, i):
    """Look up the pack file offset of the object at index ``i``.

    :param i: zero-based position of the entry in the offset table
    :return: integer offset into the pack file. It is taken from the 32 bit
        offset table, unless the entry has its high bit set, in which case it
        is taken from the table of 64 bit offsets"""
    value = unpack_from(">L", self._cursor.map(), self._pack_offset + i * 4)[0]

    # a cleared high bit means the 32 bit value already is the final offset
    if not value & 0x80000000:
        return value
    # END handle 32 bit offset

    # otherwise the lower 31 bits are the index into the 64 bit offset table
    large_index = value & 0x7FFFFFFF
    return unpack_from(">Q", self._cursor.map(), self._pack_64_offset + large_index * 8)[0]
# NOTE(review): this is a fragment - the enclosing ``def`` is not part of this
# excerpt and the indentation was lost. The ``else:`` below presumably pairs
# with a conditional that starts before the excerpt; confirm against the full file.
# alternate for instance
self._cursor = mman.make_cursor(self._indexpath).use_region()
# We will assume that the index will always fully fit into memory !
# A window size of 0 skips the check; otherwise the whole index file has to
# fit into one mapped window.
if mman.window_size() > 0 and self._cursor.file_size() > mman.window_size():
raise AssertionError("The index file at %s is too large to fit into a mapped window (%i > %i). This is a limitation of the implementation" % (
self._indexpath, self._cursor.file_size(), mman.window_size()))
# END assert window size
else:
# now it's time to initialize everything - if we are here, someone wants
# to access the fanout table or related properties
# CHECK VERSION
mmap = self._cursor.map()
# version is 2 if the first four bytes equal index_v2_signature, 1 otherwise
self._version = (mmap[:4] == self.index_v2_signature and 2) or 1
if self._version == 2:
# the big-endian 32 bit value at byte 4 has to match the detected version
version_id = unpack_from(">L", mmap, 4)[0]
assert version_id == self._version, "Unsupported index version: %i" % version_id
# END assert version
# SETUP FUNCTIONS
# setup our functions according to the actual version:
# entry/offset/sha/crc are bound to the _<name>_v<version> implementations
for fname in ('entry', 'offset', 'sha', 'crc'):
setattr(self, fname, getattr(self, "_%s_v%i" % (fname, self._version)))
# END for each function to initialize
# INITIALIZE DATA
# byte offset is 8 if version is 2, 0 otherwise
self._initialize()
# END handle attributes
# NOTE(review): this is a fragment - the enclosing ``def`` is not part of this
# excerpt and the indentation was lost. The ``else:`` below presumably pairs
# with a conditional that starts before the excerpt; confirm against the full file.
# alternate for instance
self._cursor = mman.make_cursor(self._indexpath).use_region()
# We will assume that the index will always fully fit into memory !
# A window size of 0 skips the check; otherwise the whole index file has to
# fit into one mapped window.
if mman.window_size() > 0 and self._cursor.file_size() > mman.window_size():
raise AssertionError("The index file at %s is too large to fit into a mapped window (%i > %i). This is a limitation of the implementation" % (
self._indexpath, self._cursor.file_size(), mman.window_size()))
# END assert window size
else:
# now it's time to initialize everything - if we are here, someone wants
# to access the fanout table or related properties
# CHECK VERSION
mmap = self._cursor.map()
# version is 2 if the first four bytes equal index_v2_signature, 1 otherwise
self._version = (mmap[:4] == self.index_v2_signature and 2) or 1
if self._version == 2:
# the big-endian 32 bit value at byte 4 has to match the detected version
version_id = unpack_from(">L", mmap, 4)[0]
assert version_id == self._version, "Unsupported index version: %i" % version_id
# END assert version
# SETUP FUNCTIONS
# setup our functions according to the actual version:
# entry/offset/sha/crc are bound to the _<name>_v<version> implementations
for fname in ('entry', 'offset', 'sha', 'crc'):
setattr(self, fname, getattr(self, "_%s_v%i" % (fname, self._version)))
# END for each function to initialize
# INITIALIZE DATA
# byte offset is 8 if version is 2, 0 otherwise
self._initialize()
# END handle attributes
def _crc_v2(self, i):
    """Read the crc stored for the object at index ``i``.

    :param i: zero-based position of the entry in the crc list
    :return: the 4 byte crc, decoded as a big-endian unsigned integer"""
    # every crc entry is 4 bytes wide, starting at _crc_list_offset
    position = self._crc_list_offset + i * 4
    (crc, ) = unpack_from(">L", self._cursor.map(), position)
    return crc
def _entry_v1(self, i):
    """Read the entry at index ``i`` of a version 1 index.

    :param i: zero-based position of the entry
    :return: tuple(offset, binsha, 0) - the last item is always 0"""
    # Entries are 24 bytes wide (4 byte offset + 20 byte sha) and start after
    # the first 1024 bytes of the mapped data.
    position = 1024 + i * 24
    offset, binsha = unpack_from(">L20s", self._cursor.map(), position)
    return (offset, binsha, 0)
def _crc_v2(self, i):
    """Read the crc stored for the object at index ``i``.

    :param i: zero-based position of the entry in the crc list
    :return: the 4 byte crc, decoded as a big-endian unsigned integer"""
    # every crc entry is 4 bytes wide, starting at _crc_list_offset
    position = self._crc_list_offset + i * 4
    (crc, ) = unpack_from(">L", self._cursor.map(), position)
    return crc
def _read_fanout(self, byte_offset):
    """Generate a fanout table from our data

    :param byte_offset: position within the mapped data at which the table starts
    :return: list of 256 integers, each decoded from a big-endian unsigned
        32 bit value
    :raise struct.error: if fewer than 1024 bytes are available at ``byte_offset``"""
    # One struct call with a repeat count decodes all 256 entries at once. This
    # replaces 256 separate unpack_from calls and does not need ``xrange``,
    # which does not exist as a builtin on Python 3.
    return list(unpack_from('>256L', self._cursor.map(), byte_offset))