Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start():
    """
    Run the queued tasks and, in shell mode, wait for a connect-back shell.

    After the runtime check, the size of ``kb.task_queue`` is logged and
    ``run_threads`` is invoked with ``conf.threads`` and ``task_run``.
    ``task_done`` is always called afterwards, even if the run raises.

    When ``conf.mode`` is ``"shell"`` and ``conf.api`` is falsy, the
    connect-back address is logged (passed through ``desensitization`` when
    ``conf.ppt`` is set) and one of the listener handlers is invoked.
    """
    runtime_check()
    logger.info("pocsusite got a total of {0} tasks".format(kb.task_queue.qsize()))
    logger.debug("pocsuite will open {} threads".format(conf.threads))

    try:
        run_threads(conf.threads, task_run)
        logger.info("Scan completed,ready to print")
    finally:
        task_done()

    # Nothing more to do unless a reverse shell is expected.
    if conf.mode != "shell" or conf.api:
        return

    if conf.ppt:
        shown_host = desensitization(conf.connect_back_host)
    else:
        shown_host = conf.connect_back_host
    logger.info("connect back ip: {0} port: {1}".format(shown_host, conf.connect_back_port))
    logger.info("watting for shell connect to pocsuite")

    if conf.console_mode:
        wait_for_shell = handle_listener_connection_for_console
    else:
        wait_for_shell = handle_listener_connection
    wait_for_shell()
def exception_handled_function(thread_function, args=(), silent=False):
    """
    Call ``thread_function(*args)`` without letting ordinary errors escape.

    :param thread_function: callable to execute
    :param args: positional arguments passed to ``thread_function``
    :param silent: when true, exceptions are swallowed without any output
    :raises KeyboardInterrupt: re-raised after ``kb.thread_continue`` is set
        to ``False`` and ``kb.thread_exception`` to ``True``

    Any other ``Exception`` is caught. Unless ``silent`` is set, it is logged
    together with the current thread's name, and the traceback is printed
    when ``conf.verbose`` is greater than 1.
    """
    try:
        thread_function(*args)
    except KeyboardInterrupt:
        kb.thread_continue = False
        kb.thread_exception = True
        raise
    except Exception as ex:
        if not silent:
            # current_thread().name replaces the camelCase aliases
            # currentThread()/getName(), deprecated since Python 3.10.
            logger.error("thread {0}: {1}".format(threading.current_thread().name, str(ex)))
            if conf.verbose > 1:
                traceback.print_exc()
