Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
headers = kwargs.pop('headers', {})
headers.update(add_metadata_headers(metadata))
access_conditions = get_access_conditions(lease)
mod_conditions = get_modification_conditions(
if_modified_since, if_unmodified_since, if_match, if_none_match)
try:
return self._client.blob.set_metadata(
timeout=timeout,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
headers=headers,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
if_modified_since, if_unmodified_since, if_match, if_none_match)
try:
return self._client.page_blob.upload_pages(
page[:length],
content_length=length,
transactional_content_md5=None,
timeout=timeout,
range=content_range,
lease_access_conditions=access_conditions,
sequence_number_access_conditions=seq_conditions,
modified_access_conditions=mod_conditions,
validate_content=validate_content,
cls=return_response_headers,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
source_if_modified_since,
source_if_unmodified_since,
source_if_match,
source_if_none_match)
start_copy = self._client.blob.start_copy_from_url(
source_url,
timeout=timeout,
source_modified_access_conditions=source_mod_conditions,
modified_access_conditions=dest_mod_conditions,
lease_access_conditions=dest_access_conditions,
headers=headers,
cls=return_response_headers,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
poller = CopyStatusPoller(
self, start_copy,
polling=polling,
configuration=self._config,
lease_access_conditions=destination_lease,
timeout=timeout)
return poller
blob = self.service.download(
timeout=self.timeout,
lease_access_conditions=self.access_conditions,
modified_access_conditions=self.mod_conditions,
error_map=basic_error_map(),
validate_content=self.validate_content,
cls=deserialize_blob_stream,
data_stream_total=0,
data_stream_current=0,
**self.request_options)
# Set the download size to empty
self.download_size = 0
self.blob_size = 0
else:
process_storage_error(error)
# If the blob is small, the download is complete at this point.
# If blob size is large, download the rest of the blob in chunks.
if blob.properties.content_length != self.download_size:
# Lock on the etag. This can be overriden by the user by specifying '*'
if not self.mod_conditions:
self.mod_conditions = ModifiedAccessConditions()
if not self.mod_conditions.if_match:
self.mod_conditions.if_match = blob.properties.etag
else:
self._download_complete = True
return blob
access_conditions = get_access_conditions(lease)
mod_conditions = get_modification_conditions(
if_modified_since, if_unmodified_since, if_match, if_none_match)
if sequence_number_action is None:
raise ValueError("A sequence number action must be specified")
try:
return self._client.page_blob.update_sequence_number(
sequence_number_action=sequence_number_action,
timeout=timeout,
blob_sequence_number=sequence_number,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
modified_access_conditions=mod_conditions,
timeout=timeout,
range=page_range,
**kwargs
)
else:
ranges = self._client.page_blob.get_page_ranges(
snapshot=self.snapshot,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
timeout=timeout,
range=page_range,
**kwargs
)
except StorageErrorException as error:
process_storage_error(error)
page_range = []
clear_range = []
if ranges.page_range:
page_range = [{'start': b.start, 'end': b.end} for b in ranges.page_range]
if ranges.clear_range:
clear_range = [{'start': b.start, 'end': b.end} for b in ranges.clear_range]
return page_range, clear_range
:returns: BlobProperties
"""
access_conditions = get_access_conditions(lease)
mod_conditions = get_modification_conditions(
if_modified_since, if_unmodified_since, if_match, if_none_match)
try:
blob_props = self._client.blob.get_properties(
timeout=timeout,
snapshot=self.snapshot,
lease_access_conditions=access_conditions,
modified_access_conditions=mod_conditions,
cls=deserialize_blob_properties,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
blob_props.name = self.name
blob_props.container = self.container
return blob_props
def _update_status(self):
try:
self.blob = self._client._client.blob.get_properties(
cls=deserialize_blob_properties,
error_map=basic_error_map(),
**self.kwargs)
except StorageErrorException as error:
process_storage_error(error)
self._status = self.blob.copy.status
self.etag = self.blob.etag
self.last_modified = self.blob.last_modified
raise ValueError("start_range must be an integer that aligns with 512 page size")
if end_range is None or end_range % 512 != 511:
raise ValueError("end_range must be an integer that aligns with 512 page size")
content_range = 'bytes={0}-{1}'.format(start_range, end_range)
try:
return self._client.page_blob.clear_pages(
content_length=0,
timeout=timeout,
range=content_range,
lease_access_conditions=access_conditions,
sequence_number_access_conditions=seq_conditions,
modified_access_conditions=mod_conditions,
cls=return_response_headers,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
container's lease is active and matches this ID.
:param int timeout:
The timeout parameter is expressed in seconds.
:return: properties for the specified container within a container object.
:rtype: ~azure.storage.blob.models.ContainerProperties
"""
access_conditions = get_access_conditions(lease)
try:
response = self._client.container.get_properties(
timeout=timeout,
lease_access_conditions=access_conditions,
cls=deserialize_container_properties,
error_map=basic_error_map(),
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
response.name = self.name
return response