Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _check(self):
if os.getenv('CI'):
# on CI we should always be online
# I assume that all CI services set the CI environment variable!
return False
resolver = aiodns.DNSResolver()
try:
with timeout(1):
await resolver.query('google.com', 'A')
except (aiodns.error.DNSError, asyncio.TimeoutError) as e:
print(f'\nnot online: {e.__class__.__name__} {e}\n', file=sys.stderr)
return True
else:
return False
async def query(self, sub):
"""
Query domain
:param sub:
:return:
"""
ret = None
# root domain
if sub == '@' or sub == '':
sub_domain = self.domain
else:
sub = ''.join(sub.rsplit(self.domain, 1)).rstrip('.')
sub_domain = '{sub}.{domain}'.format(sub=sub, domain=self.domain)
try:
ret = await self.resolver.query(sub_domain, 'A')
except aiodns.error.DNSError as e:
err_code, err_msg = e.args[0], e.args[1]
# 1: DNS server returned answer with no data
# 4: Domain name not found
# 11: Could not contact DNS servers
# 12: Timeout while contacting DNS servers
if err_code not in [1, 4, 11, 12]:
logger.warning('{domain} {exception}'.format(domain=sub_domain, exception=e))
except Exception as e:
logger.info(sub_domain)
logger.warning(traceback.format_exc())
else:
ret = [r.host for r in ret]
domain_ips = [s for s in ret]
# It is a wildcard domain name and
# the subdomain IP that is burst is consistent with the IP
def dns_resolve(loop, name):
logger.info('Resolving: {}'.format(name))
try:
result = yield from resolver.query(name, 'A')
except aiodns.error.DNSError as e:
raise RuntimeError('Could not resolve name:', name)
return result[0].host
continue
except Exception as e:
fail_mode = str(e)
raise
print("Download failure:", fail_mode)
continue
else:
# handle http/https download --
try:
digest = None
with async_timeout.timeout(timeout):
fail_mode, digest = await http_fetch(real_uri, outfile, digest_func)
except asyncio.TimeoutError as e:
fail_mode = "timeout"
continue
except aiodns.error.DNSError as e:
fail_mode = "dnsfail"
continue
except ValueError as e:
fail_mode = "bad_url"
continue
except aiohttp.errors.ClientOSError as e:
fail_mode = "refused"
continue
except Exception as e:
fail_mode = str(e)
raise
print("Download failure:", fail_mode)
continue
del progress_map[d_id]
async def get_mx_ip(hostname):
'''Get MX record by hostname.
'''
if hostname not in MX_DNS_CACHE:
try:
resolver = aiodns.DNSResolver()
MX_DNS_CACHE[hostname] = await resolver.query(hostname, 'MX')
except aiodns.error.DNSError as e:
MX_DNS_CACHE[hostname] = None
return MX_DNS_CACHE[hostname]
async def mx_records(self, domain_part: str) -> MXRecords:
"""Resolve MX records for a domain returning a list of tuples with the
MX priority and value.
:param domain_part: The domain to resolve MX records for
:type domain_part: str
:rtype: :data:`~email_normalize.MXRecords`
"""
if self._skip_cache(domain_part):
try:
records = await self._resolver.query(domain_part, 'MX')
except error.DNSError as err:
LOGGER.debug('Failed to resolve %r: %s', domain_part, err)
if not self.cache_failures:
return []
mx_records, ttl = [], self.failure_ttl
else:
mx_records = [(r.priority, r.host) for r in records]
ttl = min(r.ttl for r in records) \
if records else self.failure_ttl
# Prune the cache if it's >= the limit, finding least used, oldest
if len(self.cache.keys()) >= self.cache_limit:
del self.cache[sorted(
self.cache.items(),
key=lambda i: (i[1].hits, i[1].last_access))[0][0]]
self.cache[domain_part] = CachedItem(
def _callback(fut: asyncio.Future, result: Any, errorno: int) -> None:
if fut.cancelled():
return
if errorno is not None:
fut.set_exception(error.DNSError(errorno, pycares.errno.strerror(errorno)))
else:
fut.set_result(result)
def query_ip(self,ip):
if ip not in self.rlookup_history:
try:
hostnames = yield from self.resolver.query(pycares.reverse_address(ip),"PTR")
except aiodns.error.DNSError:
hostnames = []
self.rlookup_history[ip] = hostnames
return self.rlookup_history[ip]
async def resolve(self, domain, last_known_id=None):
if domain.startswith('.'):
return STSFetchResult.NONE, None
# Cleanup domain name
domain = domain.rstrip('.')
# Construct name of corresponding MTA-STS DNS record for domain
sts_txt_domain = '_mta-sts.' + domain
# Try to fetch it
try:
txt_records = await asyncio.wait_for(
self._resolver.query(sts_txt_domain, 'TXT'),
timeout=self._timeout)
except aiodns.error.DNSError as error:
if error.args[0] == aiodns.error.ARES_ETIMEOUT: # pragma: no cover pylint: disable=no-else-return,no-member
# This branch is not covered because of aiodns bug:
# https://github.com/saghul/aiodns/pull/64
# It's hard to decide what to do in case of timeout
# Probably it's better to threat this as fetch error
# so caller probably shall report such cases.
return STSFetchResult.FETCH_ERROR, None
elif error.args[0] == aiodns.error.ARES_ENOTFOUND: # pylint: disable=no-else-return,no-member
return STSFetchResult.NONE, None
elif error.args[0] == aiodns.error.ARES_ENODATA: # pylint: disable=no-else-return,no-member
return STSFetchResult.NONE, None
else: # pragma: no cover
return STSFetchResult.NONE, None
except asyncio.TimeoutError:
return STSFetchResult.FETCH_ERROR, None