Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_parse_empty_list(parser, web_request):
    """Parsing a ``fields.List`` argument from an empty JSON list yields ``[]``."""
    # The request body carries an empty list under the key being parsed.
    web_request.json = {"things": []}
    list_argmap = {"things": fields.List(fields.Field())}

    parsed = parser.parse(list_argmap, web_request)

    assert parsed == {"things": []}
def test_it_should_return_missing_if_not_present(self):
    """``parser.parse_headers`` returns the ``missing`` object for a default request.

    The request is built by ``make_request()`` with no arguments, and the
    looked-up key is the externally defined ``name``.
    """
    single_value_field = fields.Field(multiple=False)
    bare_request = make_request()

    parsed = parser.parse_headers(bare_request, name, single_value_field)

    # Identity check: the exact ``missing`` object is expected, not an equal value.
    assert parsed is missing
def test_parse_cookies_called_when_cookies_is_a_location(parse_cookies, web_request):
    """``parse_cookies`` is only invoked when "cookies" is passed as a location."""
    cookie_field = fields.Field()
    arg_parser = Parser()

    # With no explicit ``locations`` argument, the cookie parser must stay untouched.
    arg_parser.parse_arg("foo", cookie_field, web_request)
    assert parse_cookies.call_count == 0

    # Explicitly listing "cookies" as a location must reach the cookie parser.
    arg_parser.parse_arg("foo", cookie_field, web_request, locations=("cookies",))
    parse_cookies.assert_called()
def echo_file(request):
    """Parse ``myfile`` from the request's files and echo its decoded content.

    The parsed object is read in full, decoded as UTF-8, and handed to
    ``json_response`` under the ``"myfile"`` key.
    """
    file_argmap = {"myfile": fields.Field()}
    parsed = parser.parse(file_argmap, request, locations=("files",))

    upload = parsed["myfile"]
    text = upload.read().decode("utf8")

    return json_response({"myfile": text})
def echo_file():
    """Parse ``myfile`` from the "files" location and echo its decoded content.

    No request object is passed to ``parser.parse`` here. The parsed object is
    read in full, decoded as UTF-8, and handed to ``J`` under the ``"myfile"`` key.
    """
    file_argmap = {"myfile": fields.Field()}
    parsed = parser.parse(file_argmap, locations=("files",))

    upload = parsed["myfile"]
    text = upload.read().decode("utf8")

    return J({"myfile": text})
def echo_file(request):
    """Parse ``myfile`` from the request's files and return its decoded content.

    The parsed object exposes the underlying stream through its ``file``
    attribute; that stream is read in full and decoded as UTF-8. The result is
    returned as a plain dict under the ``"myfile"`` key.
    """
    file_argmap = {"myfile": fields.Field()}
    parsed = parser.parse(file_argmap, request, locations=("files",))

    upload = parsed["myfile"]
    text = upload.file.read().decode("utf8")

    return {"myfile": text}
def test_parse_json_called_by_parse_arg(parse_json, web_request):
    """``Parser.parse_arg`` calls ``parse_json`` with the request, name and field."""
    json_field = fields.Field()
    arg_parser = Parser()

    arg_parser.parse_arg("foo", json_field, web_request)

    # The argument order asserted here is (request, argument name, field).
    parse_json.assert_called_with(web_request, "foo", json_field)
def test_validation_rules_property():
    """Every entry of ``__validation_rules__`` is a ``fields.Field``.

    For ``fields.List`` rules, the ``container`` attribute must also be a
    ``fields.Field``.
    """
    instance = model()
    for _rule_name, rule_field in instance.__validation_rules__.items():
        assert isinstance(rule_field, fields.Field)
        if not isinstance(rule_field, fields.List):
            continue
        # List rules wrap another field; check the wrapped one as well.
        assert isinstance(rule_field.container, fields.Field)
def decorator(func):
    """Resolve the task name and webargs argmap for ``func`` and wrap it.

    The task name is ``rpc_task_name`` when truthy, otherwise ``func.__name__``.
    The argmap is either the single positional webargs argument or, when none
    was given, the parameters of ``func`` annotated with a ``fields.Field``
    instance.

    Raises:
        FileExistsError: if the task name is already present in ``self._tasks``.
        TypeError: if more than one positional webargs argument was supplied.

    NOTE(review): this definition looks truncated in this view. It builds
    ``func_webargs`` but the visible code neither stores nor returns it.
    Confirm against the full source.
    """
    task = rpc_task_name or func.__name__
    if task in self._tasks:
        raise FileExistsError(f'Task {task} has already been registered.')

    positional_count = len(webargs_args)
    if positional_count > 1:
        raise TypeError('Only one positional argument (the webargs argmap) allowed.')

    if positional_count == 1:
        # An explicit argmap was supplied by the caller.
        argmap = webargs_args[0]
    else:
        signature = inspect.Signature.from_callable(func, follow_wrapped=False)
        logger.info(f'Deriving parameters for task {task} from function definition.')
        argmap = {}
        for param_name, param in signature.parameters.items():
            if isinstance(param.annotation, fields.Field):
                argmap[param_name] = param.annotation

    logger.info(f'Installing task {task}.')

    @functools.wraps(func)
    def call_task(*task_args, **task_kwargs):
        # The first positional argument is the original arguments dict; skip it.
        result = func(*task_args[1:], **task_kwargs)
        if rpc_encode_as_json:
            return self._encode_result_as_json(result)
        return result

    func_webargs = self._arguments_parser.use_kwargs(argmap, **webargs_kwargs)(call_task)
def register_as_task(func):
    """Attach an ``rpc_task`` attribute describing ``func``'s webargs parameters.

    The attribute is a dict whose ``'webargs_argmap'`` entry maps parameter
    names to their annotations, keeping only parameters whose annotation is a
    ``fields.Field`` instance. Wrapped callables are not followed
    (``follow_wrapped=False``).

    Returns:
        The same ``func`` object, now carrying the ``rpc_task`` attribute.
    """
    signature = inspect.Signature.from_callable(func, follow_wrapped=False)

    argmap = {}
    for param_name, param in signature.parameters.items():
        annotation = param.annotation
        if isinstance(annotation, fields.Field):
            argmap[param_name] = annotation

    func.rpc_task = {'webargs_argmap': argmap}
    return func