Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create two recording subscribers and a transfer future using them.

    The subscribers are passed, in order, as the ``subscribers`` call
    argument; the resulting ``CallArgs`` feeds a ``TransferMeta`` which in
    turn backs the ``TransferFuture`` stored on the instance.
    """
    first = RecordingSubscriber()
    self.subscriber = first
    second = RecordingSubscriber()
    self.second_subscriber = second

    self.call_args = CallArgs(subscribers=[first, second])

    meta = TransferMeta(self.call_args)
    self.transfer_meta = meta
    self.transfer_future = TransferFuture(meta)
def get_call_args(self, **kwargs):
    """Return a ``CallArgs`` built from the instance defaults.

    The defaults are read from ``self`` (``filename`` is passed as
    ``fileobj``); any keyword arguments supplied by the caller override or
    extend them before ``CallArgs`` is constructed.
    """
    merged = dict(
        fileobj=self.filename,
        bucket=self.bucket,
        key=self.key,
        extra_args=self.extra_args,
        subscribers=self.subscribers,
    )
    # Caller-provided values win over the defaults.
    merged.update(kwargs)
    return CallArgs(**merged)
def get_call_args(self, **kwargs):
    """Build ``CallArgs`` from instance attributes, honoring overrides.

    Every entry of ``kwargs`` replaces the default of the same name (or is
    added alongside the defaults) before being forwarded to ``CallArgs``.
    """
    call_kwargs = {}
    call_kwargs['fileobj'] = self.filename
    call_kwargs['bucket'] = self.bucket
    call_kwargs['key'] = self.key
    call_kwargs['extra_args'] = self.extra_args
    call_kwargs['subscribers'] = self.subscribers
    # Apply overrides last so they take precedence over the defaults.
    call_kwargs.update(kwargs)
    return CallArgs(**call_kwargs)
def setUp(self):
    """Create a DownloadSeekableOutputManager and a future over an open file.

    After the parent ``setUp`` runs, the output manager is constructed from
    the instance's ``osutil``, ``transfer_coordinator`` and ``io_executor``,
    and a transfer future is created whose ``fileobj`` call argument is
    ``self.filename`` opened for binary writing.
    """
    super(TestDownloadSeekableOutputManager, self).setUp()

    manager_kwargs = {'io_executor': self.io_executor}
    self.download_output_manager = DownloadSeekableOutputManager(
        self.osutil, self.transfer_coordinator, **manager_kwargs)

    # Create a fileobj to write to. It is left open on the instance;
    # this method does not close it.
    target = open(self.filename, 'wb')
    self.fileobj = target
    self.call_args = CallArgs(fileobj=target)
    self.future = self.get_transfer_future(self.call_args)
client operation
:type subscribers: list(s3transfer.subscribers.BaseSubscriber)
:param subscribers: The list of subscribers to be invoked in the
order provided based on the events emitted during the process of
the transfer request.
:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the download
"""
if extra_args is None:
extra_args = {}
if subscribers is None:
subscribers = []
self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
call_args = CallArgs(
bucket=bucket, key=key, fileobj=fileobj, extra_args=extra_args,
subscribers=subscribers
)
extra_main_kwargs = {'io_executor': self._io_executor}
if self._bandwidth_limiter:
extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
return self._submit_transfer(
call_args, DownloadSubmissionTask, extra_main_kwargs)
client operation
:type subscribers: list(s3transfer.subscribers.BaseSubscriber)
:param subscribers: The list of subscribers to be invoked in the
order provided based on the events emitted during the process of
the transfer request.
:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the upload
"""
if extra_args is None:
extra_args = {}
if subscribers is None:
subscribers = []
self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
call_args = CallArgs(
fileobj=fileobj, bucket=bucket, key=key, extra_args=extra_args,
subscribers=subscribers
)
extra_main_kwargs = {}
if self._bandwidth_limiter:
extra_main_kwargs['bandwidth_limiter'] = self._bandwidth_limiter
return self._submit_transfer(
call_args, UploadSubmissionTask, extra_main_kwargs)
may happen at the source object. For example, this client is
used for the head_object that determines the size of the copy.
If no client is provided, the transfer manager's client is used
as the client for the source object.
:rtype: s3transfer.futures.TransferFuture
:returns: Transfer future representing the copy
"""
if extra_args is None:
extra_args = {}
if subscribers is None:
subscribers = []
if source_client is None:
source_client = self._client
self._validate_all_known_args(extra_args, self.ALLOWED_COPY_ARGS)
call_args = CallArgs(
copy_source=copy_source, bucket=bucket, key=key,
extra_args=extra_args, subscribers=subscribers,
source_client=source_client
)
return self._submit_transfer(call_args, CopySubmissionTask)