Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _find_fingerprint():
    """Return the address and fingerprint of the first SSH host key found.

    Walks the records returned by ``ivre.db.db.nmap.get()`` for the
    ``searchsshkey()`` filter, then each record's ``'ports'``, each port's
    ``'scripts'`` and, for scripts whose ``'id'`` is ``'ssh-hostkey'``, the
    entries stored under the ``'ssh-hostkey'`` key.

    Returns:
        A ``(record['addr'], key['fingerprint'])`` tuple for the first entry
        that has a ``'fingerprint'`` field, or ``None`` when no entry has one.
    """
    ssh_records = ivre.db.db.nmap.get(ivre.db.db.nmap.searchsshkey())
    for record in ssh_records:
        for service in record.get('ports', []):
            for output in service.get('scripts', []):
                # Only the ssh-hostkey script output carries key entries.
                if output['id'] != 'ssh-hostkey':
                    continue
                for hostkey in output.get('ssh-hostkey', []):
                    if 'fingerprint' in hostkey:
                        return record['addr'], hostkey['fingerprint']
    return None
)
# Country filter: the same selection (EU, CH, NO) is passed as a database
# filter, as a command-line argument list and as a query string.
# NOTE(review): check_nmap_count_value() is defined outside this excerpt; the
# first argument looks like either a named reference count or a literal
# expected count -- confirm against the helper.
self.check_nmap_count_value(
"nmap_extended_eu_count",
ivre.db.db.nmap.searchcountry(['EU', 'CH', 'NO']),
["--country=EU,CH,NO"], "country:EU,CH,NO"
)
# Filters
# Take the address of the first record returned for the empty filter; only
# the "addr" field is requested.
addr = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.flt_empty, fields=["addr"]
))['addr']
# Searching for that exact host is checked against the value 1, with the
# address rendered by force_int2ip() for the command-line and query forms.
self.check_nmap_count_value(1, ivre.db.db.nmap.searchhost(addr),
['--host', ivre.utils.force_int2ip(addr)],
ivre.utils.force_int2ip(addr))
# The record fetched through the host filter must carry the same address.
result = next(ivre.db.db.nmap.get(
ivre.db.db.nmap.searchhost(addr)
))
self.assertEqual(result['addr'], addr)
# AND-ing the host filter with itself is checked against the same value (1).
self.check_count_value_api(1, ivre.db.db.nmap.flt_and(
ivre.db.db.nmap.searchhost(addr),
ivre.db.db.nmap.searchhost(addr),
), database=ivre.db.db.nmap)
# Get the identifier of the first record for the empty filter, then check
# that searching by this identifier is also verified against the value 1.
recid = ivre.db.db.nmap.getid(
next(ivre.db.db.nmap.get(ivre.db.db.nmap.flt_empty))
)
self.check_count_value_api(1, ivre.db.db.nmap.searchid(recid),
database=ivre.db.db.nmap)
self.assertIsNotNone(
ivre.db.db.nmap.getscan(
ivre.db.db.nmap.getscanids(
next(ivre.db.db.nmap.get(ivre.db.db.nmap.flt_empty))
else:
sys.stdout.write("%s(%s);\n" % (callback, json.dumps(series)))
exit(0)
# extract info
# Actions in this list share a common output shape: `preamble` and
# `postamble` hold the strings "[\n" and "]\n", and `r2res` maps one database
# record to one output item (identity by default).
if action in ["onlyips", "ipsports", "timeline", "coordinates",
"countopenports", "diffcats"]:
preamble = "[\n"
postamble = "]\n"
r2res = lambda x: x
if action == "timeline":
# Use the backend's get_open_port_count() when it exists; its result is
# materialized into a list so that len() can provide the count.
if hasattr(db.nmap, "get_open_port_count"):
result = list(db.nmap.get_open_port_count(flt, archive=archive))
count = len(result)
else:
# Otherwise run a generic query limited to the three fields used below and
# take the count from the returned object's count() method.
result = db.nmap.get(
flt, archive=archive,
fields=['addr', 'starttime', 'openports.count']
)
count = result.count()
# r2time converts a record's 'starttime' to an integer through
# strftime('%s'); when the "modulo" parameter is set, the value is reduced
# modulo int(params["modulo"]).
# NOTE(review): '%s' is not among the format codes documented by Python; it
# depends on the platform C library -- confirm this is acceptable on all
# deployment targets.
if params.get("modulo") is None:
r2time = lambda r: int(r['starttime'].strftime('%s'))
else:
r2time = lambda r: (int(r['starttime'].strftime('%s'))
% int(params.get("modulo")))
# Each output item is [time, address, open port count]; the address goes
# through force_ip_int() or force_ip_str() depending on `ipsasnumbers`.
if ipsasnumbers:
r2res = lambda r: [r2time(r), force_ip_int(r['addr']),
r['openports']['count']]
else:
r2res = lambda r: [r2time(r), force_ip_str(r['addr']),
r['openports']['count']]
elif action == "coordinates":
def getgraph(flt=db.db.nmap.flt_empty):
    """Accumulate the output of ``graphhost()`` over every matching record.

    Args:
        flt: filter passed to ``db.db.nmap.get()``; defaults to the empty
            filter, evaluated once when the function is defined.

    Returns:
        A pair of lists: the concatenation of the first elements and the
        concatenation of the second elements returned by ``graphhost()`` for
        each record, in iteration order.
    """
    first_parts, second_parts = [], []
    for record in db.db.nmap.get(flt):
        first_chunk, second_chunk = graphhost(record)
        first_parts.extend(first_chunk)
        second_parts.extend(second_chunk)
    return first_parts, second_parts
def gettoarchive(addr, source):
    """Query the Nmap records matching both a host and a source.

    Args:
        addr: value handed to ``db.nmap.searchhost()``.
        source: value handed to ``db.nmap.searchsource()``.

    Returns:
        Whatever ``db.nmap.get()`` returns for the conjunction (``flt_and``)
        of the two filters.
    """
    host_and_source = db.nmap.flt_and(
        db.nmap.searchhost(addr),
        db.nmap.searchsource(source),
    )
    return db.nmap.get(host_and_source)
def from_nmap(flt, category=None):
"""Return an Nmap entry in the View format."""
cur_addr = None
cur_rec = None
result = None
for rec in db.nmap.get(flt, sort=[("addr", 1)]):
if 'addr' not in rec:
continue
rec = nmap_record_to_view(rec, category=category)
if cur_addr is None:
cur_addr = rec['addr']
cur_rec = rec
continue
if cur_addr != rec['addr']:
result = cur_rec
cur_rec = rec
cur_addr = rec['addr']
yield result
else:
cur_rec = db.view.merge_host_docs(cur_rec, rec)
continue
if cur_rec is not None:
count = result.count()
else:
count = db.nmap.count(flt, archive=archive,
fields=['addr', 'openports.count'])
if ipsasnumbers:
r2res = lambda r: [force_ip_int(r['addr']),
r['openports']['count']]
else:
r2res = lambda r: [force_ip_str(r['addr']),
r['openports']['count']]
elif action == "ipsports":
if hasattr(db.nmap, "get_ips_ports"):
result = list(db.nmap.get_ips_ports(flt, archive=archive))
count = sum(len(host.get('ports', [])) for host in result)
else:
result = db.nmap.get(
flt, archive=archive,
fields=['addr', 'ports.port', 'ports.state_state']
)
count = sum(len(host.get('ports', [])) for host in result)
result.rewind()
if ipsasnumbers:
r2res = lambda r: [
force_ip_int(r['addr']),
[[p['port'], p['state_state']]
for p in r.get('ports', [])
if 'state_state' in p]
]
else:
r2res = lambda r: [
force_ip_str(r['addr']),
[[p['port'], p['state_state']]
r['openports']['count']]
# "coordinates": the output wrapper becomes a GeometryCollection object and
# each location record is rendered as a Point carrying its count.
elif action == "coordinates":
preamble = '{"type": "GeometryCollection", "geometries": ['
postamble = ']}'
# Materialize the locations so that len() can provide the count.
result = list(db.nmap.getlocations(flt, archive=archive))
count = len(result)
# The record's '_id' is used directly as the point coordinates.
# NOTE(review): presumably '_id' already holds a coordinate pair in the order
# the consumer expects -- confirm against getlocations().
r2res = lambda r: {
"type": "Point",
"coordinates": r['_id'],
"properties": {"count": r['count']},
}
# "countopenports": one [address, open port count] item per record.
elif action == "countopenports":
# Use the backend's get_open_port_count() when it exists, else a generic
# query restricted to the two fields used below.
if hasattr(db.nmap, "get_open_port_count"):
result = db.nmap.get_open_port_count(flt, archive=archive)
else:
result = db.nmap.get(flt, archive=archive,
fields=['addr', 'openports.count'])
# Take the count from the result object when it exposes count(); otherwise
# ask the database for it with a separate call.
if hasattr(result, "count"):
count = result.count()
else:
count = db.nmap.count(flt, archive=archive,
fields=['addr', 'openports.count'])
# The address goes through force_ip_int() or force_ip_str() depending on
# `ipsasnumbers`.
if ipsasnumbers:
r2res = lambda r: [force_ip_int(r['addr']),
r['openports']['count']]
else:
r2res = lambda r: [force_ip_str(r['addr']),
r['openports']['count']]
elif action == "ipsports":
if hasattr(db.nmap, "get_ips_ports"):
result = list(db.nmap.get_ips_ports(flt, archive=archive))
count = sum(len(host.get('ports', [])) for host in result)