Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
self.assertGreaterEqual(count, 1)
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.searchhopdomain(hop['domains'][0])
)
self.assertGreaterEqual(count, 1)
# Indexes
addr = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.flt_empty
))['addr']
addr_net = '.'.join(addr.split('.')[:3]) + '.0/24'
queries = [
ivre.db.db.nmap.searchhost(addr),
ivre.db.db.nmap.searchnet(addr_net),
ivre.db.db.nmap.searchrange(max(ivre.utils.ip2int(addr) - 256, 0),
min(ivre.utils.ip2int(addr) + 256,
4294967295)),
]
for query in queries:
result = ivre.db.db.nmap.get(query)
count = ivre.db.db.nmap.count(query)
if DATABASE == "mongo":
nscanned = json.loads(ivre.db.db.nmap.explain(
ivre.db.db.nmap._get(query)
))
try:
nscanned = nscanned['nscanned']
except KeyError:
nscanned = nscanned['executionStats']['totalDocsExamined']
self.assertEqual(count, nscanned)
self.assertEqual(
)
self.assertEqual(count, hosts_count)
nets = ivre.utils.range2nets(addrrange)
count = 0
for net in nets:
count += ivre.db.db.nmap.count(
ivre.db.db.nmap.searchnet(net)
)
start, stop = (ivre.utils.ip2int(addr) for addr in
ivre.utils.net2range(net))
for addr in ivre.db.db.nmap.distinct(
"addr",
flt=ivre.db.db.nmap.searchnet(net),
):
addr = ivre.utils.ip2int(ivre.db.db.nmap.internal2ip(addr))
self.assertTrue(start <= addr <= stop)
self.assertEqual(count, addr_range_count)
# Networks in `nets` are separated sets
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.flt_and(
*(ivre.db.db.nmap.searchnet(net) for net in nets)
)
)
self.assertEqual(count, 0 if len(nets) > 1 else addr_range_count)
count = ivre.db.db.nmap.count(
ivre.db.db.nmap.flt_or(
*(ivre.db.db.nmap.searchnet(net) for net in nets)
)
)
self.assertEqual(count, addr_range_count)
self.assertIsNone(ivre.utils.get_addr_type('8.8.8.8'))
self.assertEqual(ivre.utils.get_addr_type('10.0.0.0'), 'Private')
self.assertIsNone(ivre.utils.get_addr_type('100.63.255.255'))
self.assertEqual(ivre.utils.get_addr_type('100.67.89.123'), 'CGN')
self.assertEqual(ivre.utils.get_addr_type('239.255.255.255'),
'Multicast')
self.assertEqual(ivre.utils.get_addr_type('240.0.0.0'), 'Reserved')
self.assertEqual(ivre.utils.get_addr_type('255.255.255.254'),
'Reserved')
self.assertEqual(ivre.utils.get_addr_type('255.255.255.255'),
'Broadcast')
# ip2int() / int2ip()
self.assertEqual(ivre.utils.ip2int("1.0.0.1"), (1 << 24) + 1)
self.assertEqual(ivre.utils.int2ip((1 << 24) + 1), "1.0.0.1")
self.assertEqual(ivre.utils.ip2int('::2:0:0:0:2'), (2 << 64) + 2)
self.assertEqual(ivre.utils.int2ip((2 << 64) + 2), '::2:0:0:0:2')
# Math utils
# http://stackoverflow.com/a/15285588/3223422
def is_prime(n):
if n == 2 or n == 3:
return True
if n < 2 or n % 2 == 0:
return False
if n < 9:
return True
if n % 3 == 0:
return False
r = int(n**0.5)
f = 5
while f <= r:
self.assertGreater(out.count(b'\n'), result)
result = ivre.db.db.passive.count(
ivre.db.db.passive.searchhost("127.12.34.56")
)
self.assertEqual(result, 0)
addrrange = sorted(
(
ivre.db.db.passive.internal2ip(x)
for x in ivre.db.db.passive.distinct(
'addr',
flt=ivre.db.db.passive.searchipv4(),
) if x
),
key=ivre.utils.ip2int,
)
self.assertGreaterEqual(len(addrrange), 2)
if len(addrrange) < 4:
addrrange = [addrrange[0], addrrange[-1]]
else:
addrrange = [addrrange[1], addrrange[-2]]
result = ivre.db.db.passive.count(
ivre.db.db.passive.searchrange(*addrrange)
)
self.assertGreaterEqual(result, 2)
addresses_1 = [
ivre.db.db.passive.internal2ip(x)
for x in ivre.db.db.passive.distinct(
'addr',
flt=ivre.db.db.passive.searchrange(*addrrange),
)
def searchhost(addr, neg=False):
"""Filters (if `neg` == True, filters out) one particular host
(IP address).
"""
try:
addr = utils.ip2int(addr)
except (TypeError, utils.socket.error):
pass
return {'addr': {'$ne': addr} if neg else addr}
def r2res(r):
return [r2time(r), utils.ip2int(r['addr']),
r['openports']['count']]
else:
def __init__(self, start, stop, categories=None, rand=True, maxnbr=None,
state=None):
Target.__init__(
self,
geoiputils.IPRanges(ranges=[(utils.ip2int(start),
utils.ip2int(stop))]),
rand=rand, maxnbr=maxnbr, state=state
)
if categories is None:
categories = ['RANGE-%s-%s' % (start, stop)]
self.infos = {'categories': categories}
get_notepad_pages = lambda: [
ivre.utils.ip2int(x[:-4])
for x in os.listdir("/var/lib/dokuwiki/data/pages")
if x.endswith('.txt') and ipaddr.match(x[:-4])
]
def convert(addr):
try:
return utils.ip2int(addr)
except (TypeError, utils.socket.error):
return addr
return {'addr': {'$nin' if neg else '$in': [convert(host)
# modified in the loop.
i = 0
while i < len(itrs):
try:
next_recs.append(next(itrs[i]))
except StopIteration:
# We need to remove the corresponding iterator from itrs,
# which happens to be the n-th where n is the current
# length of next_recs.
del itrs[len(next_recs)] # Do not increment i here
else:
i += 1
next_addrs = [rec['addr'] for rec in next_recs]
cur_rec = None
try:
cur_addr = min(next_addrs, key=utils.ip2int)
except ValueError:
# next_addrs is empty
cur_addr = None
while next_recs:
# We cannot use a `for i in range(len(itrs))` loop because
# itrs is modified in the loop.
i = 0
while i < len(itrs):
if next_addrs[i] == cur_addr:
cur_rec = next_record(cur_rec, next_recs[i])
try:
next_recs[i] = next(itrs[i])
except StopIteration:
del next_addrs[i]
del next_recs[i]
del itrs[i]