async def server_void(request: Request):
    """Describe all RDF datasets hosted by the Sage endpoint"""
    try:
        mimetypes = request.headers['accept'].split(",")
        # Rebuild the endpoint URL without params, query string or fragment
        url = urlunparse(request.url.components[0:3] + (None, None, None))
        if url.endswith('/'):
            url = url[:-1]
        void_format, res_mimetype = choose_void_format(mimetypes)
        description = many_void(url, dataset, void_format)
        return Response(description, media_type=res_mimetype)
    except Exception as err:
        logging.error(err)
        raise HTTPException(status_code=500, detail=str(err))
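The helpers choose_void_format and many_void are not shown in this snippet. A minimal sketch of what the content-negotiation helper might look like (the format names and the Turtle fallback are assumptions, not the endpoint's actual behavior):

def choose_void_format(mimetypes):
    # Map the client's Accept mimetypes to a VoID serialization format.
    # The mapping below is illustrative only.
    if "text/turtle" in mimetypes:
        return "turtle", "text/turtle"
    elif "application/xml" in mimetypes:
        return "xml", "application/xml"
    elif "application/n-triples" in mimetypes:
        return "ntriples", "application/n-triples"
    # Fall back to Turtle when no supported mimetype was requested
    return "turtle", "text/turtle"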
headers = {"Content-Disposition": f'attachment; filename="{table_id}.json"'}
return Response(content, headers=headers)
elif export == "csv":
output = io.StringIO()
writer = csv.writer(output)
queryset = await datasource.all()
headers = [field.title for field in datasource.schema.fields.values()]
writer.writerow(headers)
for item in queryset:
row = [item.get(key, default="") for key in datasource.schema.fields.keys()]
writer.writerow(row)
content = output.getvalue()
headers = {"Content-Disposition": f'attachment; filename="{table_id}.csv"'}
return Response(content, headers=headers)
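The Content-Disposition header is what makes browsers treat the response as a file download rather than rendering it. A self-contained endpoint using the same pattern (the route and the CSV contents are illustrative):

import csv
import io

from starlette.requests import Request
from starlette.responses import Response


async def export_csv(request: Request) -> Response:
    # Build a small CSV in memory, then serve it as an attachment.
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["id", "name"])
    writer.writerow([1, "example"])
    headers = {"Content-Disposition": 'attachment; filename="export.csv"'}
    return Response(output.getvalue(), media_type="text/csv", headers=headers)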
# Perform pagination
datasource = datasource.offset(offset).limit(PAGE_SIZE)
queryset = await datasource.all()

# Get pagination and column controls to render on the page
column_controls = ordering.get_column_controls(
    url=request.url,
    columns=columns,
    selected_column=order_column,
    is_reverse=is_reverse,
)
page_controls = pagination.get_page_controls(
    url=request.url, current_page=current_page, total_pages=total_pages
)
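The snippet assumes offset, current_page, and total_pages were computed earlier. One common way to derive them from a fixed PAGE_SIZE (this arithmetic is an assumption about the surrounding code, not taken from it):

import math

PAGE_SIZE = 25  # illustrative page size


def page_window(count: int, current_page: int):
    # Clamp the requested page into range and derive the row offset for it.
    total_pages = max(1, math.ceil(count / PAGE_SIZE))
    current_page = min(max(1, current_page), total_pages)
    offset = (current_page - 1) * PAGE_SIZE
    return offset, current_page, total_pages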
def deserialize_response(serialized_response: dict) -> Response:
    """
    Given the JSON representation of a response, re-build the
    original response object.
    """
    return Response(
        content=json_string_to_bytes(serialized_response["content"]),
        status_code=serialized_response["status_code"],
        headers=serialized_response["headers"],
    )
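deserialize_response implies a matching serializer. A minimal sketch of the inverse, where the hypothetical bytes_to_json_string mirrors the json_string_to_bytes helper above by base64-encoding the raw body so it survives a round-trip through JSON:

import base64

from starlette.responses import Response


def bytes_to_json_string(content: bytes) -> str:
    # Hypothetical counterpart of json_string_to_bytes()
    return base64.b64encode(content).decode("ascii")


def serialize_response(response: Response) -> dict:
    # Capture the pieces deserialize_response() needs to rebuild it.
    return {
        "content": bytes_to_json_string(response.body),
        "status_code": response.status_code,
        "headers": dict(response.headers),
    }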
            {s['items']:,.0f} items found.
        ''')
        # Flag runs that finished with errors in the entry title
        if s['errors'] > 0:
            fe.title('Spider {} completed with errors'.format(s['spider']))
        else:
            fe.title('Spider {} completed'.format(s['spider']))
        fe.link(href=request.url_for('get_scrape', scrape_id=s['id']))
    if feedformat == 'atom':
        return Response(
            fg.atom_str(pretty=True),
            media_type='text/plain',
            # media_type='application/atom+xml',
        )
    return Response(
        fg.rss_str(pretty=True),
        media_type='text/plain',
        # media_type='application/rss+xml',
    )
title_date: str = ttime(time.time() - 86400 * date_delta)[:10]
# At midnight each day, publish the previous day's results
pubDate: str = ttime(
    ptime(ttime(time.time() - 86400 * (date_delta - 1))[:10], fmt='%Y-%m-%d'),
    fmt='%a, %d %b %Y')
link: str = f'https://{ONLINE_HOST}/newspaper/daily.python/{title_date}?lang={language}'
item: dict = {
    'title': f'Python Daily [{title_date}]',
    'link': link,
    'guid': link,
    'pubDate': pubDate
}
xml_data['items'].append(item)
xml: str = gen_rss(xml_data)
return Response(xml, media_type='text/xml')
            ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
        ).encode("utf-8")


class UJSONResponse(JSONResponse):
    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        assert ujson is not None, "ujson must be installed to use UJSONResponse"
        return ujson.dumps(content, ensure_ascii=False).encode("utf-8")


class RedirectResponse(Response):
    def __init__(
        self, url: typing.Union[str, URL], status_code: int = 307, headers: dict = None
    ) -> None:
        super().__init__(content=b"", status_code=status_code, headers=headers)
        self.headers["location"] = quote_plus(str(url), safe=":/%#?&=@[]!$&'()*+,;")
class StreamingResponse(Response):
    def __init__(
        self,
        content: typing.Any,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
    ) -> None:
        self.body_iterator = content
        self.status_code = status_code
        self.media_type = self.media_type if media_type is None else media_type
        self.background = background
        self.init_headers(headers)
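StreamingResponse accepts any iterable or async iterable as content, sending chunks as they are produced instead of building the whole body up front. For example, streaming from an async generator:

import asyncio

from starlette.responses import StreamingResponse


async def slow_numbers(minimum: int, maximum: int):
    # Yield one chunk at a time; the response body is sent incrementally.
    for number in range(minimum, maximum + 1):
        yield f"{number}\n"
        await asyncio.sleep(0.1)


async def stream_endpoint(request):
    return StreamingResponse(slow_numbers(1, 10), media_type="text/plain")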
    * next_page: Link to a SaGe saved plan, or `None` if there is none, i.e., query execution completed during the time quantum.
    * stats: Statistics about query execution.
    * skol_url: URL used for the skolemization of blank nodes.

    Returns:
        An HTTP response built from the input mimetypes and the SPARQL query results.
    """
    if "application/json" in mimetypes:
        iterator = responses.raw_json_streaming(bindings, next_page, stats, skol_url)
        return StreamingResponse(iterator, media_type="application/json")
    elif "application/sparql-results+json" in mimetypes:
        iterator = responses.w3c_json_streaming(bindings, next_page, stats, skol_url)
        return StreamingResponse(iterator, media_type="application/json")
    elif "application/xml" in mimetypes or "application/sparql-results+xml" in mimetypes:
        iterator = responses.w3c_xml(bindings, next_page, stats)
        return Response(iterator, media_type="application/xml")
    # Default to a plain JSON response when no supported mimetype matched
    return JSONResponse({
        "bindings": bindings,
        "next": next_page,
        "stats": stats
    })
            # Defer sending this message until we've figured out
            # whether the response can be cached.
            self.initial_message = message
            return

        assert message["type"] == "http.response.body"

        if message.get("more_body", False):
            # Streaming responses can't be cached: the full body never
            # exists in memory at once.
            logger.trace("response_not_cachable reason=is_streaming")
            self.is_response_cachable = False
            await self.send(self.initial_message)
            await self.send(message)
            return

        assert self.request is not None
        body = message["body"]
        response = Response(content=body, status_code=self.initial_message["status"])
        # NOTE: be sure not to mutate the original headers directly, as another Response
        # object might be holding a reference to the same list.
        response.raw_headers = list(self.initial_message["headers"])

        try:
            await store_in_cache(response, request=self.request, cache=self.cache)
        except ResponseNotCachable:
            self.is_response_cachable = False
        else:
            # Apply any headers added or modified by 'store_in_cache()'.
            self.initial_message["headers"] = list(response.raw_headers)

        await self.send(self.initial_message)
        await self.send(message)
def http_exception(self, request: Request, exc: HTTPException) -> Response:
    # 204 No Content and 304 Not Modified must not include a message body
    if exc.status_code in {204, 304}:
        return Response(b"", status_code=exc.status_code)
    return PlainTextResponse(exc.detail, status_code=exc.status_code)
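A handler with this shape can be installed on a Starlette application through its exception_handlers mapping; for instance (the handler name here is illustrative):

from starlette.applications import Starlette
from starlette.exceptions import HTTPException
from starlette.requests import Request
from starlette.responses import PlainTextResponse, Response


async def handle_http_exception(request: Request, exc: HTTPException) -> Response:
    # Same rule as above: 204 and 304 responses must not carry a body.
    if exc.status_code in {204, 304}:
        return Response(b"", status_code=exc.status_code)
    return PlainTextResponse(exc.detail, status_code=exc.status_code)


app = Starlette(exception_handlers={HTTPException: handle_http_exception})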
def get_legacy_data():
    data = """
    <header>
        Apply shampoo here.
    </header>
    You'll have to use soap here.
    """
    return Response(content=data, media_type="application/xml")
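If this helper is exposed over HTTP, a thin endpoint can forward its result (the route path and endpoint name are illustrative):

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Route


async def legacy_endpoint(request: Request):
    return get_legacy_data()


app = Starlette(routes=[Route("/legacy", legacy_endpoint)])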