How to use gitdb - common examples

To help you get started, we’ve selected a few gitdb examples, based on popular ways it is used in public projects.

github gitpython-developers / gitdb / gitdb / fun.py
def is_equal_canonical_sha(canonical_length, match, sha1):
    """
    :return: True if ``match``, a partial binary sha, matches the corresponding
        prefix of the given full 20 byte binary ``sha1``.
        The comparison takes the canonical_length (in hex digits) of the match
        into account; for an uneven canonical length, only the high nibble of
        the last byte of the match is compared.
    :param canonical_length: length of the match in hexadecimal digits
    :param match: less than 20 byte sha
    :param sha1: 20 byte sha"""
    binary_length = canonical_length // 2
    if match[:binary_length] != sha1[:binary_length]:
        return False

    if canonical_length - binary_length and \
            (byte_ord(match[-1]) ^ byte_ord(sha1[len(match) - 1])) & 0xf0:
        return False
    # END handle uneven canonical length
    return True
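
To make the nibble comparison concrete, here is a minimal usage sketch. It assumes is_equal_canonical_sha and hex_to_bin are importable from gitdb.fun and gitdb.util respectively, as the file path above suggests:

from gitdb.fun import is_equal_canonical_sha
from gitdb.util import hex_to_bin

full_sha = hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")  # 20 byte binary sha
partial_hex = "155b6"                        # 5 hex digits: uneven canonical length
partial_bin = hex_to_bin(partial_hex + "0")  # pad to a whole number of bytes
# canonical_length is the count of hex digits, not bytes
assert is_equal_canonical_sha(len(partial_hex), partial_bin, full_sha)
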
github gitpython-developers / gitdb / gitdb / pack.py
def sha_to_index(self, sha):
    """
    :return: index usable with the ``offset`` or ``entry`` method, or None
        if the sha was not found in this pack index
    :param sha: 20 byte sha to look up"""
    first_byte = byte_ord(sha[0])
    get_sha = self.sha
    lo = 0                  # lower index, the left bound of the bisection
    if first_byte != 0:
        lo = self._fanout_table[first_byte - 1]
    hi = self._fanout_table[first_byte]     # the upper, right bound of the bisection

    # bisect until we have the sha
    while lo < hi:
        mid = (lo + hi) // 2
        mid_sha = get_sha(mid)
        if sha < mid_sha:
            hi = mid
        elif sha == mid_sha:
            return mid
        else:
            lo = mid + 1
    # END bisect
    return None
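
sha_to_index is a method of gitdb's pack index reader. A minimal lookup sketch, assuming PackIndexFile accepts a path to a .idx file (the path below is hypothetical):

from gitdb.pack import PackIndexFile
from gitdb.util import hex_to_bin

index = PackIndexFile("/path/to/pack-1234.idx")   # hypothetical .idx file
binsha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
idx = index.sha_to_index(binsha)
if idx is not None:
    print("pack offset:", index.offset(idx))      # offset(), per the docstring above
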
github h3llrais3r / Auto-Subliminal / lib / gitdb / pack.py
def partial_sha_to_index(self, partial_bin_sha, canonical_length):
    """
    :return: index as in `sha_to_index` or None if the sha was not found in this
        index file
    :param partial_bin_sha: at least two bytes of a partial binary sha, as bytes
    :param canonical_length: length of the original hexadecimal representation of the
        given partial binary sha
    :raise AmbiguousObjectName:"""
    if len(partial_bin_sha) < 2:
        raise ValueError("Require at least 2 bytes of partial sha")

    assert isinstance(partial_bin_sha, bytes), "partial_bin_sha must be bytes"
    first_byte = byte_ord(partial_bin_sha[0])

    get_sha = self.sha
    lo = 0                  # lower index, the left bound of the bisection
    if first_byte != 0:
        lo = self._fanout_table[first_byte - 1]
    hi = self._fanout_table[first_byte]     # the upper, right bound of the bisection

    # fill the partial to full 20 bytes
    filled_sha = partial_bin_sha + NULL_BYTE * (20 - len(partial_bin_sha))

    # find lowest
    while lo < hi:
        mid = (lo + hi) // 2
        mid_sha = get_sha(mid)
        if filled_sha < mid_sha:
            hi = mid
        elif filled_sha == mid_sha:     # branch restored; the excerpt was cut mid-loop
            lo = mid
            break
        else:
            lo = mid + 1
    # END bisect
    # (the excerpt ends here; the full method goes on to verify the candidate at
    # ``lo`` and raises AmbiguousObjectName when the prefix is not unique)
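
Callers should be prepared for the ambiguous case. A hedged sketch, assuming AmbiguousObjectName lives in gitdb.exc and reusing the hypothetical index from the previous sketch:

from gitdb.exc import AmbiguousObjectName
from gitdb.util import hex_to_bin

partial_hex = "155b"            # short, possibly ambiguous prefix (even length)
try:
    idx = index.partial_sha_to_index(hex_to_bin(partial_hex), len(partial_hex))
except AmbiguousObjectName:
    print("prefix matches more than one object")
else:
    print("unique match" if idx is not None else "no match")
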
github gitpython-developers / GitPython / test / performance / test_commit.py
commits = list()
nc = 5000
st = time()
for i in xrange(nc):
    cm = Commit(rwrepo, Commit.NULL_BIN_SHA, hc.tree,
                hc.author, hc.authored_date, hc.author_tz_offset,
                hc.committer, hc.committed_date, hc.committer_tz_offset,
                str(i), parents=hc.parents, encoding=hc.encoding)

    stream = StringIO()
    cm._serialize(stream)
    slen = stream.tell()
    stream.seek(0)

    cm.binsha = make_object(IStream(Commit.type, slen, stream)).binsha
# END commit creation
elapsed = time() - st

print >> sys.stderr, "Serialized %i commits to loose objects in %f s ( %f commits / s )" % (nc, elapsed, nc / elapsed)
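
The excerpt above is Python 2 era GitPython test code (xrange, print >> sys.stderr). The core gitdb pattern it exercises - wrapping serialized bytes in an IStream and letting store() compute the sha - looks roughly like this in Python 3; the object type byte string and the objects path are assumptions:

import io
from gitdb import IStream, LooseObjectDB

ldb = LooseObjectDB("/path/to/repo/.git/objects")  # hypothetical objects directory

data = b"my serialized object"
istream = IStream(b"blob", len(data), io.BytesIO(data))
ldb.store(istream)                                 # store() fills in istream.binsha
print(istream.binsha.hex())
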
github gitpython-developers / gitdb / test / test_util.py
    lfd.commit()
    lfd.rollback()

    # test reading
    lfd = LockedFD(my_file)
    rfd = lfd.open(write=False)
    assert os.read(rfd, len(orig_data)) == orig_data

    assert os.path.isfile(lockfilepath)
    # deletion rolls back
    del(lfd)
    assert not os.path.isfile(lockfilepath)

    # write data - concurrently
    lfd = LockedFD(my_file)
    olfd = LockedFD(my_file)
    assert not os.path.isfile(lockfilepath)
    wfdstream = lfd.open(write=True, stream=True)   # this time as stream
    assert os.path.isfile(lockfilepath)
    # another one fails
    self.failUnlessRaises(IOError, olfd.open)

    wfdstream.write(new_data)
    lfd.commit()
    assert not os.path.isfile(lockfilepath)
    self._cmp_contents(my_file, new_data)

    # could test automatic _end_writing on destruction
finally:
    os.remove(my_file)
# END final cleanup
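
Outside the test harness, LockedFD follows a simple protocol: open() acquires a lock file next to the target path, then commit() makes the write permanent while rollback() (or deletion) discards it. A minimal sketch with a hypothetical target file:

import os
from gitdb.util import LockedFD

path = "/tmp/demo.txt"          # hypothetical target file
lfd = LockedFD(path)
fd = lfd.open(write=True)       # creates the lock file beside path
os.write(fd, b"new contents")
lfd.commit()                    # moves the lock file onto path
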
github gitpython-developers / gitdb / test / test_util.py
# try non-existing file for reading
lfd = LockedFD(tempfile.mktemp())
try:
    lfd.open(write=False)
except OSError:
    assert not os.path.exists(lfd._lockfilepath())
else:
    self.fail("expected OSError")
# END handle exceptions
github gitpython-developers / gitdb / test / db / lib.py
assert my_istream is istream
assert db.has_object(sha) != dry_run
assert len(sha) == 20

# verify data - the slow way, we want to run code
if not dry_run:
    info = db.info(sha)
    assert str_blob_type == info.type
    assert info.size == len(data)

    ostream = db.stream(sha)
    assert ostream.read() == data
    assert ostream.type == str_blob_type
    assert ostream.size == len(data)
else:
    self.failUnlessRaises(BadObject, db.info, sha)
    self.failUnlessRaises(BadObject, db.stream, sha)

    # DIRECT STREAM COPY
    # our data has been written in object format to the StringIO
    # we passed as output stream. No physical database representation
    # was created.
    # Test direct stream copy of object streams, the result must be
    # identical to what we fed in
    ostream.seek(0)
    istream.stream = ostream
    assert istream.binsha is not None
    prev_sha = istream.binsha

    db.set_ostream(ZippedStoreShaWriter())
    db.store(istream)
    assert istream.binsha == prev_sha
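
As the dry-run branch shows, lookups for unknown shas raise BadObject instead of returning None. A short sketch, where db stands for any gitdb database instance (e.g. a GitDB):

from gitdb.exc import BadObject

try:
    db.info(b"\x00" * 20)       # a binary sha that is almost certainly absent
except BadObject:
    print("object not in database")
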
github gitpython-developers / gitdb / test / db / test_git.py
def test_reading(self):
    gdb = GitDB(fixture_path('../../.git/objects'))

    # we have packs and loose objects, alternates don't necessarily exist
    assert 1 < len(gdb.databases()) < 4

    # access should be possible
    gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
    assert isinstance(gdb.info(gitdb_sha), OInfo)
    assert isinstance(gdb.stream(gitdb_sha), OStream)
    assert gdb.size() > 200
    sha_list = list(gdb.sha_iter())
    assert len(sha_list) == gdb.size()

    # This is actually a test for compound functionality, but it doesn't
    # have a separate test module.
    # Test partial shas; this one is uneven and quite short.
    assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")

    # mix even/uneven hexshas
    for i, binsha in enumerate(sha_list):
        assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8 - (i % 2)]) == binsha
github gitpython-developers / GitPython / test / performance / test_streams.py
ldb = LooseObjectDB(os.path.join(rwrepo.git_dir, 'objects'))

for randomize in range(2):
    desc = (randomize and 'random ') or ''
    print >> sys.stderr, "Creating %s data ..." % desc
    st = time()
    size, stream = make_memory_file(self.large_data_size_bytes, randomize)
    elapsed = time() - st
    print >> sys.stderr, "Done (in %f s)" % elapsed

    # writing - due to the compression it will seem faster than it is
    st = time()
    binsha = ldb.store(IStream('blob', size, stream)).binsha
    elapsed_add = time() - st
    assert ldb.has_object(binsha)
    db_file = ldb.readable_db_object_path(bin_to_hex(binsha))
    fsize_kib = os.path.getsize(db_file) / 1000

    size_kib = size / 1000
    print >> sys.stderr, "Added %i KiB (filesize = %i KiB) of %s data to loose odb in %f s ( %f Write KiB / s)" % (size_kib, fsize_kib, desc, elapsed_add, size_kib / elapsed_add)

    # reading all at once
    st = time()
    ostream = ldb.stream(binsha)
    shadata = ostream.read()
    elapsed_readall = time() - st

    stream.seek(0)
    assert shadata == stream.getvalue()
    print >> sys.stderr, "Read %i KiB of %s data at once from loose odb in %f s ( %f Read KiB / s)" % (size_kib, desc, elapsed_readall, size_kib / elapsed_readall)
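
The benchmark reads everything at once; for large blobs the returned stream can also be consumed incrementally. A rough sketch, reusing ldb and binsha from the excerpt above (the chunk size is arbitrary):

ostream = ldb.stream(binsha)
chunks = []
while True:
    chunk = ostream.read(64 * 1024)   # 64 KiB at a time
    if not chunk:
        break
    chunks.append(chunk)
data = b"".join(chunks)               # identical to a single read()
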