def getWarningLevel( t_tld_orig, item ):
    w_level = 0
    if item in t_help:
        return 0
    if not item.startswith('http'):
        item = 'https://'+item
    tmp_parse = urlparse( item )
    tmp_tld = tldextract.extract( tmp_parse.netloc )
    # print(tmp_parse)
    if tmp_tld.subdomain == t_tld_orig.subdomain and tmp_tld.domain == t_tld_orig.domain and tmp_tld.suffix == t_tld_orig.suffix:
        w_level = 1
    elif tmp_tld.domain == t_tld_orig.domain and tmp_tld.suffix == t_tld_orig.suffix:
        w_level = 2
    else:
        w_level = 3
    if '*' in tmp_parse.netloc:
        w_level += 1
    return w_level
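# A minimal, hypothetical usage sketch for getWarningLevel above: the example
# domains are made up, the Python 3 imports are assumptions, and the t_help
# global consulted by the function is assumed to be a plain module-level list.
import tldextract
from urllib.parse import urlparse

t_help = []
t_tld_orig = tldextract.extract('https://app.example.com')
print(getWarningLevel(t_tld_orig, 'https://app.example.com/login'))   # 1: same host
print(getWarningLevel(t_tld_orig, 'https://cdn.example.com/app.js'))  # 2: same registered domain
print(getWarningLevel(t_tld_orig, 'https://tracker.example.net/p'))   # 3: unrelated domain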
def grabSubs( domain ):
    print( "[+] Grabbing subdomains from crt.sh: %s" % domain )
    url = 'https://crt.sh/?q=%25.' + domain + '&output=json'
    try:
        ex = 0
        r = requests.get( url )
    except Exception as e:
        ex = 1
        print( colored("[-] error occurred: %s" % e, 'red') )
    if ex == 0 and r.status_code == 200:
        n = 0
        j = r.json()
        for item in j:
            parse = tldextract.extract( item['name_value'] )
            sub = item['name_value'].replace( '*.', '' )
            if sub != domain and not sub in t_subs:
                t_subs.append( sub )
                try:
                    ex = 0
                    data = socket.gethostbyname( sub )
                    if not data in t_ips:
                        n = n + 1
                        t_ips.append( data )
                except Exception as e:
                    ex = 1
        print( colored("[+] %d subdomains found, %d ips added" % (len(t_subs),n), 'green') )
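# A hypothetical driver for grabSubs above: the module-level t_subs and t_ips
# lists and the third-party imports are assumptions inferred from the names the
# snippet uses (requests, termcolor's colored, tldextract, socket).
import socket
import requests
import tldextract
from termcolor import colored

t_subs = []
t_ips = []
grabSubs('example.com')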
            rule = line.split('$')[0]
            if is_acceptable_rule(rule):
                rules.append(rule)
    except Exception:
        logger.exception('Unexpected error while applying easylist rules.')

    abr = AdblockRules(rules)
    elapsed = timeit.default_timer() - start_time
    logger.info('Took %i secs to parse easylist rules' % elapsed)

    i = 0
    for url in third_party_requests:
        if abr.should_block(url):
            ext = tldextract.extract(url)
            trackers.append("{}.{}".format(ext.domain, ext.suffix))
        i = i + 1
        if i % 20 == 0:
            elapsed = timeit.default_timer() - start_time
            logger.info("Checked %i domains, %i secs elapsed..." % (i, elapsed))

    return list(set(trackers))
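# A self-contained sketch of the same blocking pattern, assuming the
# adblockparser and tldextract packages; the filter rules and request URLs
# below are illustrative only.
import tldextract
from adblockparser import AdblockRules

easylist_rules = AdblockRules(["||tracker.example.net^", "/analytics.js"])
sample_requests = [
    "https://cdn.tracker.example.net/pixel.gif",
    "https://static.example.org/site.css",
]

found_trackers = []
for request_url in sample_requests:
    if easylist_rules.should_block(request_url):
        ext = tldextract.extract(request_url)
        found_trackers.append("{}.{}".format(ext.domain, ext.suffix))

print(list(set(found_trackers)))  # expected: ['example.net']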
def get_related_domains(self):
    result = []
    main_of_domain = tldextract.extract(self.domain).domain
    reg_urls = re.compile('<a href="\?id=(.*?)">')
    urls = reg_urls.findall(self.resp)
    reg_domains = re.compile('DNS:(.*?)<br>')  # DNS:*.jdpay.com<br>
    for item in urls:
        url = "https://crt.sh/?id={0}".format(item)
        resp = req.get(url, proxies=self.proxy).content
        reg_common_name = re.compile("Subject:<br>(.*?)<br>")
        common_name = reg_common_name.findall(resp)
        if len(common_name) != 0:
            common_name = common_name[0].replace(" ", "").split("=")[-1]
            main_of_cn_domain = tldextract.extract(common_name).domain
def get_root_domain(full_link: str, use_www=True) -> (bool, str, str, str, str, str, str):
    """
    Get the root domain from a url.
    :param full_link: e.g. "http://www.google.com"
    :return: Tuple(True if the domain is a root domain, else False for a sub-domain; the real root domain;
             link to root domain; link to sub.domain; sub.domain; suffix of the domain; domain pure)
    """
    scheme = "http"
    if full_link.startswith("https"):
        scheme = "https"
    # scheme, target_domain, a, b, c = urlsplit(full_link)
    # scheme = urlsplit(full_link)[0]
    scheme += "://"
    # ext = tldextract.extract(target_domain)
    ext = tldextract.extract(full_link)
    root = ext.domain + "." + ext.suffix
    prefix = "www."
    if len(ext.domain) == 0 or len(ext.suffix) == 0:
        return False, "", "", "", "", "", ""
    elif ext.subdomain is None or len(ext.subdomain) == 0:
        if use_www and prefix not in full_link:
            return True, root, scheme+prefix+root, scheme+prefix+root, prefix+root, ext.suffix, ext.domain
        else:
            return True, root, scheme+root, scheme+root, root, ext.suffix, ext.domain
    else:
        sub_domain = ext.subdomain + "." + root
        if use_www:
            return False, root, scheme+prefix+root, scheme+sub_domain, sub_domain, ext.suffix, ext.domain
        else:
            return False, root, scheme+root, scheme+sub_domain, sub_domain, ext.suffix, ext.domain
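# Example of the tuple returned by get_root_domain above; the values in the
# comments are what the logic yields for this (made-up) input.
import tldextract

is_root, root, root_link, sub_link, sub, suffix, domain = get_root_domain("http://blog.example.co.uk/post/1")
# is_root   -> False
# root      -> "example.co.uk"
# root_link -> "http://www.example.co.uk"
# sub_link  -> "http://blog.example.co.uk"
# sub       -> "blog.example.co.uk"
# suffix    -> "co.uk"
# domain    -> "example"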
def setup(self, settings):
    '''
    Setup redis and tldextract
    '''
    self.extract = tldextract.TLDExtract()
    self.redis_conn = redis.Redis(host=settings['REDIS_HOST'],
                                  port=settings['REDIS_PORT'],
                                  db=settings.get('REDIS_DB'))

    try:
        self.redis_conn.info()
        self.logger.debug("Connected to Redis in ScraperHandler")
    except ConnectionError:
        self.logger.error("Failed to connect to Redis in ScraperHandler")
        # plugin is essential to functionality
        sys.exit(1)
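# A hypothetical settings dict for the setup() method above; the keys follow
# the names the snippet reads, and the host/port values are examples only.
settings = {
    'REDIS_HOST': 'localhost',
    'REDIS_PORT': 6379,
    'REDIS_DB': 0,
}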
urldict = {}
skipped = 0

for urldir in os.listdir(outputdir):
    if urldir in INFOFILES:
        continue
    try:
        urlfile = os.path.join(outputdir, urldir, '__urls')
        related_urls = get_unique_urls(urldir, urlfile)
    except (IOError, OSError) as einfo:
        print "Unable to read", urldir, einfo, "skipping"
        continue

    TLDio = TLDExtract(cache_file='mozilla_tld_file.dat')
    for dirty_url in related_urls:
        # dirty_url because may contain ":"
        if dirty_url.find(':') != -1:
            url = dirty_url.split(':')[0]
        else:
            url = dirty_url

        if urldict.has_key(url):
            skipped += 1
            continue

        dnsplit = TLDio(url)
        urldict.update({url : {
            'domain' : dnsplit.domain,
            'tld' : dnsplit.suffix,
# from the host or the host from the port. If there's a scheme we
# want to limit it to http or https.
if ':' in url:
    scheme, host = url.split(':', 1)
    # If there's a . in the scheme, then there wasn't a scheme
    # and the : is delimiting the host from the port
    if '.' not in scheme and scheme not in ('http', 'https'):
        return u''

# Get a thread-local extractor if there is one. If not, create it.
extractor = getattr(_cached_tldextract, 'extractor', None)
if extractor is None:
    # FIXME - This uses the tld set included with tldextract which
    # will age over time. We should fix this so that we get a new
    # file on deployment and use that file.
    extractor = tldextract.TLDExtract(
        suffix_list_url=None,  # disable fetching the file via http
    )
    _cached_tldextract.extractor = extractor

res = extractor(url)

# If there's no tld, then this is probably an ip address or
# localhost. Also ignore .mil and .arpa addresses.
if res.suffix in ('', 'mil', 'in-addr.arpa'):
    return u''

# Suffix is the tld. We want that plus the next level up.
return res.domain.decode('utf-8') + u'.' + res.suffix.decode('utf-8')
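# The _cached_tldextract object referenced above is not shown in this fragment;
# a common pattern is a module-level threading.local() holding one extractor
# per thread, roughly as in this sketch (names are assumptions). Note that
# recent tldextract versions spell the offline option suffix_list_urls=()
# rather than suffix_list_url=None.
import threading
import tldextract

_cached_tldextract = threading.local()

def _get_extractor():
    extractor = getattr(_cached_tldextract, 'extractor', None)
    if extractor is None:
        # Build one extractor per thread, using only the bundled suffix list
        extractor = tldextract.TLDExtract(suffix_list_urls=())
        _cached_tldextract.extractor = extractor
    return extractor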
logger.setLevel(logging.INFO)

""" Print version """
logger.info(pkg_resources.require("music_dl")[0])

""" Validate parameters """
logger.info('Validating parameters...')
try:
    # Validate download url
    url_parsed = urlparse(self.download_url)
    if not url_parsed.scheme.startswith('http'):
        raise DirectoryException('Invalid URL. URL must start with http*. Input value is {}'.format(self.download_url))
    tld_parsed = tldextract.extract(self.download_url)
    if not (tld_parsed.domain in ['youtube', 'soundcloud']):
        raise DirectoryException('Invalid URL. Music Downloader supports only YouTube and SoundCloud. Input value is {}'.format(self.download_url))
    # Validate download directory
    if not is_path_exists_or_creatable(self.working_dir):
        raise DirectoryException('Invalid directory. Please specify valid download directory. Input value is {}'.format(self.working_dir))
except DirectoryException as e:
    logger.error(e.message)
    logger.fatal('Aborted.')
    exit()

# Validate playlist configuration
try:
    self.playlist.validate()
except PlaylistParameterException as e:
# if the file type is a media type, reject instantly
if file_type and file_type not in ALLOWED_TYPES:
    if verbose: print('\t%s rejected due to bad filetype' % url)
    return False

last_chunk = path_chunks[-1].split('.')
# the file type is not of use to us anymore, remove from url
if len(last_chunk) > 1:
    path_chunks[-1] = last_chunk[-2]

# Index gives us no information
if 'index' in path_chunks:
    path_chunks.remove('index')

# extract the tld (top level domain)
tld_dat = tldextract.extract(url)
subd = tld_dat.subdomain
tld = tld_dat.domain.lower()

url_slug = path_chunks[-1] if path_chunks else ''

if tld in BAD_DOMAINS:
    if verbose: print('%s caught for a bad tld' % url)
    return False

if len(path_chunks) == 0:
    dash_count, underscore_count = 0, 0
else:
    dash_count = url_slug.count('-')
    underscore_count = url_slug.count('_')

# If the url has a news slug title