Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def homepage(request):
    """Return a JSON response carrying the fixed payload ``{"hello": "world"}``.

    The ``request`` argument is accepted but not read.
    """
    payload = {"hello": "world"}
    return JSONResponse(payload)
async def app(scope, receive, send):
    """ASGI callable that replies with the request's method and URL as JSON."""
    incoming = Request(scope, receive)
    reply = JSONResponse(
        {
            "method": incoming.method,
            "url": str(incoming.url),
        }
    )
    # A response object is itself awaited with the ASGI triple to send it.
    await reply(scope, receive, send)
def trace_item(item_id: str):
    """Return a JSONResponse built with only ``media_type="message/http"``.

    ``item_id`` is part of the signature but is not used in the body.
    """
    media = "message/http"
    return JSONResponse(media_type=media)
# NOTE(review): fragment -- the enclosing function header and the definitions
# of `data`, `output`, `request`, `scope`, `receive` and `send` are not visible
# here, and the original indentation has been lost.
# Collect every (key, value) pair from data.multi_items() into a list stored
# under output[key], so repeated keys keep all of their values.
for key, value in data.multi_items():
if key not in output:
output[key] = []
if isinstance(value, UploadFile):
# Uploads are read in full and described by filename/content/content_type.
content = await value.read()
output[key].append(
{
"filename": value.filename,
# decode() is called without arguments; presumably `content` is bytes
# holding UTF-8 text -- TODO confirm for binary uploads.
"content": content.decode(),
"content_type": value.content_type,
}
)
else:
# Any value that is not an UploadFile is appended unchanged.
output[key].append(value)
await request.close()
# Send the collected mapping back as a JSON response.
response = JSONResponse(output)
await response(scope, receive, send)
def head_item(item_id: str):
    """Return a JSONResponse whose ``x-fastapi-item-id`` header is ``item_id``.

    No content argument is passed to the response; only headers are set.
    """
    extra_headers = {"x-fastapi-item-id": item_id}
    return JSONResponse(headers=extra_headers)
async def app(scope, receive, send):
    """ASGI callable that echoes the incoming request headers back as JSON."""
    incoming = Request(scope, receive)
    # Copy the headers object into a plain dict before building the payload.
    payload = {"headers": dict(incoming.headers)}
    reply = JSONResponse(payload)
    await reply(scope, receive, send)
def homepage(request):
    """Return a JSON summary of the user attached to ``request``.

    The payload has two keys: ``"authenticated"``, taken from
    ``request.user.is_authenticated``, and ``"user"``, taken from
    ``request.user.display_name``.
    """
    return JSONResponse(
        {
            "authenticated": request.user.is_authenticated,
            "user": request.user.display_name,
        }
    )
async def http_exception(request, exc):
    """Serve the web UI's index page for non-API paths, else a JSON 404.

    For request paths that do not start with ``/api/v1``, the bundled
    ``dist/index.html`` is returned when both the ``dist`` directory and the
    index file exist. In every other case a ``{"detail": "Not Found"}`` body
    with status 404 is returned. ``exc`` is not inspected.
    """
    parsed = urlparse(str(request.url))
    outside_api = not parsed.path.startswith("/api/v1")
    # Checks stay in the original order so the file test only runs when the
    # directory test has already passed.
    if outside_api and Path(webui_dir("dist")).is_dir():
        if Path(webui_dir(join("dist", "index.html"))).is_file():
            return FileResponse(webui_dir(join("dist", "index.html")))
    return JSONResponse({"detail": "Not Found"}, status_code=404)
# Handler for the prediction test endpoint; accepts GET and POST requests.
# NOTE(review): this snippet is truncated -- the body of the final
# `if request.method == "GET":` is not visible -- and indentation has been
# lost, so the nesting of the statements below cannot be confirmed from here.
async def _predict_test_endpoint(
self, request: Request
) -> Union[JSONResponse, Jinja2Templates.TemplateResponse]:
# The "accept" header is validated against HTML media types for GET and
# JSON media types for POST; 406 is passed as the status argument
# (presumably the code used on mismatch -- confirm in
# _validate_http_headers).
if request.method == "GET":
self._validate_http_headers(
request, "accept", MediaTypes.html_media_types(), 406
)
elif request.method == "POST":
self._validate_http_headers(
request, "accept", MediaTypes.json_media_types(), 406
)
# Pipeline: read test data -> format input -> predict -> format output,
# with a debug log line after each of the last three stages.
test_data = await self._read_test_data()
formatted_data = self._format_input(test_data)
logger.debug("Formatted test data")
processed_results = self._process_prediction(formatted_data)
logger.debug("Completed prediction test process")
formatted_output = self._format_output(processed_results)
logger.debug("Formatted prediction test results")
# The return annotation suggests GET yields a template response and other
# methods a JSONResponse -- TODO confirm against the full source.
if request.method == "GET":
async def predict(request: Request):
    """Run one prediction from the submitted form and return it as JSON.

    The form is converted by ``convert_input`` into ``files`` (objects whose
    ``name`` attribute is a filesystem path) and ``entry`` (a mapping of
    feature name to value).

    Returns:
        * ``ALL_FEATURES_PRESENT_ERROR`` with status 400 when the keys of
          ``entry`` do not cover every name in ``input_features``.
        * The first record of the model's prediction on success.
        * ``COULD_NOT_RUN_INFERENCE_ERROR`` with status 500 when prediction
          raises; the exception is logged with its traceback.

    The files produced by ``convert_input`` are always removed afterwards.
    """
    form = await request.form()
    files, entry = convert_input(form)
    try:
        # Every expected input feature must appear among the entry's keys.
        if (entry.keys() & input_features) != input_features:
            return JSONResponse(ALL_FEATURES_PRESENT_ERROR,
                                status_code=400)
        try:
            resp = model.predict(data_dict=[entry]).to_dict('records')[0]
            return JSONResponse(resp)
        except Exception as e:
            # Boundary handler: report a generic 500 to the client but keep
            # the full traceback in the log.
            logger.exception("Error: {}".format(str(e)))
            return JSONResponse(COULD_NOT_RUN_INFERENCE_ERROR,
                                status_code=500)
    finally:
        for f in files:
            # Handle failures per file: an exception escaping this block would
            # replace the response being returned and leave the remaining
            # files on disk.
            try:
                os.remove(f.name)
            except OSError as cleanup_error:
                logger.warning(
                    "Could not remove temporary file {}: {}".format(
                        f.name, cleanup_error
                    )
                )