import regex
from urlextract import URLExtract

# matches e-mail addresses, including Unicode letters (\p{L}) in the local part and the domain
EMAIL_REGEX = regex.compile(
    r'[\p{L}0-9]+[\p{L}0-9_.+-]*[\p{L}0-9_+-]+@[\p{L}0-9]+[\p{L}0-9.-]*\.\p{L}+'  # noqa
)
PUNCTUATION_SIGNS = set('.,;:¡!¿?…⋯&‹›«»\"“”[]()⟨⟩}{/|\\')

url_extractor = URLExtract()


def clean_text(text, allowed_chars='- '):
    """Lowercase the text, collapse whitespace and keep only alphanumerics and allowed_chars."""
    text = ' '.join(text.lower().split())
    text = ''.join(ch for ch in text if ch.isalnum() or ch in allowed_chars)
    return text


def contains_letters(word):
    return any(ch.isalpha() for ch in word)


def contains_numbers(word):
    return any(ch.isdigit() for ch in word)
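These helpers are only building blocks; below is a minimal sketch of how they might be combined with the extractor above (the strip_urls_and_emails helper and the sample sentence are illustrative, not part of the original project):

def strip_urls_and_emails(text):
    # hypothetical helper: drop URLs and e-mail addresses before normalising the text
    for url in url_extractor.find_urls(text, only_unique=True):
        text = text.replace(url, ' ')
    text = EMAIL_REGEX.sub(' ', text)
    return clean_text(text)


print(strip_urls_and_emails("Write to user@example.com or visit https://example.com today!"))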
import argparse
import logging
import sys

from urlextract import URLExtract, CacheFileError


def get_args():
    """Parse program arguments."""
    parser = argparse.ArgumentParser(
        description='urlextract - prints out all URLs found in the input')
    # NOTE: the original snippet only showed the last add_argument call; the
    # --unique and --ignore-file options below are assumed from their later use
    parser.add_argument(
        '-u', '--unique', dest='unique', action='store_true',
        help='print out only unique URLs')
    parser.add_argument(
        '-i', '--ignore-file', dest='ignore_file', default=None,
        help='file with URLs to ignore during extraction')
    parser.add_argument(
        'input_file', nargs='?', metavar='<input_file>',
        type=argparse.FileType(), default=sys.stdin,
        help='input text file with URLs to extract')

    parsed_args = parser.parse_args()
    return parsed_args


args = get_args()
logging.basicConfig(
    level=logging.INFO, stream=sys.stderr,
    format='%(asctime)s - %(levelname)s (%(name)s): %(message)s')
logger = logging.getLogger('urlextract')

try:
    urlextract = URLExtract()
    if args.ignore_file:
        urlextract.load_ignore_list(args.ignore_file)
    urlextract.update_when_older(30)
    content = args.input_file.read()
    for url in urlextract.find_urls(content, args.unique):
        print(url)
except CacheFileError as e:
    logger.error(str(e))
    sys.exit(-1)
finally:
    args.input_file.close()
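The same extraction is available programmatically without the CLI wrapper; a minimal sketch (the sample text is made up):

from urlextract import URLExtract

extractor = URLExtract()
# only_unique corresponds to the --unique flag handled above
urls = extractor.find_urls(
    "Docs live at https://example.com and at https://example.com/faq.",
    only_unique=True)
print(urls)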
# tail of the writable-cache-directory lookup: fall back to the user cache dir, then the temp dir
if not os.path.exists(dir_path_user):
    try:
        os.makedirs(dir_path_user, exist_ok=True)
    except PermissionError:
        # if PermissionError exception is raised we should continue
        # and try to set the last fallback dir
        pass

if os.access(dir_path_user, os.W_OK):
    return dir_path_user

dir_path_temp = tempfile.gettempdir()
if os.access(dir_path_temp, os.W_OK):
    return dir_path_temp

raise CacheFileError("Cache directories are not writable.")
def _get_default_cache_file_path(self):
    """
    Returns default cache file path

    :return: default cache file path (to data directory)
    :rtype: str
    """
    default_list_path = os.path.join(
        self._get_default_cache_dir(), self._CACHE_FILE_NAME)

    if not os.access(default_list_path, os.F_OK):
        raise CacheFileError(
            "Default cache file does not exist "
            "'{}'!".format(default_list_path)
        )

    return default_list_path
def _load_cached_tlds(self):
    """
    Loads TLDs from cached file to set.

    :return: Set of current TLDs
    :rtype: set
    """
    # check if cached file is readable
    if not os.access(self._tld_list_path, os.R_OK):
        self._logger.error("Cached file is not readable for current "
                           "user. ({})".format(self._tld_list_path))
        raise CacheFileError(
            "Cached file is not readable for current user."
        )

    set_of_tlds = set()
    with open(self._tld_list_path, 'r') as f_cache_tld:
        for line in f_cache_tld:
            tld = line.strip().lower()
            # skip empty lines
            if not tld:
                continue
            # skip comments
            if tld[0] == '#':
                continue

            # store both the punycode and the decoded (Unicode) form of the TLD
            set_of_tlds.add("." + tld)
            set_of_tlds.add("." + idna.decode(tld))

    return set_of_tlds
def _get_cache_file_path(self, cache_dir=None):
    """
    Get path for cache file

    :param str cache_dir: base path for TLD cache, defaults to data dir
    :raises: CacheFileError when cached directory is not writable for user
    :return: Full path to cached file with TLDs
    :rtype: str
    """
    if cache_dir is None:
        # Tries to get writable cache dir with fallback to user's data dir
        # and temp directory
        cache_dir = self._get_writable_cache_dir()
    else:
        if not os.access(cache_dir, os.W_OK):
            raise CacheFileError("None of cache directories is writable.")

    # get directory for cached file
    return os.path.join(cache_dir, self._CACHE_FILE_NAME)
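These private helpers back the public cache-handling API; a minimal usage sketch (the 7-day threshold and the sample text are arbitrary):

from urlextract import URLExtract

extractor = URLExtract()
# re-download the cached TLD list only if the local copy is older than 7 days
extractor.update_when_older(7)
print(extractor.find_urls("TLD data is cached, see https://example.org for a sample link."))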
def _get_extractor(syntax: Syntax):
    from urlextract import URLExtract  # type: ignore
    u = URLExtract()
    # https://github.com/lipoja/URLExtract/issues/13
    if syntax in {'org', 'orgmode', 'org-mode'}:  # TODO remove hardcoding..
        u._stop_chars_right |= {'[', ']'}
        u._stop_chars_left |= {'[', ']'}
    elif syntax in {'md', 'markdown'}:
        pass
        # u._stop_chars_right |= {','}
        # u._stop_chars_left |= {','}
    return u
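Marking the square brackets as stop characters keeps org-mode link markup out of the extracted URL; a small sketch of the intended effect (sample text made up, exact output may vary by URLExtract version):

u = _get_extractor('org')
text = "See [[https://example.com/docs][the docs]] for details."
print(u.find_urls(text))  # the surrounding [[...][...]] markup should not be swallowed into the URL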
def findHttpUrls(searchRootDirectory):
    alterableUrlsStore = {}
    nonAlterableUrlsStore = {}
    invalidUrlsStore = {}
    extractor = URLExtract()
    lengthOfOriginalRootPath = -1
    for root, _, files in os.walk(searchRootDirectory, onerror=None):
        if lengthOfOriginalRootPath == -1:
            lengthOfOriginalRootPath = len(root)
        for filename in files:
            # skip project files and anything under .git
            if pathlib.Path(filename).suffix in ['.props', '.pyproj', '.vcxproj', '.snk'] or '.git' in root:
                continue
            absoluteFilePath = os.path.join(root, filename)
            relativeFilePath = '.' + absoluteFilePath[lengthOfOriginalRootPath:]
            try:
                with open(absoluteFilePath, "rb") as f:
                    data = f.read()
                try:
                    data = data.decode("utf-8")
                except Exception as e:
                    print("Unable to decode file: {} in UTF-8 encoding.".format(relativeFilePath))
                # ... (snippet truncated here; URL extraction over `data` continues in the original)
def run(self, params={}):
    p = HTMLTableParser()
    p.feed(params.get(Input.TAP_ALERT))
    data = p.tables
    clean_data = TAP(data).data

    # Get the Threat details URL which is NOT an HTML table element, but instead the <a> link of the
    # table element
    extractor = URLExtract()
    cleaned_input_for_extractor = params.get(Input.TAP_ALERT)
    # str.replace returns a new string, so the result has to be reassigned
    cleaned_input_for_extractor = cleaned_input_for_extractor.replace('\n', '')
    urls_from_input = extractor.find_urls(cleaned_input_for_extractor)
    threat_details_urls = list(filter(lambda u: r'threat/email' in u and r'threatinsight.proofpoint.com' in u[:40],
                                      urls_from_input))

    if threat_details_urls:
        clean_data['threat']['threat_details_url'] = threat_details_urls[0]

    return {Output.RESULTS: clean_data}
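The filter keeps only Proofpoint threat-detail links; a small sketch of the same predicate against made-up URLs:

sample_urls = [
    "https://threatinsight.proofpoint.com/abc/threat/email/123",  # matches both conditions
    "https://example.com/threat/email/ignored",                   # wrong host, filtered out
]
matches = [u for u in sample_urls
           if r'threat/email' in u and r'threatinsight.proofpoint.com' in u[:40]]
print(matches)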