Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# sqlite3.OperationalError: Expression tree is too
# large (maximum depth 10000)
continue
res, out, err = RUN(["ivre", "ipinfo", "--count", "--country",
cname])
self.assertEqual(ret, 0)
self.assertTrue(not err)
self.check_value("passive_count_country_%s" % cname, int(out))
# Delete
flt = ivre.db.db.passive.searchcert()
count = ivre.db.db.passive.count(flt)
# Test case OK?
self.assertGreater(count, 0)
ivre.db.db.passive.remove(flt)
new_count = ivre.db.db.passive.count(
ivre.db.db.passive.flt_empty
)
self.assertEqual(count + new_count, total_count)
ret, out, _ = RUN(["ivre", "ipinfo", "--short"])
self.assertEqual(ret, 0)
count = sum(1 for _ in out.splitlines())
self.check_value("passive_ipinfo_short_count", count)
ret, out, _ = RUN(["ivre", "iphost", "/./"])
self.assertEqual(ret, 0)
count = sum(1 for _ in out.splitlines())
self.check_value("passive_iphost_count", count)
ret, out, _ = RUN(["ivre", "iphost", "--sub", "com"])
self.assertEqual(ret, 0)
def test_10_data(self):
"""ipdata (Maxmind, thyme.apnic.net) functions"""
# Download
res = RUN(["ivre", "ipdata", "--download"])[0]
self.assertEqual(res, 0)
# Reinit passive DB since we have downloaded the files
ivre.db.db.data.reload_files()
if DATABASE != "maxmind":
print(u"Database files have been downloaded -- "
u"other data tests won't run")
return
# CSV creation -- disabled on Travis CI: this is way too slow.
# Files are downloaded from ivre.rocks in .travis.yml instead,
# and "touched" here to make sure they are newer than the
# .mmdb files. Only the Country file is created.
for sub in ['ASN', 'City']:
fname = os.path.join(ivre.config.GEOIP_PATH,
'GeoLite2-%s.dump-IPv4.csv' % sub)
if os.path.isfile(fname):
os.utime(fname, None)
fname = os.path.join(ivre.config.GEOIP_PATH,
# Get a scan id
scanid = next(iter(ivre.db.db.agent.get_scans()))
# Lock it
locked_scan = ivre.db.db.agent.lock_scan(scanid)
self.assertIsInstance(locked_scan, dict)
self.assertEqual(locked_scan['pid'], os.getpid())
self.assertIsNotNone(locked_scan.get('lock'))
# Check one scan is locked with our PID
res, out, _ = RUN(["ivre", "runscansagentdb", "--list-scans"])
self.assertEqual(res, 0)
self.assertTrue((' - locked (by %d)\n' % os.getpid()).encode() in out)
# Attempt to lock it again
with(self.assertRaises(ivre.db.LockError)):
ivre.db.db.agent.lock_scan(scanid)
# Unlock it
self.assertEqual(ivre.db.db.agent.unlock_scan(locked_scan), True)
# Attempt to unlock it again
with(self.assertRaises(ivre.db.LockError)):
ivre.db.db.agent.unlock_scan(locked_scan)
with(self.assertRaises(ivre.db.LockError)):
ivre.db.db.agent.unlock_scan(ivre.db.db.agent.get_scan(scanid))
# Check no scan is locked
res, out, _ = RUN(["ivre", "runscansagentdb", "--list-scans"])
self.assertEqual(res, 0)
self.assertTrue(b' - locked' not in out)
# Lock the scan again
locked_scan = ivre.db.db.agent.lock_scan(scanid)
self.assertIsInstance(locked_scan, dict)
self.assertEqual(locked_scan['pid'], os.getpid())
self.assertIsNotNone(locked_scan.get('lock'))
# Check one scan is locked with our PID
res, out, _ = RUN(["ivre", "runscansagentdb", "--list-scans"])
addr = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.flt_empty, fields=["addr"]
))['addr']
self.check_nmap_count_value(1, ivre.db.db.nmap.searchhost(addr),
['--host', ivre.utils.force_int2ip(addr)],
ivre.utils.force_int2ip(addr))
result = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.searchhost(addr)
))
self.assertEqual(result['addr'], addr)
self.check_count_value_api(1, ivre.db.db.nmap.flt_and(
ivre.db.db.nmap.searchhost(addr),
ivre.db.db.nmap.searchhost(addr),
), database=ivre.db.db.nmap)
recid = ivre.db.db.nmap.getid(
next(ivre.db.db.nmap.get(ivre.db.db.nmap.flt_empty))
)
self.check_count_value_api(1, ivre.db.db.nmap.searchid(recid),
database=ivre.db.db.nmap)
self.assertIsNotNone(
ivre.db.db.nmap.getscan(
ivre.db.db.nmap.getscanids(
next(ivre.db.db.nmap.get(ivre.db.db.nmap.flt_empty))
)[0]
)
)
self.check_nmap_count_value(0,
ivre.db.db.nmap.searchhost("127.12.34.56"),
["--host", "127.12.34.56"], "127.12.34.56")
generator = ivre.db.db.nmap.get(ivre.db.db.nmap.flt_empty)
out = sys.stdout
if args.plot and plt is None:
utils.LOGGER.critical("Matplotlib is required for --plot")
sys.exit(-1)
if args.init:
if os.isatty(sys.stdin.fileno()):
out.write(
'This will remove any flow result in your database. '
'Process ? [y/N] ')
ans = input()
if ans.lower() != 'y':
sys.exit(-1)
db.flow.init()
sys.exit(0)
if args.ensure_indexes:
if os.isatty(sys.stdin.fileno()):
out.write(
'This will lock your database. '
'Process ? [y/N] ')
ans = input()
if ans.lower() != 'y':
sys.exit(-1)
db.flow.ensure_indexes()
sys.exit(0)
if args.fields is not None and not args.fields:
# Print fields list
print_fields()
'%r %r not understood (this is probably a bug).\n' % (o, a))
sys.exit(-1)
first = True
flts = []
for a in args:
if first:
first = False
else:
print()
if utils.IPADDR.search(a) or a.isdigit():
flts.append(db.passive.flt_and(baseflt, db.passive.searchhost(a)))
else:
flts += [
db.passive.flt_and(
baseflt,
db.passive.searchdns(
utils.str2regexp(a), subdomains=subdomains)),
db.passive.flt_and(
baseflt,
db.passive.searchdns(
utils.str2regexp(a),
reverse=True, subdomains=subdomains))
]
for flt in flts:
for r in db.passive.get(flt, sort=[('source', 1)]):
disp_rec(r)
def main():
if USING_ARGPARSE:
parser = argparse.ArgumentParser(
description='Access and query the active scans database.',
parents=[db.db.nmap.argparser, CLI_ARGPARSER],
)
else:
parser = optparse.OptionParser(
description='Access and query the active scans database.',
)
for args, kargs in chain(db.db.nmap.argparser.args,
CLI_ARGPARSER.args):
parser.add_option(*args, **kargs)
parser.parse_args_orig = parser.parse_args
parser.parse_args = lambda: parser.parse_args_orig()[0]
parser.add_argument = parser.add_option
parser.add_argument('--no-screenshots', action='store_true',
help='When used with --json, do not output '
'screenshots data.')
parser.add_argument('--honeyd', action='store_true',
help='Output results as a honeyd config file.')
for r in firstrecs:
if 'addr' in r:
print(utils.force_int2ip(r['addr']), end=' ')
else:
print(r['targetval'], end=' ')
disp_rec(r)
sys.stdout.flush()
# 2. loop
try:
while True:
prevtime = r[field]
time.sleep(1)
for r in db.passive.get(
db.passive.flt_and(
baseflt,
db.passive.searchnewer(prevtime,
new=field == 'firstseen'),
),
sort=[(field, 1)]):
if 'addr' in r:
print(utils.force_int2ip(r['addr']), end=' ')
else:
print(r['targetval'], end=' ')
disp_rec(r)
sys.stdout.flush()
except KeyboardInterrupt:
pass
def callback(x):
return db.view.store_or_merge_host(
nmap_record_to_view(x)
)
parser.add_argument = parser.add_option
parser.add_argument('-s', '--sensor')
parser.add_argument('-c', '--count', action="store_true")
parser.add_argument('-r', '--resolve', action="store_true",
help="Resolve MAC manufacturer")
args = parser.parse_args()
flts = ([], []) # MAC & IP filters
for arg in args.ips_or_macs:
if arg[:1] in "-!~":
neg = True
arg = arg[1:]
else:
neg = False
match = MAC_ADDR.search(arg)
if match:
flts[0].append(db.passive.searchmac(mac=arg, neg=neg))
elif arg.startswith('/') and '/' in arg[1:]:
flts[0].append(db.passive.searchmac(mac=utils.str2regexp(arg),
neg=neg))
elif '/' in arg:
flts[1].append(db.passive.searchnet(arg, neg=neg))
else:
flts[1].append(db.passive.searchhost(arg, neg=neg))
if not flts[0]:
flts[0].append(db.passive.searchmac())
flt = db.passive.flt_or(*flts[0])
if flts[1]:
flt = db.passive.flt_and(flt, db.passive.flt_or(*flts[1]))
if args.sensor is not None:
flt = db.passive.flt_and(flt, db.passive.searchsensor(args.sensor))
if args.count:
print(db.passive.count(flt))