Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_contextualize_after_configure(writer):
    """Check ``extra`` resolution when ``configure`` runs inside ``contextualize``.

    The record emitted while the context is still open is expected to carry
    the contextual value, and the record emitted after the block exits is
    expected to carry the value installed by ``configure``.
    """
    logger.add(writer, format="{message} {extra[foobar]}")

    with logger.contextualize(foobar="baz"):
        # Reconfigure the extras while the contextual value is still active.
        logger.configure(extra={"foobar": "baz_2"})
        logger.info("A")

    # Emitted after the context manager has exited.
    logger.info("B")

    expected_output = "A baz\nB baz_2\n"
    assert writer.read() == expected_output
def test_stream_handler(capsys):
    """Check that a ``StreamHandler`` bound to stderr works as a sink.

    One record is logged while the handler is attached and one after
    ``logger.remove()``; only the first is expected on stderr, and stdout is
    expected to stay empty.
    """
    logger.add(StreamHandler(sys.stderr), format="{level} {message}")
    logger.info("test")

    # Logged after the handlers were removed, so it must not show up.
    logger.remove()
    logger.warning("nope")

    captured = capsys.readouterr()
    assert captured.out == ""
    assert captured.err == "INFO test\n"
def test_nested_contextualize(writer):
    """Check that nested ``contextualize`` blocks override and then restore values.

    The inner "b" context must shadow "a", leaving it must bring "a" back,
    and a later "c" context must apply to the record logged inside it.
    """
    logger.add(writer, format="{message} {extra[foobar]}")

    with logger.contextualize(foobar="a"):
        with logger.contextualize(foobar="b"):
            logger.info("B")

        # Back in the outer context: the value is expected to be "a" again.
        logger.info("A")

        with logger.contextualize(foobar="c"):
            logger.info("C")

    expected_output = "B b\nA a\nC c\n"
    assert writer.read() == expected_output
def create_group(name, domain):
    """Create a group through the module-level ``google`` client, then exit.

    Calls ``google.create_group(name, domain)``, which returns a
    ``(success, group)`` pair, logs the outcome and terminates the process.

    Args:
        name: Group name; also used as the local part of the logged address.
        domain: Domain the group is created under.

    Raises:
        SystemExit: Always. Exit code 0 on success, 1 on failure.
    """
    global google, cfg

    logger.info(f"Creating group named: {name} - {name}@{domain}")
    success, group = google.create_group(name, domain)

    # Failure path first: log what came back and exit non-zero.
    if not success:
        logger.error(f"Failed to create group {name!r}:\n{group}")
        sys.exit(1)

    logger.info(f"Created group {name!r}:\n{group}")
    sys.exit(0)
# Fetch proxies from the HTTP API at ``self.dljl_proxy_api`` and store them on
# ``self.proxy_list`` as dicts of the form {'http': 'http://IP:Port', 'score': 5}.
#
# NOTE(review): the indentation of this block was lost in this copy, so the
# nesting below cannot be read off the text. In particular the `else:` branch
# could belong to `if self.dljl_proxy_api`, `if response.status_code == 200`
# or `if proxy_data['data']['code'] == 0`, and the success log could sit
# inside or after the `for` loop. Confirm against the original file before
# changing any logic here.
def get_proxy(self) -> None:
# Only query the API when a (truthy) API URL is configured.
if self.dljl_proxy_api:
# NOTE(review): no timeout is passed to requests.get here.
response = requests.get(self.dljl_proxy_api)
if response.status_code == 200:
proxy_data = json.loads(response.text)
# The proxy list is only read when the payload's data.code equals 0.
if proxy_data['data']['code'] == 0:
proxy_list = proxy_data['data']['list']['ProxyIpInfoList']
proxy_result = []
for proxy in proxy_list:
# Rebinds the loop variable from the API entry to its "http://IP:Port" URL.
proxy = 'http://' + proxy['IP'] + ":" + str(proxy['Port'])
# Every entry is given 'score': 5; what the score means is not visible
# in this block -- TODO confirm with the code that consumes proxy_list.
proxy_obj = {'http': proxy, 'score': 5}
proxy_result.append(proxy_obj)
# Log message (Chinese): "successfully fetched website IP proxies".
logger.info('成功获取网站ip代理')
self.proxy_list = proxy_result
else:
# Log message (Chinese): "please check the proxy api...".
logger.info('请检查代理api...')
# Raised bare, with no message or key attached.
raise KeyError
def serve(redis=None, port=SERVER_PORT, address=SERVER_HOST):
    """Start the HTTP API and block on the Tornado IO loop.

    Args:
        redis: Client object handed to every ``Server`` handler. When falsy,
            a new ``RedisClient()`` is created.
        port: Port to listen on.
        address: Address to bind to.
    """
    redis = redis or RedisClient()

    # Both the root path and the catch-all path go to the same handler class;
    # each route gets its own kwargs dict.
    routes = [
        (pattern, Server, {'redis': redis})
        for pattern in (r'/', r'/(.*)')
    ]
    application = Application(routes)
    application.listen(port, address=address)

    logger.info(f'API listening on http://{address}:{port}')
    # Blocks until the IO loop is stopped.
    tornado.ioloop.IOLoop.instance().start()