# Collect targets from the URL file and pick target-source plugins based on
# the dork options.
# NOTE(review): this excerpt has lost its indentation and its enclosing
# function header, so the nesting of the statements below is not visible here.
if conf.url_file:
# Every item returned by get_file_items for the URL file is added to kb.targets.
for line in get_file_items(conf.url_file, lowercase=False, unique=True):
kb.targets.add(line)
if conf.dork:
# enable plugin 'target_from_zoomeye' by default
# (only when neither the shodan nor the fofa target plugin is already listed)
if 'target_from_shodan' not in conf.plugins and 'target_from_fofa' not in conf.plugins:
conf.plugins.append('target_from_zoomeye')
# Each engine-specific dork option appends the matching target plugin.
if conf.dork_zoomeye:
conf.plugins.append('target_from_zoomeye')
if conf.dork_shodan:
conf.plugins.append('target_from_shodan')
if conf.dork_censys:
conf.plugins.append('target_from_censys')
if conf.dork_fofa:
conf.plugins.append('target_from_fofa')
# Assign literal default values to attributes of conf.
# Network / HTTP request settings.
conf.random_agent = None
conf.proxy = None
conf.proxy_cred = None
conf.proxies = {}
conf.timeout = 30
conf.retry = 0
conf.delay = 0
conf.http_headers = {}
conf.agents = [DEFAULT_USER_AGENT] # when data sources are loaded from plugins there is no default value, so one has to be provided here
# Account and search-engine credentials.
conf.login_user = None
conf.login_pass = None
conf.shodan_token = None
conf.fofa_user = None
conf.fofa_token = None
conf.censys_uid = None
conf.censys_secret = None
# Dork / search options.
conf.dork = None
conf.dork_zoomeye = None
conf.dork_shodan = None
conf.dork_fofa = None
conf.dork_censys = None
conf.max_page = 1
conf.search_type = 'host'
conf.comparison = False
conf.vul_keyword = None
conf.ssvid = None
# Plugin list and run-time switches.
conf.plugins = []
conf.threads = 1
conf.batch = False
conf.check_requires = False
conf.quiet = False
conf.update_all = False
# NOTE(review): the next five assignments repeat the five directly above verbatim.
conf.threads = 1
conf.batch = False
conf.check_requires = False
conf.quiet = False
conf.update_all = False
conf.verbose = 1
conf.ipv6 = False
conf.multiple_targets = False
conf.pocs_path = None
conf.output_path = None
conf.plugin_name = None
conf.plugin_code = None
# Connect-back listener and miscellaneous flags.
conf.connect_back_host = None
conf.connect_back_port = DEFAULT_LISTENER_PORT
conf.console_mode = False
conf.show_version = False
conf.api = False # api for zipoc
conf.ppt = False
Cleanup configuration attributes.
"""
# Normalise option values held on conf.
# NOTE(review): indentation and the enclosing function header are missing from
# this excerpt; the nesting of the statements below is not visible here.
if conf.agent:
# Remove carriage-return and line-feed characters from the agent string.
conf.agent = re.sub(r"[\r\n]", "", conf.agent)
if conf.cookie:
# A string cookie has CR/LF removed and is then passed to extract_cookies;
# a dict is left as it is; any other type is rejected.
if isinstance(conf.cookie, str):
conf.cookie = re.sub(r"[\r\n]", "", conf.cookie)
conf.cookie = extract_cookies(conf.cookie)
elif not isinstance(conf.cookie, dict):
raise PocsuiteHeaderTypeException('Does not support type for cookie')
if conf.delay:
conf.delay = float(conf.delay)
if conf.retry:
# The retry count is capped at 10.
conf.retry = min(conf.retry, 10)
if conf.url:
# A single string is wrapped in a list; each entry is stripped of
# surrounding whitespace.
if isinstance(conf.url, str):
conf.url = [conf.url]
conf.url = [x.strip() for x in conf.url]
if conf.poc:
# A single string is wrapped in a list; entries whose lowercase form starts
# with 'ssvid-' are lowercased, all others are kept unchanged.
if isinstance(conf.poc, str):
conf.poc = [conf.poc]
conf.poc = [poc.lower() if poc.lower().startswith('ssvid-') else poc for poc in conf.poc]
if conf.url_file:
# Expand a leading '~' in the path, then hand the path to check_file.
conf.url_file = os.path.expanduser(conf.url_file)
check_file(conf.url_file)
if conf.plugins:
# Initialisation sequence: each helper below is called in this order.
# NOTE(review): the enclosing function header and the indentation are missing
# from this excerpt.
_adjust_logging_formatter()
_cleanup_options()
_basic_option_validation()
_create_directory()
_init_kb_comparison()
update()
_set_multiple_targets()
_set_user_pocs_path()
_set_pocs_modules() # poc modules must be set before plugin modules; some parameters in the poc options call plugins
_set_plugins()
_init_targets_plugins()
_init_pocs_plugins()
_set_task_queue()
_init_results_plugins()
# NOTE(review): because indentation was lost, it cannot be determined from
# here which of the following calls are guarded by this condition.
if any((conf.url, conf.url_file, conf.plugins)):
_set_http_cookie()
_set_http_host()
_set_http_referer()
_set_http_user_agent()
_set_http_extra_headers()
_set_connect_back()
_set_network_proxy()
_set_network_timeout()
_set_threads()
_set_listener()
patch_all()
remove_extra_log_message()
# Assign literal default values to attributes of conf.
# NOTE(review): this excerpt starts partway through a longer run of assignments.
# Dork / search options.
conf.dork_fofa = None
conf.dork_censys = None
conf.max_page = 1
conf.search_type = 'host'
conf.comparison = False
conf.vul_keyword = None
conf.ssvid = None
# Plugin list and run-time switches.
conf.plugins = []
conf.threads = 1
conf.batch = False
conf.check_requires = False
conf.quiet = False
conf.update_all = False
conf.verbose = 1
conf.ipv6 = False
conf.multiple_targets = False
conf.pocs_path = None
conf.output_path = None
conf.plugin_name = None
conf.plugin_code = None
# Connect-back listener and miscellaneous flags.
conf.connect_back_host = None
conf.connect_back_port = DEFAULT_LISTENER_PORT
conf.console_mode = False
conf.show_version = False
conf.api = False # api for zipoc
conf.ppt = False
def data_to_stdout(data, bold=False):
    """
    Write *data* to the stdout (console) stream.

    Nothing is written when ``conf`` contains a truthy ``quiet`` entry.
    String data is passed through ``stdout_encode`` first; any other value is
    used as it is. The result is coloured with ``set_color(..., bold)``,
    written to ``sys.stdout`` and flushed.
    """
    if 'quiet' in conf and conf.quiet:
        return

    text = stdout_encode(data) if isinstance(data, str) else data
    sys.stdout.write(set_color(text, bold))

    try:
        sys.stdout.flush()
    except IOError:
        # A failed flush is deliberately ignored.
        pass
# Assign literal default values to attributes of conf.
# NOTE(review): this excerpt starts partway through a longer run of
# assignments and may continue past the last visible line.
# Search-engine credentials.
conf.fofa_token = None
conf.censys_uid = None
conf.censys_secret = None
# Dork / search options.
conf.dork = None
conf.dork_zoomeye = None
conf.dork_shodan = None
conf.dork_fofa = None
conf.dork_censys = None
conf.max_page = 1
conf.search_type = 'host'
conf.comparison = False
conf.vul_keyword = None
conf.ssvid = None
# Plugin list and run-time switches.
conf.plugins = []
conf.threads = 1
conf.batch = False
conf.check_requires = False
conf.quiet = False
conf.update_all = False
conf.verbose = 1
conf.ipv6 = False
conf.multiple_targets = False
conf.pocs_path = None
conf.output_path = None
conf.plugin_name = None
conf.plugin_code = None
# Connect-back listener and miscellaneous flags.
conf.connect_back_host = None
conf.connect_back_port = DEFAULT_LISTENER_PORT
conf.console_mode = False
conf.show_version = False
conf.api = False # api for zipoc