Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
cleanups = []
def main(self):
    """Empty ``self.cleanups`` in place and print a greeting.

    The list object itself is kept (only its contents are removed), so any
    other reference to the same list observes the reset as well.
    """
    self.cleanups[:] = []
    greeting = "hi this is geet main"
    print(greeting)
def cleanup(self, retcode):
    """Append the marker ``1`` to ``self.cleanups`` and print *retcode*.

    :param retcode: value interpolated into the printed message.
    """
    self.cleanups.append(1)
    report = "geet cleaning up with rc = %s" % (retcode,)
    print(report)
# Sub-application attached to ``Geet`` under the name "add".
@Geet.subcommand("add")
class GeetAdd(cli.Application):
    def main(self, *files):
        # Hand back a marker string together with the tuple of positional
        # arguments exactly as received.
        outcome = ("adding", files)
        return outcome
# Sub-application attached to ``Geet`` under the name "commit".
@Geet.subcommand("commit")
class GeetCommit(cli.Application):
    # NOTE(review): ``str`` is passed as the second positional argument of
    # cli.Flag -- confirm against plumbum's Flag signature that this is the
    # intended meaning (it may be taken as the default value, not a type).
    message = cli.Flag("-m", str)

    def main(self):
        # The result depends only on the ``debug`` attribute of the parent
        # application.
        return "committing in debug" if self.parent.debug else "committing"

    def cleanup(self, retcode):
        # Record the marker ``2`` on the parent's ``cleanups`` list, then
        # report the return code.
        self.parent.cleanups.append(2)
        report = "geet commit cleaning up with rc = %s" % (retcode,)
        print(report)