except FileNotFoundError:
logger.warning("tree command not exists, ignore.")
if os.path.isdir(project_name):
logger.warning(
f"Project folder {project_name} exists, please specify a new project name."
)
show_tree(project_name)
return 1
elif os.path.isfile(project_name):
logger.warning(
f"Project name {project_name} conflicts with existed file, please specify a new one."
)
return 1
logger.info(f"Create new project: {project_name}")
print(f"Project Root Dir: {os.path.join(os.getcwd(), project_name)}\n")
def create_folder(path):
    """Create ``path`` (including missing parents) and report it on stdout.

    Raises:
        FileExistsError: If ``path`` already exists (``os.makedirs`` default).
    """
    os.makedirs(path)
    print(f"created folder: {path}")
def create_file(path, file_content=""):
    """Write ``file_content`` to ``path`` as UTF-8 and report it on stdout.

    An existing file is truncated; with the default argument an empty file is
    produced.
    """
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(file_content)

    print(f"created file: {path}")
demo_testcase_request_content = """
config:
name: "request methods testcase with functions"
def put(self, email_template, email_template_id, **kwargs):
    # pylint: disable=unused-argument
    """
    Update an email template.

    Passes the incoming ``email_template`` payload to
    ``models.EmailTemplate.update_by_id`` for the record identified by
    ``email_template_id``, logs which user performed the update, and returns
    the updated object under the ``"data"`` key.
    """
    email_template = models.EmailTemplate.update_by_id(
        email_template_id,
        params.EmailTemplateParam,
        email_template,
    )

    # Audit log entry: acting user's name plus the id of the updated template.
    logger.info(f"{current_user.username}更新了电子邮件模板{email_template.id}")

    response = {"data": email_template}
    return response
request_data = request.args.to_dict()
except Exception:
logger.exception(f"Exception parsing request data from {request.remote_addr}: ")
if 'teamdrive_id' in request_data:
teamdrive_id = request_data['teamdrive_id']
item_name = manager.get_item_name_from_cache(request_file, teamdrive_id)
# transcoded version request?
if 'transcode' in request_data:
transcoded_versions = manager.get_transcodes(request_file)
if not transcoded_versions or not len(transcoded_versions):
logger.error(f"Failed to retrieve transcoded versions for {request_file} / {item_name}")
else:
logger.info(f"Found {len(transcoded_versions)} transcoded versions for {request_file} / {item_name}: "
f"{sorted_transcodes_string(transcoded_versions)}")
if request_data['transcode'] not in transcoded_versions:
logger.error(
f"There was no {request_data['transcode']} version available for {request_file} / {item_name}")
else:
logger.info(f"Proxy stream request from {request.remote_addr} for {request_file} / {item_name} / "
f"transcode: {request_data['transcode']}")
try:
return serve_partial(transcoded_versions[request_data['transcode']], request.headers.get('Range'),
teamdrive_id=teamdrive_id)
except TimeoutError:
pass
except Exception:
logger.exception(
f"Exception proxying stream request from {request.remote_addr} for "
f"{request_file} / {item_name} / transcode: {request_data['transcode']}: ")
class_paths.append(
environ.get('SKLEARN_PORTER_PYTEST_GSON_PATH')
)
else:
path = src_path.parent / 'gson.jar'
if not path.exists():
url = language.value.GSON_DOWNLOAD_URI
urllib.request.urlretrieve(url, str(path))
created_files.append(path)
class_paths.append(str(path))
if bool(class_paths):
cmd_args['class_path'] = '-cp ' + ':'.join(class_paths)
cmd = cmd.format(**cmd_args)
L.info('Compilation command: `{}`'.format(cmd))
subp_args = dict(
shell=True,
universal_newlines=True,
stderr=STDOUT,
executable='/bin/bash'
)
try:
check_output(cmd, **subp_args)
except CalledProcessError as e:
msg = 'Command "{}" return with error (code {}):\n\n{}'
msg = msg.format(e.cmd, e.returncode, e.output)
if language is enum.Language.JAVA:
if 'code too large' in e.output:
raise exception.CodeTooLarge(msg)
elif 'too many constants' in e.output: