Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Pick a random traceroute hop that carries at least one domain name, then
# check that at least one record can be found both by that hop's address
# and by its first domain name.
result = ivre.db.db.nmap.get(
    ivre.db.db.nmap.searchhopdomain(re.compile('.'))
)
# Flatten the hops of every trace of the first matching record.  A nested
# comprehension is used instead of folding the trace dicts with
# `x['hops'] + y['hops']`: that fold yields a plain list after its first
# step and raises TypeError as soon as a record holds more than one trace.
hop = random.choice([
    hop
    for trace in next(result)['traces']
    for hop in trace['hops']
    if 'domains' in hop and hop['domains']
])
# The record the hop was taken from must match a search on the hop address.
count = ivre.db.db.nmap.count(
    ivre.db.db.nmap.searchhop(hop['ipaddr'])
)
self.assertGreaterEqual(count, 1)
# ... and a search on the first domain attached to the hop.
count = ivre.db.db.nmap.count(
    ivre.db.db.nmap.searchhopdomain(hop['domains'][0])
)
self.assertGreaterEqual(count, 1)
# Indexes
# Build a few address-based filters around one address taken from the
# database.
# Take the 'addr' field of the first record returned for the empty filter.
addr = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.flt_empty
))['addr']
# Keep the first three dot-separated parts of the address and append
# '.0/24' (assumes addr is a dotted-quad IPv4 string -- TODO confirm).
addr_net = '.'.join(addr.split('.')[:3]) + '.0/24'
queries = [
# Filter on the exact address.
ivre.db.db.nmap.searchhost(addr),
# Filter on the /24 network string built above.
ivre.db.db.nmap.searchnet(addr_net),
# Filter on the range [addr - 256, addr + 256], with the bounds clamped
# to 0 and 4294967295 (2**32 - 1).
ivre.db.db.nmap.searchrange(max(ivre.utils.ip2int(addr) - 256, 0),
min(ivre.utils.ip2int(addr) + 256,
4294967295)),
]
for query in queries:
ivre.db.db.nmap.searchbanner(re.compile("^SSH-"))
)
self.check_value("nmap_ssh_count", count)
# Count the records matching each predefined search filter and hand every
# count to self.check_value() under a per-filter key (check_value is
# defined outside this excerpt; presumably it compares the count with a
# stored reference value -- TODO confirm).
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchvncauthbypass())
self.check_value("nmap_vncauthbypass_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchmssqlemptypwd())
self.check_value("nmap_mssql_emptypwd_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchmysqlemptypwd())
self.check_value("nmap_mysql_emptypwd_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchxp445())
self.check_value("nmap_xp445_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchtorcert())
self.check_value("nmap_torcert_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchgeovision())
self.check_value("nmap_geovision_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchwebcam())
self.check_value("nmap_webcam_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchphonedev())
self.check_value("nmap_phonedev_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchnetdev())
self.check_value("nmap_netdev_count", count)
# Domain search: records matching the domain "com".
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchdomain("com"))
# Test case OK?
# Guard: the check below is only meaningful if at least one record
# matches "com".
self.assertGreater(count, 0)
self.check_value("nmap_domain_com_count", count)
# Same search with neg=True (negated filter).
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.searchdomain("com", neg=True)
)
self.check_value("nmap_not_domain_com_count", count)
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.searchdomain(re.compile("^(com|net)$"),
neg=True)
fields=['endtime'],
sort=[['endtime', -1]]
))['endtime']
)
)
self.assertEqual(count, hosts_count)
# Convert `addrrange` (defined outside this excerpt) into a list of
# networks and check that searching network by network is consistent with
# searching the whole range.
nets = ivre.utils.range2nets(addrrange)
count = 0
for net in nets:
# Accumulate the number of records found in each network.
count += ivre.db.db.nmap.count(
ivre.db.db.nmap.searchnet(net)
)
# Integer bounds of the current network.
start, stop = (ivre.utils.ip2int(addr) for addr in
ivre.utils.net2range(net))
# Every distinct address returned for this network must lie within the
# network's bounds.
for addr in ivre.db.db.nmap.distinct(
"addr",
flt=ivre.db.db.nmap.searchnet(net),
):
# Convert the stored value to an IP address, then to an integer, so it
# can be compared with start/stop.
addr = ivre.utils.ip2int(ivre.db.db.nmap.internal2ip(addr))
self.assertTrue(start <= addr <= stop)
# The per-network counts must add up to `addr_range_count` (computed
# outside this excerpt).
self.assertEqual(count, addr_range_count)
# Networks in `nets` are disjoint sets: AND-ing all the network filters
# must match nothing when there are several networks, and everything in
# the range when there is only one.
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.flt_and(
*(ivre.db.db.nmap.searchnet(net) for net in nets)
)
)
self.assertEqual(count, 0 if len(nets) > 1 else addr_range_count)
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.flt_or(
*(ivre.db.db.nmap.searchnet(net) for net in nets)
# NOTE(review): `query` and `count` are set outside this excerpt; this
# looks like the body of a loop over the index test queries -- confirm.
# MongoDB only: the number of documents the query examined, as reported by
# explain(), must equal the number of results.
if DATABASE == "mongo":
nscanned = json.loads(ivre.db.db.nmap.explain(
ivre.db.db.nmap._get(query)
))
# The explain output is read from one of two layouts: a top-level
# 'nscanned' key, or 'executionStats' -> 'totalDocsExamined' when that
# key is absent (presumably depending on the MongoDB version -- confirm).
try:
nscanned = nscanned['nscanned']
except KeyError:
nscanned = nscanned['executionStats']['totalDocsExamined']
self.assertEqual(count, nscanned)
# Converting the filter to a string and back must give an equal filter.
self.assertEqual(
query,
ivre.db.db.nmap.str2flt(ivre.db.db.nmap.flt2str(query))
)
# FIXME: test PostgreSQL indexes
# Count the records matching each service-related search filter and hand
# every count to self.check_value() under a per-filter key (check_value is
# defined outside this excerpt; presumably it compares the count with a
# stored reference value -- TODO confirm).
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchx11())
self.check_value("nmap_x11_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchx11access())
self.check_value("nmap_x11access_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchnfs())
self.check_value("nmap_nfs_count", count)
# Note: the searchypserv() count is recorded under the "nmap_nis_count" key.
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchypserv())
self.check_value("nmap_nis_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchphpmyadmin())
self.check_value("nmap_phpmyadmin_count", count)
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchwebfiles())
self.check_value("nmap_webfiles_count", count)
# Banners starting with "SSH-".
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.searchbanner(re.compile("^SSH-"))
)
self.check_value("nmap_ssh_count", count)
# NOTE(review): the statement consuming this last count is not part of
# this excerpt.
count = ivre.db.db.nmap.count(ivre.db.db.nmap.searchvncauthbypass())
# Answer a "top values" request.  Everything after the first 10 characters
# of `action` is the field specification: an optional leading '-' or '!',
# the field name, and an optional ':<number>' suffix giving how many
# values to return (15 by default).  The result is written to stdout as
# JSON, wrapped in a call to `callback` when one is set, and the script
# then exits.
field = action[10:]
# NOTE(review): an empty field specification raises IndexError on the
# next line -- confirm callers never send one.
if field[0] in '-!':
    # A leading '-' or '!' is stripped and turns on least=True for
    # topvalues().
    field = field[1:]
    least = True
else:
    least = False
topnbr = 15
if ':' in field:
    field, topnbr = field.rsplit(':', 1)
    try:
        topnbr = int(topnbr)
    except ValueError:
        # The suffix is not a number, so the ':' belongs to the field
        # name itself: restore it and keep the default count.
        field = '%s:%s' % (field, topnbr)
        topnbr = 15
# One {"label", "value"} entry per value returned by topvalues().
series = [{"label": t['_id'], "value": t['count']} for t in
          db.nmap.topvalues(field, flt=flt,
                            least=least, topnbr=topnbr,
                            archive=archive)]
if callback is None:
    # Plain JSON document.
    sys.stdout.write("%s\n" % json.dumps(series))
else:
    # Wrap the JSON document in a call to the requested callback name.
    sys.stdout.write("%s(%s);\n" % (callback, json.dumps(series)))
# sys.exit() rather than the bare exit(): the latter is injected by the
# `site` module for interactive use and is not guaranteed to exist.
sys.exit(0)
# extract info
if action in ["onlyips", "ipsports", "timeline", "coordinates",
"countopenports", "diffcats"]:
preamble = "[\n"
postamble = "]\n"
r2res = lambda x: x
if action == "timeline":
if hasattr(db.nmap, "get_open_port_count"):