Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_admin_user(self, conn_string, choose_password):
engine = create_engine(conn_string)
# TODO change the random_password variable name, it is not always
# random anymore
if choose_password:
random_password = click.prompt(
'Enter the desired password for the "faraday" user',
confirmation_prompt=True,
hide_input=True
)
else:
random_password = self.generate_random_pw(12)
already_created = False
try:
engine.execute("INSERT INTO \"faraday_user\" (username, name, password, "
"is_ldap, active, last_login_ip, current_login_ip, role, state_otp) VALUES ('faraday', 'Administrator', "
"'{0}', false, true, '127.0.0.1', '127.0.0.1', 'admin', 'disabled');".format(random_password))
except sqlalchemy.exc.IntegrityError as ex:
if is_unique_constraint_violation(ex):
# when re using database user could be created previously
already_created = True
print(
def _add_pandas_datasource(context):
path = click.prompt(
msg_prompt_filesys_enter_base_path,
# default='/data/',
type=click.Path(
exists=True,
file_okay=False,
dir_okay=True,
readable=True
),
show_default=True
)
if path.startswith("./"):
path = path[2:]
if path.endswith("/"):
basenamepath = path[:-1]
else:
nfs_registry_mountpoint ='/exports'
tags.append('nfs')
else:
if nfs_registry_host == '':
nfs_registry_host = click.prompt("Please enter the NFS Server fqdn for persistent registry:")
if nfs_registry_mountpoint is '':
nfs_registry_mountpoint = click.prompt("Please enter NFS share name for persistent registry:")
tags.append('prod')
if byo_lb == "no":
lb_host = lb_host + '.' + public_hosted_zone
tags.append('haproxy')
else:
if lb_host == '':
lb_host = click.prompt("Please enter the load balancer hostname for installation:")
lb_host = lb_host + '.' + public_hosted_zone
if create_ocp_vars is True:
click.echo('Configured OCP variables:')
click.echo('\tauth_type: %s' % auth_type)
click.echo('\tldap_fqdn: %s' % ldap_fqdn)
click.echo('\tldap_user: %s' % ldap_user)
click.echo('\tldap_user_password: %s' % ldap_user_password)
click.echo('\tpublic_hosted_zone: %s' % public_hosted_zone)
click.echo('\tapp_dns_prefix: %s' % app_dns_prefix)
click.echo('\tbyo_lb: %s' % byo_lb)
click.echo('\tlb_host: %s' % lb_host)
click.echo('\tUsing values from: %s' % vmware_ini_path)
if not no_confirm:
click.confirm('Continue using these values?', abort=True)
if auth_type == 'ldap':
def _generate_from_location(location, runtime, dependency_manager, output_dir, name, app_template, no_input):
location = click.prompt("\nTemplate location (git, mercurial, http(s), zip, path)", type=str)
summary_msg = """
-----------------------
Generating application:
-----------------------
Location: {location}
Output Directory: {output_dir}
""".format(
location=location, output_dir=output_dir
)
click.echo(summary_msg)
do_generate(location, runtime, dependency_manager, output_dir, name, no_input, None)
def user_access_token():
app_id = click.prompt('Please input your app id', type=str)
app_secret = click.prompt('Please input your app secret', hide_input=True, type=str)
facebook = OAuth2Session(client_id=app_id, redirect_uri=DEFAULT_REDIRECT_URL)
facebook = facebook_compliance_fix(facebook)
authorization_url, state = facebook.authorization_url(DEFAULT_OAUTH_URL)
webbrowser.open(authorization_url)
response = click.prompt('Please input full callback url', type=str)
facebook.fetch_token(
token_url=DEFAULT_TOKEN_URL, client_secret=app_secret,
authorization_response=response
)
click.echo("Your access token is: \n{}".format(facebook.access_token))
cursor = db.cursor()
cursor.execute('''SELECT artist, album
FROM collection
WHERE skip = 0
AND complete = 0
AND track_count > 5
ORDER BY track_count DESC''')
all_albums = cursor.fetchall()
for i, (artist, album) in enumerate(all_albums):
print_header(artist, album, i, len(all_albums))
results = gmusic.search_all_access(artist + ' ' + album)['album_hits']
if not results:
click.secho("Can't find it, fixup please.", fg='yellow')
fixed_artist = click.prompt('Artist'.format(artist), default=artist)
fixed_album = click.prompt('Album'.format(album), default=album)
results = gmusic.search_all_access(fixed_artist + ' ' + fixed_album)['album_hits']
if not results:
handle_not_found(db, artist, album)
continue
# swizzle results into a slightly more usable form
results = [r['album'] for r in results]
try:
album_object = select_album(artist, album, results, i, len(all_albums))
except SkipAlbum:
handle_not_found(db, artist, album, prompt=False)
continue
if album_object:
add_to_gmusic(db, gmusic, album_object, artist, album)
def get_input_key():
"""Input API key and validate"""
click.secho("No API key found!", fg="yellow", bold=True)
click.secho("Please visit {} and get an API token.".format(RequestHandler.BASE_URL),
fg="yellow",
bold=True)
while True:
confkey = click.prompt(click.style("Enter API key",
fg="yellow", bold=True))
if len(confkey) == 32: # 32 chars
try:
int(confkey, 16) # hexadecimal
except ValueError:
click.secho("Invalid API key", fg="red", bold=True)
else:
break
else:
click.secho("Invalid API key", fg="red", bold=True)
return confkey
"pip": {
},
"pip3": {
}
},
"author": self.author,
"authorEmail": self.author_email
}
string = json.dumps(self.json, ensure_ascii=False, indent=4)
self.json = string
print(self.json)
click.echo('This will remove your previous pack.json file and replace with new one.')
sure = click.prompt('Are you sure you want to renew/write to your pack.json file?', type=str, default='y')
if sure == 'y':
self.insert_into_folder()
else:
click.secho('Process aborted', fg='red')
def login(email, password):
"""Use fxa to get the firefox account api
TODO: Make it work with 2fa
"""
client = fxa.core.Client("https://api.accounts.firefox.com")
logging.debug("Signing in as %s ..." % email)
session = client.login(email, password, keys=True)
try:
status = session.get_email_status()
while not status["verified"]:
ret = click.prompt(
"Please click through the confirmation email or type 'resend' to resend the mail or 'c' to continue")
if ret == "resend":
session.resend_email_code()
status = session.get_email_status()
assertion = session.get_identity_assertion("https://token.services.mozilla.com/")
_, kB = session.fetch_keys()
finally:
session.destroy_session()
return assertion, kB
"by request %s:" % json.dumps(filters),
fg="yellow",
err=True)
from platformio.commands.lib import print_lib_item
for item in result['items']:
print_lib_item(item)
if not interactive:
click.secho(
"Automatically chose the first available library "
"(use `--interactive` option to make a choice)",
fg="yellow",
err=True)
lib_info = result['items'][0]
else:
deplib_id = click.prompt("Please choose library ID",
type=click.Choice([
str(i['id'])
for i in result['items']
]))
for item in result['items']:
if item['id'] == int(deplib_id):
lib_info = item
break
if not lib_info:
if list(filters) == ["name"]:
raise exception.LibNotFound(filters['name'])
raise exception.LibNotFound(str(filters))
if not silent:
click.echo("Found: %s" % click.style(
"https://platformio.org/lib/show/{id}/{name}".format(