Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _build_handler(self):
auth = None if self.connection else authentication.SASTokenAsync.from_shared_access_key(**self.auth_config)
self._handler = AMQPClientAsync(
self.endpoint,
loop=self.loop,
auth=auth,
debug=self.debug,
properties=self.properties,
error_policy=self.error_policy,
encoding=self.encoding,
**self.handler_kwargs)
def _build_handler(self):
auth = None if self.connection else authentication.SASTokenAsync.from_shared_access_key(**self.auth_config)
self._handler = ReceiveClientAsync(
self.endpoint,
auth=auth,
debug=self.debug,
properties=self.properties,
error_policy=self.error_policy,
client_name=self.name,
auto_complete=False,
encoding=self.encoding,
loop=self.loop,
**self.handler_kwargs)
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)
username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
return authentication.SASLPlain(
self.address.hostname, username, password, http_proxy=self.http_proxy)
return authentication.SASTokenAsync.from_shared_access_key(
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)