class Sample(cli.Application):
DESCRIPTION = "A sample cli application"
DESCRIPTION_MORE = '''
ABC This is just a sample help text typed with a Dvorak keyboard.
# Sub-application attached to ``Geet`` under the name "commit".
@Geet.subcommand("commit")
class GeetCommit(cli.Application):
    # NOTE(review): ``str`` is passed as the second positional argument of
    # cli.Flag -- confirm against plumbum's Flag signature that this is the
    # intended meaning (it may be taken as the default value, not a type).
    message = cli.Flag("-m", str)

    def main(self):
        # The result depends only on the ``debug`` attribute of the parent
        # application.
        return "committing in debug" if self.parent.debug else "committing"

    def cleanup(self, retcode):
        # Record the marker ``2`` on the parent's ``cleanups`` list, then
        # report the return code.
        self.parent.cleanups.append(2)
        report = "geet commit cleaning up with rc = %s" % (retcode,)
        print(report)
class Sample(cli.Application):
DESCRIPTION = "A sample cli application"
DESCRIPTION_MORE = '''
ABC This is just a sample help text typed with a Dvorak keyboard.
Although this paragraph is not left or right justified
in source, we expect it to appear
formatted nicely on the output, maintaining the indentation of the first line.
DEF this one has a different indentation.
Let's test that list items are not combined as paragraphs.
- Item 1
GHI more text for item 1, which may be very very very very very very long and even more long and long and long to
prove that we can actually wrap list items as well.
- Item 2 and this is
some text for item 2
import pytest
import sys
from plumbum import cli, local
from plumbum.cli.terminal import get_terminal_size
# Application exercising several plumbum switch kinds: two switch functions
# (``spam`` and ``bacon``), one SwitchAttr, two Flags and two CountOf
# counters.  Declaration order is kept as-is.
class SimpleApp(cli.Application):
    @cli.switch(["a"])
    def spam(self):
        print("!!a")

    @cli.switch(
        ["b", "bacon"],
        argtype=int,
        mandatory=True,
        envname="PLUMBUM_TEST_BACON",
    )
    def bacon(self, param):
        """give me some bacon"""
        print("!!b", param)

    eggs = cli.SwitchAttr(
        ["e"],
        str,
        help="sets the eggs attribute",
        envname="PLUMBUM_TEST_EGGS",
    )
    cheese = cli.Flag(["--cheese"], help="cheese, please")
    chives = cli.Flag(["--chives"], help="chives, instead")
    verbose = cli.CountOf(["v"], help="increases the verbosity level")
    # The help text below deliberately spans several source lines.
    benedict = cli.CountOf(["--benedict"], help="""a very long help message with lots of
useless information that nobody would ever want to read, but heck, we need to test
text wrapping in help messages as well""")
/XYZ Invisible 1
/Invisible 2
* Star 1
* Star 2
Last paragraph can fill more than one line on the output as well. So many features is bound to cause lots of bugs.
Oh well...
'''
foo = cli.SwitchAttr("--foo")
Sample.unbind_switches("--version")
# Application with no switches and no overridden methods; the statement
# right after this class registers it on ``Sample`` under the name "mumble".
class Mumble(cli.Application):
    pass
Sample.subcommand("mumble", Mumble)
# Minimal application whose only behaviour is printing a fixed greeting.
class LazyLoaded(cli.Application):
    def main(self):
        greeting = "hello world"
        print(greeting)
# Application declaring two switches: ``--one`` as a switch function with an
# empty body, and ``--two`` as a SwitchAttr stored on the instance as ``two``.
class AppA(cli.Application):
    @cli.switch(['--one'])
    def one(self):
        # Intentionally does nothing.
        pass

    two = cli.SwitchAttr(['--two'])
self.buffer = ''
def write(self, text):
    """Accept *text* and discard it; nothing is stored or emitted."""
    return None
def close(self, *args, **kw):
    """
    Set ``self.exiting`` to True; all arguments are accepted and ignored.

    NOTE(review): the previous docstring said this forces the thread to
    exit -- presumably a loop elsewhere polls ``exiting``; confirm.
    """
    setattr(self, 'exiting', True)
# # #
class TwisterRunner(cli.Application):
userName = cli.SwitchAttr(['-u', '--user'], str, default='',
help='The username')
epName = cli.SwitchAttr(['-e', '-ep', '--epname'], str, default='',
help='The Execution Process name')
cePath = cli.SwitchAttr(['-s', '--server'], str, default='',
help='The Central Engine RPyc IP:Port')
logFile = cli.Flag(['-l', '--log'], default=False,
help='Log stdout in a file? Default: DISABLED.')
def __del__(self):
    # Finalizer: announce the exit on stdout, then delegate to self.exit().
    farewell = 'Caught Execution Process EXIT !\n'
    print(farewell)
    self.exit()
def main(self):
logging.info("Found a minion configuration directory: %s" % minions_conf)
for filename in listdir(minions_conf):
filepath = join(minions_conf, filename)
puts(colored.blue("Found minion conf: %s" % filepath))
self.parent.config.setdefault('minion_conf', {})[filename] = filepath
else:
puts(colored.yellow("No minion configuration directory found: %s" % minions_conf))
# Write config file
self.parent.write_config_file()
@SaltPad.subcommand("create_vm")
class CreateVm(cli.Application, VagrantManagerMixin):
def main(self, project_name):
# Check directory
project_path = abspath(project_name)
if isdir(project_path):
puts(colored.red("The directory %s already exists, abort" % project_path))
sys.exit(1)
# Choose minion configuration
if len(self.parent.config.get('minion_conf', [])) == 0:
puts(colored.red("You must register at least one minion configuration,"
"use register_dir command to do so."))
sys.exit(1)
if len(self.parent.config.get('minion_conf', [])) == 1:
minion_conf = self.parent.config['minion_conf'].values()[0]
else:
puts(colored.blue("Minion config has changes, would you like to redo the snapshot? It will save the VM in its current state"))
choice = bool_choice("Yes/No: ")
if choice:
sandbox.sandbox_commit()
else:
puts(colored.blue("Would you like to enable snapshot on VM?"))
choice = bool_choice("Yes/No: ")
if choice:
puts(colored.blue("Starting snapshot"))
sandbox.sandbox_on()
puts(colored.blue("Done"))
# Sub-application attached to ``SaltPad`` under the name "halt".
@SaltPad.subcommand("halt")
class VagrantHalt(cli.Application, VagrantManagerMixin):
    def main(self, project_name):
        # Forward the 'halt' action for the given project; the helper is not
        # defined in this class (presumably it comes from
        # VagrantManagerMixin -- confirm).  Its result is not returned.
        vagrant_action = 'halt'
        self.execute_vagrant_command_on_minion(project_name, vagrant_action)
# Sub-application attached to ``SaltPad`` under the name "destroy".
@SaltPad.subcommand("destroy")
class VagrantDestroy(cli.Application, VagrantManagerMixin):
    def main(self, project_name):
        # Forward the 'destroy' action for the given project; the helper is
        # not defined in this class (presumably it comes from
        # VagrantManagerMixin -- confirm).  Its result is not returned.
        vagrant_action = 'destroy'
        self.execute_vagrant_command_on_minion(project_name, vagrant_action)
@SaltPad.subcommand("provision")
class VagrantProvision(cli.Application, VagrantManagerMixin):
def main(self, project_name):
dependencies += 1
else:
pass
# self.format_result(step_name, step, colored.red)
total = success + failure
if not failure:
puts(colored.green("All {0} step OK, {1} changes".format(total, changes)))
return True
else:
puts(colored.red("{0} steps, {1} failures, {4} dependencies failed, {2} OK, {3} changes".format(
total, failure, success, changes, dependencies)))
return False
@SaltPad.subcommand("healthchecks")
class Healthchecks(cli.Application):
"""Run healthchecks on minions matching target
"""
def main(self, target):
minions = self.parent.client.cmd(target, 'test.ping')
if len(minions) == 0:
puts(colored.red("No up minions matching, abort!"))
sys.exit(1)
for minion in minions:
puts(colored.blue("=" * 10))
puts(colored.blue("Minion: %s" % minion))
puts(colored.blue("Starting healthchecks on %s" % minion))
health_checks_result = self.parent.client.cmd(minion,
if self.num_conns == 0:
self.done_event.set()
self.listener.close()
self.join()
def join(self):
    """Join ``self.thread``, then wait on the process id in ``self.proc``.

    The wait is blocking (options ``0``, not ``os.WNOHANG``).  An
    ``OSError`` raised by the wait is printed and otherwise ignored.
    """
    self.thread.join()
    try:
        _pid, _status = os.waitpid(self.proc, 0)
    except OSError as wait_error:
        print(wait_error)
class AngrDbgServer(cli.Application):
port = cli.SwitchAttr(["-p", "--port"], cli.Range(0, 65535), default=None,
help="The TCP listener port (default = %s, default for SSL = %s)" %
(DEFAULT_SERVER_PORT, DEFAULT_SERVER_SSL_PORT), group="Socket Options")
host = cli.SwitchAttr(
["--host"],
str,
default="127.0.0.1",
help="The host to bind to. "
"The default is INADDR_ANY",
group="Socket Options")
ipv6 = cli.Flag(["--ipv6"], help="Enable IPv6", group="Socket Options")
logfile = cli.SwitchAttr(
"--logfile",
str,
default=None,
from stem.descriptor import parse_file
from plumbum import cli, local
from stem.descriptor.remote import DescriptorDownloader
import logging as log
import threading
import nmap
import Queue
from time import gmtime, strftime
#
# Attack exit nodes of the TOR Network.
# Author: Adastra.
# http://thehackerway.com
#
class Cli(cli.Application):
'''
Command-Line options received.
'''
verbose = cli.Flag(["-v", '--verbose'], help="Verbose Mode.")
brute = cli.Flag(["-b", '--brute'], help="Brute Force Mode. (Specify -u/--users-file and -f/--passwords-file options to select the users and passwords files.)")
useMirror = cli.Flag(["-d", '--use-mirrors'], help="Use the mirror directories of TOR. This will help to not overwhelm the official directories")
useShodan = cli.Flag(["-s", '--use-shodan'], help="Use ShodanHQ Service. (Specify -k/--shodan-key to set up the file where's stored your shodan key.)")
threads = 1
mode = None
usersFile = None
passFile = None
exitNodesToAttack = 10 #Number of default exit-nodes to filter from the Server Descriptor file.
shodanKey = None
scanPorts = "21,22,23,53,69,80,88,110,139,143,161,389,443,445,1079,1080,1433,3306,5432,8080,9050,9051,5800" #Default ports used to scan with nmap.
scanProtocol = 'tcp' #Using TCP protocol to perform the nmap scan.
scanArguments = None #Scan Arguments passed to nmap.