Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except ImportError: # pragma: no cover
return response
else:
http_version, status_code, _, headers, stream = response
httpx_response = _Response(
status_code,
http_version=http_version.decode("ascii"),
headers=headers,
stream=stream, # type: ignore
request=request,
)
httpx_response.read()
return httpx_response
class ContentStream(AsyncByteStream, SyncByteStream):
def __init__(self, content: bytes) -> None:
self._content = content
self.close_func = None
self.aclose_func = None
def __iter__(self) -> Iterator[bytes]:
yield self._content
async def __aiter__(self) -> AsyncIterator[bytes]:
yield self._content
class ResponseTemplate:
def __init__(
self,
status_code: Optional[int] = None,
import httpx # TODO: Drop usage
from httpcore import AsyncByteStream, SyncByteStream
from httpx import Headers as HTTPXHeaders # TODO: Drop usage
URL = Tuple[bytes, bytes, Optional[int], bytes]
Headers = List[Tuple[bytes, bytes]]
TimeoutDict = Dict[str, Optional[float]]
Request = Tuple[
bytes, URL, Headers, Union[SyncByteStream, AsyncByteStream],
]
Response = Tuple[
bytes, # http version
int, # status code
bytes, # reason
Headers,
Union[SyncByteStream, AsyncByteStream], # body
]
HeaderTypes = Union[
HTTPXHeaders,
Dict[str, str],
Dict[bytes, bytes],
Sequence[Tuple[str, str]],
Sequence[Tuple[bytes, bytes]],
]
Regex = type(re.compile(""))
Kwargs = Dict[str, Any]
URLPatternTypes = Union[str, Pattern[str], URL]
ContentDataTypes = Union[
bytes, str, List, Dict, Callable, Exception,
]