Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
registry = config.get_env(profile_name, 'docker_registry')
vol_mapping = config.get_env(profile_name, 'vol_mapping')
port_mapping = config.get_env(profile_name, 'port_mapping')
env_vars = config.get_env(profile_name, 'env_vars')
command = "docker run -d "
command += '--name ' + profile_name
command += ' -p ' + port_mapping
command += ' -v ' + vol_mapping
command += ' -e ' + env_vars
tag = 'latest'
if tag_name == '':
logger.log_bl('\nGetting top 5 available tags of ' + profile_name + ' from Amazon ECR registry...')
tag_list = utils.cmd_exec("aws ecr describe-images --repository-name " + profile_name + " --output text --query 'sort_by(imageDetails,& imagePushedAt)[*].imageTags[*]' | tr '\t' '\n' | tail -5")
tag_list = list(reversed(tag_list.split('\n')))
tag = click.prompt(logger.style('\nSelect an tag to deploy: ?'), type=click.Choice(tag_list))
logger.log_g('\nYou have selected tag: [ ' + tag + ' ] for your application')
command += ' ' + registry + '/' + profile_name + ':' + tag
logger.debug('final command : ' + command)
logger.log_y('\nKilling old container if exist')
utils.cmd_exec('docker rm -f ' + profile_name)
utils.cmd_exec(command)
logger.log_g("\nSuccessfully started " + profile_name + " application . Please check logs using: ")
logger.log_cy("\n docker logs -f " + profile_name + " \n")
@click.command()
@click.option('--elasticsearch', '-e', default=['localhost:9200'], multiple=True)
@click.option('--format', '-f', default='csv', type=click.Choice(['csv', 'json']))
@click.argument('output', type=click.File('w'))
def main(elasticsearch, format, output):
    """Write (page title, album title) pairs to OUTPUT as CSV or JSON."""
    # `--elasticsearch` may be repeated; every value must look like HOST:PORT.
    # The default is a list because the option is declared with
    # `multiple=True`; a bare string default is either rejected by click or
    # iterated character by character.
    hosts = []
    for address in elasticsearch:
        try:
            host, port = address.split(':', 1)
            hosts.append({'host': host, 'port': int(port)})
        except ValueError:
            # Covers both a missing ':' and a non-integer port, and reports
            # them as a usage error instead of a traceback.
            raise click.BadParameter(
                'expected HOST:PORT, got {!r}'.format(address),
                param_hint='--elasticsearch',
            )

    de = DiscograpyExtractor(hosts)

    if format == 'json':
        # One [page_title, album_title] pair per discography entry.
        json.dump([[p, n] for p, n in de.get_discography()], output)
    else:
        writer = csv.writer(output, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(["page_title", "album_title"])
        for p, n in de.get_discography():
            # NOTE(review): encoding to bytes matches Python 2's csv module;
            # on Python 3 a text-mode csv writer would emit b'...' reprs.
            # Confirm the target interpreter before dropping the encode.
            writer.writerow([p.encode('utf-8'), n.encode('utf-8')])
class NetworkChoiceType(click.Choice):
    """Choice type that resolves a network name or numeric id to a network id."""

    def convert(self, value, param, ctx):
        """Return the network id for *value*.

        Integers pass through unchanged, numeric strings are parsed with
        ``int``, and anything else is validated as a choice and then looked
        up in ``NETWORKNAME_TO_ID``.
        """
        # Already an id: nothing to do.
        if isinstance(value, int):
            return value

        looks_numeric = isinstance(value, str) and value.isnumeric()
        if not looks_numeric:
            # Let the parent validate the name against the configured choices.
            chosen_name = super().convert(value, param, ctx)
            return NETWORKNAME_TO_ID[chosen_name]

        # `isnumeric()` accepts some characters `int()` rejects, hence the guard.
        try:
            return int(value)
        except ValueError:
            self.fail(f"invalid numeric network id: {value}", param, ctx)
class EnumChoiceType(Choice):
    """Choice type whose choices are the values of an enum.

    Conversion yields the matching enum member rather than the raw value.
    """

    def __init__(self, enum_type: EnumMeta, case_sensitive=True):
        """Register the values of *enum_type* as the permitted choices."""
        self._enum_type = enum_type
        # https://github.com/python/typeshed/issues/2942
        member_values = [member.value for member in enum_type]  # type: ignore
        super().__init__(member_values, case_sensitive=case_sensitive)

    def convert(self, value, param, ctx):
        """Return the enum member for *value*, or fail with a usage error."""
        # NOTE(review): the lookup goes straight through the enum constructor
        # and never calls the parent's convert, so `case_sensitive` only
        # reaches the parent initializer.
        enum_cls = self._enum_type
        try:
            return enum_cls(value)
        except ValueError:
            kind = enum_cls.__name__.lower()
            self.fail(f"'{value}' is not a valid {kind}", param, ctx)
class GasPriceChoiceType(click.Choice):
""" Returns a GasPriceStrategy for the choice """
type=click.Choice(ioutils_formats_list),
default=None,
help="Format the output",
)
@click.option(
"ostyle",
"--output-style",
"-S",
default=None,
help="Style of the output (tabulate table format)",
)
@click.argument("userid", nargs=-1, metavar="[]")
@pass_context
def ul_showdb(ctx, oformat, ostyle, userid):
"""Show details for location records in database
\b
type=click.Choice(["qc","assembly","genomes","genecatalog","None","all"]),
# show_default=True,
# help="Execute only subworkflow.",
)
@click.option("-w",
"--working-dir",
type=click.Path(dir_okay=True,writable=True,resolve_path=True),
help="location to run atlas.",
default="."
)
@click.option("-c",
"--config-file",
type=click.Path(exists=True,resolve_path=True),
help="config-file generated with 'atlas init'",
)
@click.option(
"-j",
type=click.Choice(['id',
'identifier',
'type',
'network_space',
'datacenter',
'vlan_id',
'IPs',
'hardware',
'vs']))
@click.option('--datacenter', '-d',
help="Filter by datacenter shortname (sng01, dal05, ...)")
@click.option('--identifier', help="Filter by network identifier")
@click.option('--subnet-type', '-t', help="Filter by subnet type")
@click.option('--network-space', help="Filter by network space")
@click.option('--ipv4', '--v4', is_flag=True, help="Display only IPv4 subnets")
@click.option('--ipv6', '--v6', is_flag=True, help="Display only IPv6 subnets")
@environment.pass_env
@click.option('--replacement', type=click.Choice([u'nothing', u'space']),
default=u'nothing')
@click.option('--out_dir', '-o', default=os.getcwd(), type=click.Path())
def remove_newlines(in_file, replacement, out_dir):
    # Strip newlines from the input text and write the result to stdout.
    # The output directory is created, but the text itself goes to the
    # stdout text stream, not to a file.
    create_dirs(out_dir)
    contents = in_file.read()

    if replacement != u'space':
        # Default mode: drop every newline outright.
        contents = contents.replace(u'\n', u'')
    else:
        # Collapse each run of newlines into a single space.
        contents = re.sub('\n+', u' ', contents)

    click.get_text_stream('stdout').write(contents.strip())
type=click.Choice(['json', 'marcxml']),
default='marcxml',
help='Whether to use JSON or MARCXML.')
@click.option(
'--recid',
'-r',
help='Record ID to load (NOTE: will load only one record!).',
default=None)
@with_appcontext
def dryrun(sources, source_type, recid):
"""Load records migration dump."""
# NOTE(review): `sources`, `source_type` and `recid` are not used in the lines
# visible here; the body may continue beyond this excerpt -- confirm.
# Imported locally rather than at module level.
from invenio_logging.fs import InvenioLoggingFS
# Turn on the file-system logging settings and point the log file at a fixed
# path under /tmp before the logging extension is initialised on the app.
current_app.config['LOGGING_FS'] = True
current_app.config['LOGGING_FS_LOGFILE'] = '/tmp/migration.log'
InvenioLoggingFS(current_app)
# Select the dump loader class by dotted path; presumably this is the variant
# that avoids persisting records -- verify against DryRunCDSRecordDumpLoader.
current_app.config['MIGRATOR_RECORDS_DUMPLOADER_CLS'] = \
'cds.modules.migrator.records:DryRunCDSRecordDumpLoader'
def linux_distribution_option(command: Callable[..., None],
                              ) -> Callable[..., None]:
    """
    Attach a ``--linux-distribution`` option to ``command``.

    The accepted values are the sorted keys of ``LINUX_DISTRIBUTIONS``; the
    default is ``centos-7`` and is shown in the help output.
    """
    supported_distributions = sorted(LINUX_DISTRIBUTIONS.keys())
    add_option = click.option(
        '--linux-distribution',
        type=click.Choice(supported_distributions),
        default='centos-7',
        show_default=True,
        help='The Linux distribution to use on the nodes.',
    )
    function = add_option(command)  # type: Callable[..., None]
    return function