Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
assert my_istream is istream
assert db.has_object(sha) != dry_run
assert len(sha) == 20
# verify data - the slow way, we want to run code
if not dry_run:
info = db.info(sha)
assert str_blob_type == info.type
assert info.size == len(data)
ostream = db.stream(sha)
assert ostream.read() == data
assert ostream.type == str_blob_type
assert ostream.size == len(data)
else:
self.failUnlessRaises(BadObject, db.info, sha)
self.failUnlessRaises(BadObject, db.stream, sha)
# DIRECT STREAM COPY
# our data hase been written in object format to the StringIO
# we pasesd as output stream. No physical database representation
# was created.
# Test direct stream copy of object streams, the result must be
# identical to what we fed in
ostream.seek(0)
istream.stream = ostream
assert istream.binsha is not None
prev_sha = istream.binsha
db.set_ostream(ZippedStoreShaWriter())
db.store(istream)
assert istream.binsha == prev_sha
def set_commit(self, commit, logmsg=None):
"""As set_object, but restricts the type of object to be a Commit
:raise ValueError: If commit is not a Commit object or doesn't point to
a commit
:return: self"""
# check the type - assume the best if it is a base-string
invalid_type = False
if isinstance(commit, Object):
invalid_type = commit.type != Commit.type
elif isinstance(commit, SymbolicReference):
invalid_type = commit.object.type != Commit.type
else:
try:
invalid_type = self.repo.rev_parse(commit).type != Commit.type
except (BadObject, BadName):
raise ValueError("Invalid object: %s" % commit)
# END handle exception
# END verify type
if invalid_type:
raise ValueError("Need commit, got %r" % commit)
# END handle raise
# we leave strings to the rev-parse method below
self.set_object(commit, logmsg)
return self
return SymbolicReference(repo, base % name)
#END handle symbolic ref
break
except ValueError:
pass
# END for each base
# END handle hexsha
# didn't find any ref, this is an error
if return_ref:
raise BadObject("Couldn't find reference named %r" % name)
#END handle return ref
# tried everything ? fail
if hexsha is None:
raise BadObject(name)
# END assert hexsha was found
return Object.new_from_sha(repo, hex_to_bin(hexsha))
if hexsha is None:
for base in ('%s', 'refs/%s', 'refs/tags/%s', 'refs/heads/%s', 'refs/remotes/%s', 'refs/remotes/%s/HEAD'):
try:
hexsha = SymbolicReference.dereference_recursive(repo, base % name)
if return_ref:
return SymbolicReference(repo, base % name)
# END handle symbolic ref
break
except ValueError:
pass
# END for each base
# END handle hexsha
# didn't find any ref, this is an error
if return_ref:
raise BadObject("Couldn't find reference named %r" % name)
# END handle return ref
# tried everything ? fail
if hexsha is None:
raise BadName(name)
# END assert hexsha was found
return Object.new_from_sha(repo, hex_to_bin(hexsha))
def stream(self, sha):
try:
ostream = self._cache[sha]
# rewind stream for the next one to read
ostream.stream.seek(0)
return ostream
except KeyError as e:
raise BadObject(sha) from e
# END exception handling
""":return: database containing the given 20 byte sha
:raise BadObject:"""
# most databases use binary representations, prevent converting
# it every time a database is being queried
try:
return self._db_cache[sha]
except KeyError:
pass
# END first level cache
for db in self._dbs:
if db.has_object(sha):
self._db_cache[sha] = db
return db
# END for each database
raise BadObject(sha)
return ODeltaStream(sha, dstream.type, None, dstream)
else:
if type_id not in delta_types:
return OInfo(sha, type_id_to_type_map[type_id], uncomp_size)
# END handle non-deltas
# deltas are a little tougher - unpack the first bytes to obtain
# the actual target size, as opposed to the size of the delta data
streams = self.collect_streams_at_offset(offset)
buf = streams[0].read(512)
offset, src_size = msb_size(buf)
offset, target_size = msb_size(buf, offset)
# collect the streams to obtain the actual object type
if streams[-1].type_id in delta_types:
raise BadObject(sha, "Could not resolve delta object")
return OInfo(sha, streams[-1].type, target_size)
# END handle stream
def has_object(self, sha):
try:
self._pack_info(sha)
return True
except BadObject:
return False
# END exception handling
"""
:return: readable object path to the object identified by hexsha
:raise BadObject: If the object file does not exist"""
try:
return self._hexsha_to_file[hexsha]
except KeyError:
pass
# END ignore cache misses
# try filesystem
path = self.db_path(self.object_path(hexsha))
if exists(path):
self._hexsha_to_file[hexsha] = path
return path
# END handle cache
raise BadObject(hexsha)
:return: self
:note: This symbolic reference will not be dereferenced. For that, see
``set_object(...)``"""
write_value = None
obj = None
if isinstance(ref, SymbolicReference):
write_value = "ref: %s" % ref.path
elif isinstance(ref, Object):
obj = ref
write_value = ref.hexsha
elif isinstance(ref, basestring):
try:
obj = self.repo.rev_parse(ref+"^{}") # optionally deref tags
write_value = obj.hexsha
except BadObject:
raise ValueError("Could not extract object from %s" % ref)
# END end try string
else:
raise ValueError("Unrecognized Value: %r" % ref)
# END try commit attribute
# typecheck
if obj is not None and self._points_to_commits_only and obj.type != Commit.type:
raise TypeError("Require commit, got %r" % obj)
#END verify type
oldbinsha = None
if logmsg is not None:
try:
oldbinsha = self.commit.binsha
except ValueError: