How to use the starlette.responses.Response class in starlette

To help you get started, we’ve selected a few starlette examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github sage-org / sage-engine / sage / http_server / fastapi.py View on Github external
async def server_void(request: Request):
        """Describe every RDF dataset hosted by the Sage endpoint.

        The candidate mimetypes are read from the request's ``accept`` header
        and handed to ``choose_void_format``; the description is generated
        for the request URL rebuilt from its first three components only.
        Any exception is logged and re-raised as an HTTP 500 whose detail is
        the error message.
        """
        try:
            accepted = request.headers['accept'].split(",")
            base_parts = request.url.components[0:3] + (None, None, None)
            url = urlunparse(base_parts)
            # Drop a single trailing slash, if any.
            if url.endswith('/'):
                url = url[:-1]
            void_format, res_mimetype = choose_void_format(accepted)
            return Response(
                many_void(url, dataset, void_format),
                media_type=res_mimetype,
            )
        except Exception as err:
            logging.error(err)
            raise HTTPException(status_code=500, detail=str(err))
github encode / hostedapi / source / endpoints.py View on Github external
headers = {"Content-Disposition": f'attachment; filename="{table_id}.json"'}
        return Response(content, headers=headers)
    elif export == "csv":
        output = io.StringIO()
        writer = csv.writer(output)
        queryset = await datasource.all()

        headers = [field.title for field in datasource.schema.fields.values()]
        writer.writerow(headers)
        for item in queryset:
            row = [item.get(key, default="") for key in datasource.schema.fields.keys()]
            writer.writerow(row)

        content = output.getvalue()
        headers = {"Content-Disposition": f'attachment; filename="{table_id}.csv"'}
        return Response(content, headers=headers)

    #  Perform pagination
    datasource = datasource.offset(offset).limit(PAGE_SIZE)
    queryset = await datasource.all()

    # Get pagination and column controls to render on the page
    column_controls = ordering.get_column_controls(
        url=request.url,
        columns=columns,
        selected_column=order_column,
        is_reverse=is_reverse,
    )
    page_controls = pagination.get_page_controls(
        url=request.url, current_page=current_page, total_pages=total_pages
    )
github florimondmanca / asgi-caches / src / asgi_caches / utils / cache.py View on Github external
def deserialize_response(serialized_response: dict) -> Response:
    """
    Re-build a response object from its JSON representation.

    The "content" entry is converted with ``json_string_to_bytes``; the
    "status_code" and "headers" entries are passed through unchanged.
    """
    body = json_string_to_bytes(serialized_response["content"])
    status = serialized_response["status_code"]
    response_headers = serialized_response["headers"]
    return Response(content=body, status_code=status, headers=response_headers)
github drkane / find-that-charity / findthatcharity / apps / admin.py View on Github external
{s['items']:,.0f} items found.
        ''')
        if s['errors'] > 0:
            fe.title('Spider {} completed'.format(s['spider']))
        else:
            fe.title('Spider {} completed with errors'.format(s['spider']))
        fe.link(href=request.url_for('get_scrape', scrape_id=s['id']))

    if feedformat == 'atom':
        return Response(
            fg.atom_str(pretty=True),
            media_type='text/plain',
            # media_type='application/atom+xml',
        )
    return Response(
        fg.rss_str(pretty=True),
        media_type='text/plain',
        # media_type='application/rss+xml',
github ClericPy / newspaper / newspaper / views.py View on Github external
title_date: str = ttime(time.time() - 86400 * date_delta)[:10]
        # 当日 0 点发布前一天的结果
        pubDate: str = ttime(ptime(ttime(time.time() - 86400 *
                                         (date_delta - 1))[:10],
                                   fmt='%Y-%m-%d'),
                             fmt='%a, %d %b %Y')
        link: str = f'https://{ONLINE_HOST}/newspaper/daily.python/{title_date}?lang={language}'
        item: dict = {
            'title': f'Python Daily [{title_date}]',
            'link': link,
            'guid': link,
            'pubDate': pubDate
        }
        xml_data['items'].append(item)
    xml: str = gen_rss(xml_data)
    return Response(xml, media_type='text/xml')
github encode / starlette / starlette / responses.py View on Github external
ensure_ascii=False,
            allow_nan=False,
            indent=None,
            separators=(",", ":"),
        ).encode("utf-8")


class UJSONResponse(JSONResponse):
    """JSON response whose body is serialized with ``ujson``."""

    media_type = "application/json"

    def render(self, content: typing.Any) -> bytes:
        """Serialize *content* with ``ujson.dumps`` and return it UTF-8 encoded."""
        assert ujson is not None, "ujson must be installed to use UJSONResponse"
        serialized = ujson.dumps(content, ensure_ascii=False)
        return serialized.encode("utf-8")


class RedirectResponse(Response):
    """Empty-bodied response whose ``location`` header points at another URL.

    Args:
        url: Redirect target; converted with ``str()`` and percent-encoded
            before being stored in the ``location`` header.
        status_code: HTTP status to send (defaults to 307).
        headers: Optional extra headers, forwarded to ``Response``.
    """

    def __init__(
        self,
        url: typing.Union[str, URL],
        status_code: int = 307,
        headers: typing.Optional[dict] = None,
    ) -> None:
        # Imported locally so the class does not depend on a file-level
        # import that may not exist.
        from urllib.parse import quote

        super().__init__(content=b"", status_code=status_code, headers=headers)
        # ``quote`` (not ``quote_plus``): a space must become ``%20``.
        # ``quote_plus`` emits ``+``, which is only a space in form-encoded
        # query strings and is a literal plus sign inside a URL path.
        # Characters listed in ``safe`` are URL delimiters and are kept as-is.
        self.headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")


class StreamingResponse(Response):
    def __init__(
        self,
        content: typing.Any,
        status_code: int = 200,
        headers: dict = None,
        media_type: str = None,
        background: BackgroundTask = None,
    ) -> None:
github sage-org / sage-engine / sage / http_server / server.py View on Github external
* next_link: Link to a SaGe saved plan. Use `None` if there is no one, i.e., the query execution has completed during the quantum.
      * stats: Statistics about query execution.
      * skol_url: URL used for the skolemization of blank nodes.

    Returns:
      An HTTP response built from the input mimetypes and the SPARQL query results.
    """
    if "application/json" in mimetypes:
        iterator = responses.raw_json_streaming(bindings, next_page, stats, skol_url)
        return StreamingResponse(iterator, media_type="application/json")
    elif "application/sparql-results+json" in mimetypes:
        iterator = responses.w3c_json_streaming(bindings, next_page, stats, skol_url)
        return StreamingResponse(iterator, media_type="application/json")
    elif "application/xml" in mimetypes or "application/sparql-results+xml" in mimetypes:
        iterator = responses.w3c_xml(bindings, next_page, stats)
        return Response(iterator, media_type="application/xml")
    return JSONResponse({
        "bindings": bindings,
        "next": next_page,
        "stats": stats
    })
github florimondmanca / asgi-caches / src / asgi_caches / middleware.py View on Github external
# Defer sending this message until we figured out
            # whether the response can be cached.
            self.initial_message = message
            return

        assert message["type"] == "http.response.body"
        if message.get("more_body", False):
            logger.trace("response_not_cachable reason=is_streaming")
            self.is_response_cachable = False
            await self.send(self.initial_message)
            await self.send(message)
            return

        assert self.request is not None
        body = message["body"]
        response = Response(content=body, status_code=self.initial_message["status"])
        # NOTE: be sure not to mutate the original headers directly, as another Response
        # object might be holding a reference to the same list.
        response.raw_headers = list(self.initial_message["headers"])

        try:
            await store_in_cache(response, request=self.request, cache=self.cache)
        except ResponseNotCachable:
            self.is_response_cachable = False
        else:
            # Apply any headers added or modified by 'store_in_cache()'.
            self.initial_message["headers"] = list(response.raw_headers)

        await self.send(self.initial_message)
        await self.send(message)
github encode / starlette / starlette / exceptions.py View on Github external
def http_exception(self, request: Request, exc: HTTPException) -> Response:
        """Turn an ``HTTPException`` into a response.

        Status codes 204 and 304 produce a response with an empty body; any
        other status produces a plain-text response carrying ``exc.detail``.
        """
        status = exc.status_code
        if status not in {204, 304}:
            return PlainTextResponse(exc.detail, status_code=status)
        return Response(b"", status_code=status)
github tiangolo / fastapi / docs / src / response_directly / tutorial002.py View on Github external
def get_legacy_data():
    """Return a fixed text payload in a response tagged ``application/xml``."""
    legacy_payload = """
    
    <header>
        Apply shampoo here.
    </header>
    
        You'll have to use soap here.
    
    
    """
    return Response(content=legacy_payload, media_type="application/xml")