Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
header_parameters['x-ms-lease-action'] = self._serialize.header("action", action, 'str')
if if_modified_since is not None:
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
# Construct and send request
request = self._client.put(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-lease-id': self._deserialize('str', response.headers.get('x-ms-lease-id')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
acquire_lease.metadata = {'url': '/{containerName}'}
data_stream_total=None,
data_stream_current=0,
**self.request_options)
# Parse the total blob size and adjust the download size if ranges
# were specified
self.blob_size = parse_length_from_content_range(blob.properties.content_range)
if self.length is not None:
# Use the length unless it is over the end of the blob
self.download_size = min(self.blob_size, self.length - self.offset + 1)
elif self.offset is not None:
self.download_size = self.blob_size - self.offset
else:
self.download_size = self.blob_size
except StorageErrorException as error:
if self.offset is None and error.response.status_code == 416:
# Get range will fail on an empty blob. If the user did not
# request a range, do a regular get request in order to get
# any properties.
blob = self.service.download(
timeout=self.timeout,
lease_access_conditions=self.access_conditions,
modified_access_conditions=self.mod_conditions,
error_map=basic_error_map(),
validate_content=self.validate_content,
cls=deserialize_blob_stream,
data_stream_total=0,
data_stream_current=0,
**self.request_options)
# Set the download size to empty
query_parameters = {}
query_parameters['restype'] = self._serialize.query("self.restype", self.restype, 'str')
query_parameters['comp'] = self._serialize.query("comp", comp, 'str')
# Construct headers
header_parameters = {}
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
# Construct and send request
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-sku-name': self._deserialize(models.SkuName, response.headers.get('x-ms-sku-name')),
'x-ms-account-kind': self._deserialize(models.AccountKind, response.headers.get('x-ms-account-kind')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
get_account_info.metadata = {'url': '/{containerName}/{blobName}'}
# Construct headers
header_parameters = {}
header_parameters['Accept'] = 'application/xml'
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
# Construct and send request
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
header_dict = {}
deserialized = None
if response.status_code == 200:
deserialized = self._deserialize('ListBlobsFlatSegmentResponse', response)
header_dict = {
'Content-Type': self._deserialize('str', response.headers.get('Content-Type')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
if cls:
return cls(response, deserialized, header_dict)
serialization_ctxt = {'xml': {'name': 'SignedIdentifiers', 'itemsName': 'SignedIdentifier', 'wrapped': True}}
if container_acl is not None:
body_content = self._serialize.serialize_iter(container_acl, 'SignedIdentifier',
serialization_ctxt=serialization_ctxt)
print(body_content)
else:
body_content = None
# Construct and send request
request = self._client.put(url, query_parameters, header_parameters, body_content)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
set_access_policy.metadata = {'url': '/{containerName}'}
# Construct headers
header_parameters = {}
header_parameters['Accept'] = 'application/xml'
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
# Construct and send request
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
header_dict = {}
deserialized = None
if response.status_code == 200:
deserialized = self._deserialize('StorageServiceProperties', response)
header_dict = {
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
if cls:
return cls(response, deserialized, header_dict)
return deserialized
get_properties.metadata = {'url': '/'}
# Construct headers
header_parameters = {}
header_parameters['x-ms-version'] = self._serialize.header("self._config.version", self._config.version, 'str')
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
if lease_id is not None:
header_parameters['x-ms-lease-id'] = self._serialize.header("lease_id", lease_id, 'str')
# Construct and send request
request = self._client.get(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'x-ms-meta': self._deserialize('{str}', response.headers.get('x-ms-meta')),
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-lease-duration': self._deserialize(models.LeaseDurationType, response.headers.get('x-ms-lease-duration')),
'x-ms-lease-state': self._deserialize(models.LeaseStateType, response.headers.get('x-ms-lease-state')),
'x-ms-lease-status': self._deserialize(models.LeaseStatusType, response.headers.get('x-ms-lease-status')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-blob-public-access': self._deserialize('str', response.headers.get('x-ms-blob-public-access')),
'x-ms-has-immutability-policy': self._deserialize('bool', response.headers.get('x-ms-has-immutability-policy')),
'x-ms-has-legal-hold': self._deserialize('bool', response.headers.get('x-ms-has-legal-hold')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
if lease_id is not None:
header_parameters['x-ms-lease-id'] = self._serialize.header("lease_id", lease_id, 'str')
if if_modified_since is not None:
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
# Construct and send request
request = self._client.delete(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
delete.metadata = {'url': '/{containerName}'}
if request_id is not None:
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id", request_id, 'str')
header_parameters['x-ms-lease-action'] = self._serialize.header("action", action, 'str')
if if_modified_since is not None:
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
# Construct and send request
request = self._client.put(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
release_lease.metadata = {'url': '/{containerName}'}
header_parameters['If-Modified-Since'] = self._serialize.header("if_modified_since", if_modified_since, 'rfc-1123')
if if_unmodified_since is not None:
header_parameters['If-Unmodified-Since'] = self._serialize.header("if_unmodified_since", if_unmodified_since, 'rfc-1123')
if if_match is not None:
header_parameters['If-Match'] = self._serialize.header("if_match", if_match, 'str')
if if_none_match is not None:
header_parameters['If-None-Match'] = self._serialize.header("if_none_match", if_none_match, 'str')
# Construct and send request
request = self._client.put(url, query_parameters, header_parameters)
pipeline_response = self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response
if response.status_code not in [202]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise models.StorageErrorException(response, self._deserialize)
if cls:
response_headers = {
'ETag': self._deserialize('str', response.headers.get('ETag')),
'Last-Modified': self._deserialize('rfc-1123', response.headers.get('Last-Modified')),
'x-ms-lease-time': self._deserialize('int', response.headers.get('x-ms-lease-time')),
'x-ms-request-id': self._deserialize('str', response.headers.get('x-ms-request-id')),
'x-ms-version': self._deserialize('str', response.headers.get('x-ms-version')),
'Date': self._deserialize('rfc-1123', response.headers.get('Date')),
'x-ms-error-code': self._deserialize('str', response.headers.get('x-ms-error-code')),
}
return cls(response, None, response_headers)
break_lease.metadata = {'url': '/{containerName}/{blob}'}