Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python3
import censys.ipv4
import configparser
import json
from base import censys_get_user_input
config = configparser.ConfigParser()
config.read("config.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
chosen_query = censys_get_user_input()
c = censys.ipv4.CensysIPv4(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
for record in c.search(chosen_query):
nrOfResults += 1
print(json.dumps(record))
print("Results received:", nrOfResults)
#!/usr/bin/env python3
import censys.query
import configparser
import censysobject
config = configparser.ConfigParser()
config.read("config.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
c = censys.query.CensysQuery(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
censys_object = censysobject.CensysObject()
print(c.get_series_details("ipv4"))
query = "select ip from ipv4." + censys_object.get_latest_ipv4_tables(censys_object) + " where ip = \"8.8.8.8\""
# Start SQL job
res = c.new_job(query)
job_id = res["job_id"]
# Wait for job to finish and get job metadata
print(c.check_job_loop(job_id))
# Iterate over the results from that job
print(c.get_results(job_id, page=1))
#!/usr/bin/env python3
import censys.query
import configparser
import censysobject
config = configparser.ConfigParser()
config.read("config.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
c = censys.export.CensysExport(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
censys_object = censysobject.CensysObject()
query = "select * from ipv4." + censys_object.get_latest_ipv4_tables(censys_object) + " where ip = \"8.8.8.8\""
# query = ''
print("Executing query: " + query)
# Start new Job
res = c.new_job(query)
job_id = res["job_id"]
result = c.check_job_loop(job_id)
if result['status'] == 'success':
print(result)
for path in result['download_paths']:
print('all paths:')
print(path)
else:
def censys_search_certs(host):
try:
certificates = censys.certificates.CensysCertificates(api_id=UID, api_secret=TOKEN)
cert_query = certificates.search("parsed.names: {0} AND tags.raw: trusted AND NOT parsed.names: cloudflaressl.com".format(host))
result = set([cert['parsed.fingerprint_sha256'] for cert in cert_query])
hosts_query = censys.ipv4.CensysIPv4(api_id=UID, api_secret=TOKEN)
hosts = ' OR '.join(result)
if hosts:
searching = hosts_query.search(hosts)
host_result = set([ search_result['ip'] for search_result in searching ])
return host_result
except:
print("[-] We got an error here, maybe with your credentials!")
exit(1)
def get_certificates():
try:
if not CENSYS_API_ID or not CENSYS_API_SECRET:
logging.info("\033[1;31m[!] API KEY or Secret for Censys not provided.\033[1;m" \
"\nYou'll have to provide them in the script")
sys.exit()
logging.info("[+] Extracting certificates using Censys")
censys_certificates = censys.certificates.CensysCertificates(CENSYS_API_ID, CENSYS_API_SECRET)
return censys_certificates
except censys.base.CensysUnauthorizedException:
logging.info('\033[93m[!] Your Censys credentials look invalid.\n\033[1;m')
sys.exit(1)
except censys.base.CensysRateLimitExceededException:
logging.info('\033[93m[!] Looks like you exceeded your Censys account limits rate. Exiting\n\033[1;m')
sys.exit(1)
api = censys.ipv4.CensysIPv4(api_id=self.api_id, api_secret=self.secret)
self.utility.print_message(OK, 'Check open web ports.')
# Extract search result.
for result in api.search('ip:{}'.format(ip_addr)):
for idx, items in enumerate(result['protocols']):
# Get port number and protocol type.
server_info.append({'Open Port': items.split('/')[0], 'Protocol': items.split('/')[1]})
self.utility.print_message(WARNING, 'Open web port {}: {}'.format(idx+1, items))
if items.split('/')[1] == 'https':
is_https = True
# Check certification.
if is_https is True:
self.utility.print_message(OK, 'Check certification.')
api = censys.certificates.CensysCertificates(api_id=self.api_id, api_secret=self.secret)
fields = ['parsed.subject_dn', 'parsed.validity', 'parsed.signature_algorithm', 'parsed.subject']
# Extract search result.
for cert in api.search('tags: trusted and parsed.names: {}'.format(fqdn), fields=fields):
# Get signature algorithm.
sig_alg = cert['parsed.signature_algorithm.name']
self.utility.print_message(WARNING, 'Signature Algorithm: {}'.format(sig_alg))
# Get common name.
common_names = []
for idx, common_name in enumerate(cert['parsed.subject.common_name']):
common_names.append(common_name)
self.utility.print_message(WARNING, 'Common Name {}: {}'.format(idx+1, common_name))
# Get validity start and end date.
valid_start = cert['parsed.validity.start']
def search(self):
temp_domains = []
try:
main_of_domain = tldextract.extract(self.domain).domain
c = censys.certificates.CensysCertificates(api_id=self.api_id, api_secret=self.api_secret)
# iterate over certificates that match a search
fields = ["parsed.subject_dn", "parsed.fingerprint_sha256"] #parsed.issuer_dn
for cert in c.search("{0}".format(self.domain), fields=fields):
#print cert["parsed.subject_dn"]
cn_domain= cert["parsed.subject_dn"].split(",")[-1].split("=")[-1]#cn一定是在最后吗
main_of_cn_domain =tldextract.extract(cn_domain).domain
if main_of_domain in main_of_cn_domain:
detail = c.view(cert["parsed.fingerprint_sha256"]) #print c.view("a762bf68f167f6fbdf2ab00fdefeb8b96f91335ad6b483b482dfd42c179be076")
#print detail
#print detail["parsed"]["names"]
temp_domains.extend(detail["parsed"]["names"])
temp_domains = list(set(temp_domains))
except Exception,e:
logger.error("Error in {0}: {1}".format(__file__.split('/')[-1], e))
import socket
import sys
from base import censys_get_user_input
HOST = 'localhost'
PORT = 5041
config = configparser.ConfigParser()
config.read("config.ini")
CENSYS_API_ID = (config['SectionOne']['CENSYS_API_ID'])
CENSYS_API_KEY = (config['SectionOne']['CENSYS_API_KEY'])
nrOfResults = 0
nrOfResultsSent = 0
chosen_query = censys_get_user_input()
c = censys.ipv4.CensysIPv4(api_id=CENSYS_API_ID, api_secret=CENSYS_API_KEY)
for record in c.search(chosen_query):
nrOfResults += 1
msg = json.dumps(record).encode('utf-8')
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as msg:
sys.stderr.write("[ERROR] %s\n" % msg[1])
sys.exit(1)
try:
sock.connect((HOST, PORT))
sock.send(msg)
nrOfResultsSent += 1
except socket.error as msg:
sys.stderr.write("[ERROR] %s\n" % msg[1])
sys.exit(2)
print("Results received:", nrOfResults)
def module_run(self, hosts):
api_id = self.get_key('censysio_id')
api_secret = self.get_key('censysio_secret')
c = CensysIPv4(api_id, api_secret)
IPV4_FIELDS = [ 'ip', 'protocols', 'location.country',
'location.latitude', 'location.longitude']
for host in hosts:
self.heading(host, level=0)
try:
payload = c.search('a:{0}'.format(host), IPV4_FIELDS)
except CensysException:
continue
for result in payload:
self.insert_hosts(host=host,
ip_address=result['ip'],
country=result.get('location.country', ''),
latitude=result.get('location.latitude', ''),
longitude=result.get('location.longitude', ''))
for protocol in result['protocols']:
port, service = protocol.split('/')
def censys_search(title):
try:
api = censys.ipv4.CensysIPv4(api_id=UID, api_secret=TOKEN)
query = api.search('80.http.get.title: "{0}"'.format(title))
title_result = set([host['ip'] for host in query])
if title_result:
return title_result
except:
print("[-] We got an error here, maybe with your credentials!")
exit(1)