Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@validate_url
def exists(self, url):
"""Get if the object at the given URL exist."""
return storage.Blob(bucket=self.bucket, name=os.fspath(url)).exists()
@validate_url
def get_object_list(self, url):
"""Get a list of objects stored bellow the given URL."""
url = os.path.join(url, "")
paginator = self.client.get_paginator("list_objects_v2")
kwargs = {"Bucket": self.bucket_name, "Prefix": url}
ret = []
for page in paginator.paginate(**kwargs):
try:
contents = page["Contents"]
except KeyError:
break
for obj in contents:
ret.append(Path(obj["Key"]).relative_to(url).as_posix())
return ret
@validate_url
def push(self, stream, url):
"""Push data from the stream to the given URL."""
url = os.fspath(url)
mime_type = mimetypes.guess_type(url)[0]
extra_args = {} if mime_type is None else {"ContentType": mime_type}
self.client.upload_fileobj(
stream,
self.bucket_name,
url,
Config=self._get_transfer_config(),
ExtraArgs=extra_args,
)
@validate_url
def get_hashes(self, url, hash_types):
"""Get the hash of the given type for the given object."""
hasher = StreamHasher(chunk_size=self.multipart_chunksize)
path = self.base_path / url
if not path.exists():
return None
with path.open("rb", self.CHUNK_SIZE) as f:
hasher.compute(f)
return {hash_type: hasher.hexdigest(hash_type) for hash_type in hash_types}
@validate_url
def get_hashes(self, url, hash_types):
"""Get the hash of the given type for the given object."""
hashes = dict()
blob = self.bucket.get_blob(os.fspath(url))
if blob is None:
return None
blob.update()
for hash_type in hash_types:
if hash_type in self.hash_propery:
prop = self.hash_propery[hash_type]
hashes[hash_type] = base64.b64decode(getattr(blob, prop)).hex()
else:
hashes[hash_type] = blob.metadata[hash_type]
return hashes
@validate_url
def presigned_url(self, url, expiration=60, force_download=False):
"""Create a presigned URL.
The URL is used to obtain temporary access to the object ar the
given URL using only returned URL.
:param expiration: expiration time of the link (in seconds), default
is one minute.
:param force_download: force download.
:returns: URL that can be used to access object or None.
"""
content_disposition = "attachment" if force_download else "inline"
response = None
try:
@validate_url
def exists(self, url):
"""Get if the object at the given URL exist."""
return (self.base_path / url).exists()
@validate_url
def get(self, url, stream):
"""Get data from the given URL and write it into the given stream."""
blob = self.bucket.blob(os.fspath(url))
blob.download_to_file(stream)
@validate_url
def get_hash(self, url, hash_type):
"""Get the hash of the given type for the given object."""
blob = self.bucket.get_blob(os.fspath(url))
if blob is None:
return None
blob.update()
if hash_type in self.hash_propery:
prop = self.hash_propery[hash_type]
return base64.b64decode(getattr(blob, prop)).hex()
else:
return blob.metadata[hash_type]
@validate_url
def push(self, stream, url, hash_type=None, data_hash=None):
"""Push data from the stream to the given URL."""
url = os.fspath(url)
mime_type = mimetypes.guess_type(url)[0]
blob = self.bucket.blob(url)
if hash_type is not None:
assert hash_type in self.supported_hash
prop = self.hash_propery[hash_type]
setattr(blob, prop, data_hash)
blob.upload_from_file(stream, content_type=mime_type)