Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __new__(cls, endpoint, connection):
    """Create the instance from the text form of ``endpoint``.

    ``endpoint`` is normalised with ``to_str`` and handed to the parent
    class constructor. ``connection`` is accepted so the signature matches
    the call site, but it is not used here.
    """
    endpoint_text = to_str(endpoint)
    return super(TentativePage, cls).__new__(cls, endpoint_text)
def fetch(seen):
    """Fetch the unified jobs and (re)print one status line per job.

    Queries ``/api/v2/unified_jobs`` with the enclosing ``payload`` and
    prints ``id``, ``name`` and ``status`` for every result when
    ``print_stdout`` is set.

    ``seen`` is a set of job ids that is mutated in place: when writing to
    a TTY, one previously printed line is erased per entry before the list
    is printed again, and the id of each printed job is added to it.
    """
    results = response.connection.get(
        '/api/v2/unified_jobs', payload
    ).json()['results']

    # erase lines we've previously printed
    if print_stdout and sys.stdout.isatty():
        for _ in seen:
            sys.stdout.write('\x1b[1A')  # move the cursor up one line
            sys.stdout.write('\x1b[2K')  # clear that whole line

    for result in results:
        result['name'] = to_str(result['name'])
        if not print_stdout:
            continue
        # Print each job exactly once per call: the erase step above removes
        # one line per id in ``seen``, so a second print of the same job
        # would leave stale lines behind on the next redraw.
        print(' ↳ {id} - {name} '.format(**result), end='')
        status = result['status']
        if color_enabled():
            cprint(status, STATUS_COLORS.get(status, 'white'))
        else:
            print(status)
        # NOTE(review): source indentation was lost; the id is recorded
        # together with the printed line since ``seen`` is only read to
        # count printed lines -- confirm against the upstream file.
        seen.add(result['id'])
if print_stdout:
cprint('------Starting Standard Out Stream------', 'red')
if print_stdout:
print('Launching {}...'.format(to_str(get().json.name)))
started = time.time()
seen = set()
while True:
if timeout and time.time() - started > timeout:
if print_stdout:
cprint('Monitoring aborted due to timeout.', 'red')
break
if sys.stdout.isatty():
# if this is a tty-like device, we can send ANSI codes
# to draw an auto-updating view
# otherwise, just wait for the job to finish and print it *once*
# all at the end
fetch(seen)
def fetch(next_line):
    """Print the events that continue the output at ``next_line``.

    Iterates the results of ``events(**payload)`` and, for each event whose
    ``start_line`` equals the current position, prints its ``stdout`` (when
    non-empty and ``print_stdout`` is set) and advances the position to the
    event's ``end_line``.

    Returns the updated position, or ``next_line`` unchanged when no event
    matched.
    """
    for event in events(**payload).json.results:
        if event['start_line'] == next_line:
            text = to_str(event.get('stdout'))
            if text and print_stdout:
                print(text)
            next_line = event['end_line']
        # Any other event is a line from _later_ in the stdout, which
        # means the events didn't arrive in order; it is skipped for now
        # until the prior lines arrive and are printed.
    return next_line
'scope': getattr(parsed, 'conf.scope', None),
}
try:
token = api.Api().get_oauth2_token(**kwargs)
except Exception as e:
self.print_help(parser)
cprint(
'Error retrieving an OAuth2.0 token ({}).'.format(e.__class__),
'red'
)
else:
fmt = client.get_config('format')
if fmt == 'human':
print('export TOWER_TOKEN={}'.format(token))
else:
print(to_str(FORMATTERS[fmt]({'token': token}, '.')).strip())
def __init__(self, endpoint, connection):
    """Store the endpoint (as text, via ``to_str``) and the connection."""
    self.endpoint, self.connection = to_str(endpoint), connection