def test_reading(self):
    gdb = GitDB(fixture_path('../../.git/objects'))

    # we have packs and loose objects; an alternates file doesn't necessarily exist
    assert 1 < len(gdb.databases()) < 4

    # access should be possible
    gitdb_sha = hex_to_bin("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
    assert isinstance(gdb.info(gitdb_sha), OInfo)
    assert isinstance(gdb.stream(gitdb_sha), OStream)
    assert gdb.size() > 200
    sha_list = list(gdb.sha_iter())
    assert len(sha_list) == gdb.size()

    # This is actually a test of compound functionality, but it doesn't
    # have a separate test module.
    # Test partial shas - this one is uneven and quite short:
    assert gdb.partial_to_complete_sha_hex('155b6') == hex_to_bin("155b62a9af0aa7677078331e111d0f7aa6eb4afc")

    # mix even and uneven hexshas
    for i, binsha in enumerate(sha_list):
        assert gdb.partial_to_complete_sha_hex(bin_to_hex(binsha)[:8 - (i % 2)]) == binsha
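# Sketch: hex_to_bin / bin_to_hex round-trip. Both helpers in gitdb.util are
# thin wrappers around binascii, converting between a 40-char hex SHA-1 and
# its 20-byte binary form. Assumes gitdb is importable and Python 3 semantics,
# where bin_to_hex returns bytes:
from gitdb.util import bin_to_hex, hex_to_bin

hexsha = "5690fd0d3304f378754b23b098bd7cb5f4aa1976"
binsha = hex_to_bin(hexsha)  # 20-byte binary SHA-1
assert len(binsha) == 20
assert bin_to_hex(binsha) == hexsha.encode("ascii")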
# requires module-level imports: import git, gitdb.util
def get_commit(self, hash):
    try:
        hexsha = gitdb.util.hex_to_bin(hash)  # convert the hex string to a binary sha first
        commit = git.objects.commit.Commit(self.repo, hexsha)
        # Access commit.size to see if the commit actually exists;
        # if it doesn't, this raises an exception.
        commit.size
        return commit
    except Exception:
        # invalid hash, or no such commit in the repository
        return None
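# Sketch: hypothetical usage of get_commit, assuming obj is an instance of the
# class above, backed by a repository that contains the given commit:
commit = obj.get_commit("5690fd0d3304f378754b23b098bd7cb5f4aa1976")
if commit is not None:
    print(commit.hexsha, commit.summary)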
def _deserialize(self, stream):
    """Deserialize this commit from the given plain data stream, as read
    from our object database."""
    readline = stream.readline
    self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, '')

    self.parents = list()
    next_line = None
    while True:
        parent_line = readline()
        if not parent_line.startswith('parent'):
            next_line = parent_line
            break
        # END abort reading parents
        self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1])))
    # END for each parent line
    self.parents = tuple(self.parents)

    self.author, self.authored_date, self.author_tz_offset = parse_actor_and_date(next_line)
    self.committer, self.committed_date, self.committer_tz_offset = parse_actor_and_date(readline())
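# Sketch: the raw commit layout _deserialize walks, fed through a hypothetical
# payload (the all-'a'/'b' shas are placeholders, but any 40 hex digits work):
import io

raw = b"\n".join([
    b"tree " + b"a" * 40,
    b"parent " + b"b" * 40,
    b"author A U Thor <author@example.com> 1292000000 +0100",
    b"committer A U Thor <author@example.com> 1292000000 +0100",
    b"",
    b"commit message",
])
readline = io.BytesIO(raw).readline
assert readline().split()[1] == b"a" * 40  # tree sha, as consumed above
assert readline().startswith(b"parent")    # parent lines repeat until a non-parent line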
# fragment: runs when a new commit boundary is reached in the log output;
# the matching 'if' for the trailing else is elided from this snippet
    if decoration:
        refs = decoration.strip(' ()').replace(' -> ', ', ').split(', ')
    else:
        refs = []
    tag_prefix = 'tag: '  # introduced in git 1.8.3
    for i, ref in enumerate(refs):
        if ref.startswith(tag_prefix):
            refs[i] = ref[len(tag_prefix):]
    refs.sort()

    renamed = {}
    # merge commits don't have any --name-status output
    if len(commit_lines) > 1:
        name_stat_parts = commit_lines[1].split(' ')
        if name_stat_parts[0] == 'R100':
            renamed['from'] = name_stat_parts[1]
            renamed['to'] = name_stat_parts[2]
    yield (git.Commit(self._git, gitdb.util.hex_to_bin(hexsha)), refs, renamed)

    if not line:
        # all lines have been read
        break
    commit_lines = [line]
else:
    commit_lines.append(line)
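# Sketch: the decoration parsing above, run on a hypothetical --decorate suffix:
decoration = " (HEAD -> master, tag: v1.0, origin/master)"
refs = decoration.strip(' ()').replace(' -> ', ', ').split(', ')
tag_prefix = 'tag: '
refs = [r[len(tag_prefix):] if r.startswith(tag_prefix) else r for r in refs]
refs.sort()
assert refs == ['HEAD', 'master', 'origin/master', 'v1.0']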
def _set_cache_(self, attr):
    """Cache all our attributes at once"""
    if attr in TagObject.__slots__:
        ostream = self.repo.odb.stream(self.binsha)
        lines = ostream.read().splitlines()

        obj, hexsha = lines[0].split(" ")            # object <hexsha>
        type_token, type_name = lines[1].split(" ")  # type <typename>
        self.object = get_object_type_by_name(type_name)(self.repo, hex_to_bin(hexsha))

        self.tag = lines[2][4:]  # tag <name>
        tagger_info = lines[3]   # tagger <actor> <date>
        self.tagger, self.tagged_date, self.tagger_tz_offset = parse_actor_and_date(tagger_info)

        # line 4 is empty - it could mark the beginning of the next header.
        # In case there really is no message, it would not exist. Otherwise
        # a newline separates header from message.
        if len(lines) > 5:
            self.message = "\n".join(lines[5:])
        else:
            self.message = ''
        # END check our attributes
    else:
        super(TagObject, self)._set_cache_(attr)
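# Sketch: the raw annotated-tag payload layout parsed by _set_cache_ above.
# The payload is hypothetical, but follows git's tag object format:
raw = b"\n".join([
    b"object 5690fd0d3304f378754b23b098bd7cb5f4aa1976",
    b"type commit",
    b"tag v0.1.0",
    b"tagger A U Thor <author@example.com> 1292000000 +0100",
    b"",
    b"release message",
])
lines = raw.decode("utf-8").splitlines()
assert lines[0].split(" ")[1] == "5690fd0d3304f378754b23b098bd7cb5f4aa1976"
assert lines[1].split(" ")[1] == "commit"
assert lines[2][4:] == "v0.1.0"                    # after the 'tag ' prefix
assert "\n".join(lines[5:]) == "release message"   # message follows the blank line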
# old_mode, new_mode, new_file_mode, deleted_file_mode, \
#     a_blob_id, b_blob_id, b_mode = header.groups()
# new_file, deleted_file = bool(new_file_mode), bool(deleted_file_mode)

# Our only means to find the actual text is to see what has not been matched
# by our regex, and then retroactively assign it to our index:
# if previous_header is not None:
#     index[-1].diff = text[previous_header.end():header.start()]
# END assign actual diff

# Make sure the mode is set if the path is set. Otherwise the resulting blob
# is invalid - we just use the one mode we should have parsed.
a_mode = old_mode or deleted_file_mode or (a_path and (b_mode or new_mode or new_file_mode))
b_mode = b_mode or new_mode or new_file_mode or (b_path and a_mode)
ablob = Blob(repo, hex_to_bin(a_blob_id), mode=a_mode, path=a_path)
bblob = Blob(repo, hex_to_bin(b_blob_id), mode=b_mode, path=b_path)
return ablob, bblob

# fragment: Diff initializer assigning the parsed header fields
self.a_mode = a_mode
self.b_mode = b_mode
if self.a_mode:
    self.a_mode = mode_str_to_int(self.a_mode)
if self.b_mode:
    self.b_mode = mode_str_to_int(self.b_mode)

if a_blob_id is None:
    self.a_blob = None
else:
    self.a_blob = Blob(repo, hex_to_bin(a_blob_id), mode=self.a_mode, path=a_path)
if b_blob_id is None:
    self.b_blob = None
else:
    self.b_blob = Blob(repo, hex_to_bin(b_blob_id), mode=self.b_mode, path=b_path)

self.new_file = new_file
self.deleted_file = deleted_file

# be clear and use None instead of empty strings
self.rename_from = rename_from or None
self.rename_to = rename_to or None
self.diff = diff
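# Sketch: a minimal stand-in for mode_str_to_int as used above. git prints
# modes as octal strings (e.g. "100644") in diff headers; this helper is an
# assumption about the behavior, not GitPython's actual implementation:
def octal_mode_to_int(modestr):
    mode = 0
    for c in modestr:
        mode = (mode << 3) + (ord(c) - ord('0'))
    return mode

assert octal_mode_to_int("100644") == 0o100644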
def partial_to_complete_sha_hex(self, partial_hexsha):
    databases = self.databases()  # also ensures self._dbs is up to date
    len_partial_hexsha = len(partial_hexsha)
    if len_partial_hexsha % 2 != 0:
        partial_binsha = hex_to_bin(partial_hexsha + "0")
    else:
        partial_binsha = hex_to_bin(partial_hexsha)
    # END assure successful binary conversion

    candidate = None
    for db in self._dbs:
        full_bin_sha = None
        try:
            if hasattr(db, 'partial_to_complete_sha_hex'):
                full_bin_sha = db.partial_to_complete_sha_hex(partial_hexsha)
            else:
                full_bin_sha = db.partial_to_complete_sha(partial_binsha, len_partial_hexsha)
            # END handle database type
        except BadObject:
            continue
        # END ignore bad objects
        if full_bin_sha:
            if candidate and candidate != full_bin_sha:
                # two databases resolved the prefix to different objects
                raise AmbiguousObjectName(partial_hexsha)
            candidate = full_bin_sha
        # END handle candidate
    # END for each db
    if not candidate:
        raise BadObject(partial_binsha)
    return candidate
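# Sketch: resolving an abbreviated sha through the compound database. Assumes
# gdb is a GitDB as opened in test_reading above; both exception types live
# in gitdb.exc:
from gitdb.exc import AmbiguousObjectName, BadObject

try:
    binsha = gdb.partial_to_complete_sha_hex('155b6')  # full 20-byte binary sha
except BadObject:
    binsha = None  # no object matches the prefix
except AmbiguousObjectName:
    binsha = None  # more than one object matches the prefix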
# fragment of rev_parse: resolving the @{n} reflog specifier
    try:
        # transform the reversed index into the format of our revlog
        revlog_index = -(int(output_type) + 1)
    except ValueError:
        # TODO: try to parse the other date options as well, possibly using parse_date
        raise NotImplementedError("Support for additional @{...} modes not implemented")
    # END handle revlog index

    try:
        entry = ref.log_entry(revlog_index)
    except IndexError:
        raise IndexError("Invalid revlog index: %i" % revlog_index)
    # END handle index out of bounds

    obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha))

    # make it pass the following checks
    output_type = None
else:
    raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))
# END handle output type

# empty output types don't require any specific type - it's just about dereferencing tags
if output_type and obj.type != output_type:
    raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
# END verify output type

start = end + 1  # skip brace
parsed_to = start
continue
# END parse type
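# Sketch: the @{n} reflog syntax this rev_parse branch handles, driven through
# GitPython's public API. repo_path is a placeholder for an existing repository:
import git

repo = git.Repo(repo_path)
head_now = repo.commit('HEAD@{0}')   # current reflog entry of HEAD
head_prev = repo.commit('HEAD@{1}')  # one reflog entry back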