Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.prefetch = prefetch
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.reconnect_backoff = 1
self.redirected = None
self.error = None
self.properties = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
def _settle_deferred(self, settlement, lock_tokens, dead_letter_details=None):
message = {
'disposition-status': settlement,
'lock-tokens': types.AMQPArray(lock_tokens),
'session-id': self.session_id}
if dead_letter_details:
message.update(dead_letter_details)
return self._mgmt_request_response(
REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION,
message,
mgmt_handlers.default)
def enqueue_sequence_number(self, value):
if not self.message.annotations:
self.message.annotations = {}
self.message.annotations[types.AMQPSymbol(self._X_OPT_ENQUEUE_SEQUENCE_NUMBER)] = value
:end-before: [END receive_deferred_messages]
:language: python
:dedent: 4
:caption: Get the messages which were previously deferred in the session
"""
if not sequence_numbers:
raise ValueError("At least one sequence number must be specified.")
self._can_run()
try:
receive_mode = mode.value.value
except AttributeError:
receive_mode = int(mode)
message = {
'sequence-numbers': types.AMQPArray([types.AMQPLong(s) for s in sequence_numbers]),
'receiver-settle-mode': types.AMQPuInt(receive_mode),
'session-id': self.session_id
}
handler = functools.partial(mgmt_handlers.deferred_message_op, mode=receive_mode)
messages = self._mgmt_request_response(
REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER,
message,
handler)
for m in messages:
m._receiver = self # pylint: disable=protected-access
return messages