Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_SubStream(self.stream, i * self.chunk_size, last_block_size if i == blocks - 1 else self.chunk_size,
lock))
def process_substream_block(self, block_data):
return self._upload_substream_block_with_progress(block_data[0], block_data[1])
def _upload_substream_block_with_progress(self, block_id, block_stream):
range_id = self._upload_substream_block(block_id, block_stream)
return range_id
def set_response_properties(self, resp):
self.etag = resp.etag
self.last_modified = resp.last_modified
class _BlockBlobChunkUploader(_BlobChunkUploader):
def _upload_chunk(self, chunk_offset, chunk_data):
# TODO: This is incorrect, but works with recording.
block_id = encode_base64(url_quote(encode_base64('{0:032d}'.format(chunk_offset))))
self.blob_service.stage_block(
block_id,
len(chunk_data),
chunk_data,
timeout=self.timeout,
lease_access_conditions=self.lease_access_conditions,
validate_content=self.validate_content,
**self.request_options)
return BlobBlock(block_id)
def _upload_substream_block(self, block_id, block_stream):
try:
_SubStream(self.stream, i * self.chunk_size, last_block_size if i == blocks - 1 else self.chunk_size,
lock))
def process_substream_block(self, block_data):
return self._upload_substream_block_with_progress(block_data[0], block_data[1])
def _upload_substream_block_with_progress(self, block_id, block_stream):
range_id = self._upload_substream_block(block_id, block_stream)
self._update_progress(len(block_stream))
return range_id
def set_response_properties(self, resp):
self.response_properties = resp
class _BlockBlobChunkUploader(_BlobChunkUploader):
def _upload_chunk(self, chunk_offset, chunk_data):
block_id = url_quote(_encode_base64('{0:032d}'.format(chunk_offset)))
self.blob_service._put_block(
self.container_name,
self.blob_name,
chunk_data,
block_id,
validate_content=self.validate_content,
lease_id=self.lease_id,
timeout=self.timeout,
cpk=self.cpk,
)
return BlobBlock(block_id)
def _upload_substream_block(self, block_id, block_stream):
try:
timeout=self.timeout,
range=content_range,
lease_access_conditions=self.lease_access_conditions,
modified_access_conditions=self.modified_access_conditions,
validate_content=self.validate_content,
cls=return_response_headers,
**self.request_options
)
if not self.parallel:
self.modified_access_conditions = get_modification_conditions(
if_match=self.response_headers['ETag'])
return None
class _AppendBlobChunkUploader(_BlobChunkUploader):
def _upload_chunk(self, chunk_offset, chunk_data):
if not hasattr(self, 'current_length'):
self.response_headers = self.blob_service.append_block(
chunk_data,
content_length=len(chunk_data),
timeout=self.timeout,
lease_access_conditions=self.lease_access_conditions,
modified_access_conditions=self.modified_access_conditions,
validate_content=self.validate_content,
append_position_access_conditions=self.append_conditions,
cls=return_response_headers,
**self.request_options
)
self.current_length = int(self.response_headers['x-ms-blob-append-offset'])
else:
try:
self.blob_service.stage_block(
block_id,
len(block_stream),
block_stream,
validate_content=self.validate_content,
lease_access_conditions=self.lease_access_conditions,
timeout=self.timeout,
**self.request_options
)
finally:
block_stream.close()
return BlobBlock(block_id)
class _PageBlobChunkUploader(_BlobChunkUploader):
def _is_chunk_empty(self, chunk_data):
# read until non-zero byte is encountered
# if reached the end without returning, then chunk_data is all 0's
for each_byte in chunk_data:
if each_byte != 0 and each_byte != b'\x00':
return False
return True
def _upload_chunk(self, chunk_start, chunk_data):
# avoid uploading the empty pages
if not self._is_chunk_empty(chunk_data):
chunk_end = chunk_start + len(chunk_data) - 1
content_range = 'bytes={0}-{1}'.format(chunk_start, chunk_end)
computed_md5 = None
self.response_headers = self.blob_service.upload_pages(
chunk_start,
chunk_end,
validate_content=self.validate_content,
lease_id=self.lease_id,
if_match=self.if_match,
timeout=self.timeout,
cpk=self.cpk,
)
if not self.parallel:
self.if_match = resp.etag
self.set_response_properties(resp)
class _AppendBlobChunkUploader(_BlobChunkUploader):
def _upload_chunk(self, chunk_offset, chunk_data):
if not hasattr(self, 'current_length'):
resp = self.blob_service.append_block(
self.container_name,
self.blob_name,
chunk_data,
validate_content=self.validate_content,
lease_id=self.lease_id,
maxsize_condition=self.maxsize_condition,
timeout=self.timeout,
if_modified_since=self.if_modified_since,
if_unmodified_since=self.if_unmodified_since,
if_match=self.if_match,
if_none_match=self.if_none_match,
cpk=self.cpk,
)
self.blob_service._put_block(
self.container_name,
self.blob_name,
block_stream,
block_id,
validate_content=self.validate_content,
lease_id=self.lease_id,
timeout=self.timeout,
cpk=self.cpk,
)
finally:
block_stream.close()
return BlobBlock(block_id)
class _PageBlobChunkUploader(_BlobChunkUploader):
def _is_chunk_empty(self, chunk_data):
# read until non-zero byte is encountered
# if reached the end without returning, then chunk_data is all 0's
for each_byte in chunk_data:
if each_byte != 0 and each_byte != b'\x00':
return False
return True
def _upload_chunk(self, chunk_start, chunk_data):
# avoid uploading the empty pages
if not self._is_chunk_empty(chunk_data):
chunk_end = chunk_start + len(chunk_data) - 1
resp = self.blob_service._update_page(
self.container_name,
self.blob_name,
chunk_data,