Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _upload_chunk(self, chunk_offset, chunk_data):
# TODO: This is incorrect, but works with recording.
block_id = encode_base64(url_quote(encode_base64('{0:032d}'.format(chunk_offset))))
self.blob_service.stage_block(
block_id,
len(chunk_data),
chunk_data,
timeout=self.timeout,
lease_access_conditions=self.lease_access_conditions,
validate_content=self.validate_content,
**self.request_options)
return BlobBlock(block_id)
def _upload_substream_block(self, block_id, block_stream):
try:
self.blob_service.stage_block(
block_id,
len(block_stream),
block_stream,
validate_content=self.validate_content,
lease_access_conditions=self.lease_access_conditions,
timeout=self.timeout,
**self.request_options
)
finally:
block_stream.close()
return BlobBlock(block_id)
access_conditions = get_access_conditions(lease)
try:
blocks= self._client.block_blob.get_block_list(
list_type=block_list_type,
snapshot=self.snapshot,
timeout=timeout,
lease_access_conditions=access_conditions,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
committed = []
uncommitted = []
if blocks.committed_blocks:
committed = [BlobBlock._from_generated(b) for b in blocks.committed_blocks]
if blocks.uncommitted_blocks:
uncommitted = [BlobBlock._from_generated(b) for b in blocks.uncommitted_blocks]
return committed, uncommitted
def _upload_chunk(self, chunk_offset, chunk_data):
block_id = url_quote(_encode_base64('{0:032d}'.format(chunk_offset)))
self.blob_service._put_block(
self.container_name,
self.blob_name,
chunk_data,
block_id,
validate_content=self.validate_content,
lease_id=self.lease_id,
timeout=self.timeout,
cpk=self.cpk,
)
return BlobBlock(block_id)
if self.blob_type != BlobType.BlockBlob:
raise TypeError("This operation is only available for BlockBlob type blobs.")
access_conditions = get_access_conditions(lease)
try:
blocks= self._client.block_blob.get_block_list(
list_type=block_list_type,
snapshot=self.snapshot,
timeout=timeout,
lease_access_conditions=access_conditions,
**kwargs)
except StorageErrorException as error:
process_storage_error(error)
committed = []
uncommitted = []
if blocks.committed_blocks:
committed = [BlobBlock._from_generated(b) for b in blocks.committed_blocks]
if blocks.uncommitted_blocks:
uncommitted = [BlobBlock._from_generated(b) for b in blocks.uncommitted_blocks]
return committed, uncommitted
def _upload_substream_block(self, block_id, block_stream):
try:
self.blob_service._put_block(
self.container_name,
self.blob_name,
block_stream,
block_id,
validate_content=self.validate_content,
lease_id=self.lease_id,
timeout=self.timeout,
cpk=self.cpk,
)
finally:
block_stream.close()
return BlobBlock(block_id)
Converts xml response to block list class.
'''
if response is None or response.body is None:
return None
block_list = BlobBlockList()
list_element = ETree.fromstring(response.body)
committed_blocks_element = list_element.find('CommittedBlocks')
if committed_blocks_element is not None:
for block_element in committed_blocks_element.findall('Block'):
block_id = _decode_base64_to_text(block_element.findtext('Name', ''))
block_size = int(block_element.findtext('Size'))
block = BlobBlock(id=block_id, state=BlobBlockState.Committed)
block._set_size(block_size)
block_list.committed_blocks.append(block)
uncommitted_blocks_element = list_element.find('UncommittedBlocks')
if uncommitted_blocks_element is not None:
for block_element in uncommitted_blocks_element.findall('Block'):
block_id = _decode_base64_to_text(block_element.findtext('Name', ''))
block_size = int(block_element.findtext('Size'))
block = BlobBlock(id=block_id, state=BlobBlockState.Uncommitted)
block._set_size(block_size)
block_list.uncommitted_blocks.append(block)
return block_list