Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_multidict_headers(aiohttp_client) -> None:
    """POST a body with an explicit Content-Length passed as a MultiDict.

    The server-side handler asserts that it reads back exactly the bytes
    the client sent; the test then asserts the response status is 200.
    """
    data = b'sample data'

    async def handler(request):
        # The body received by the server must equal the posted payload.
        received = await request.read()
        assert received == data
        return web.Response()

    app = web.Application()
    app.router.add_post('/', handler)
    client = await aiohttp_client(app)

    # Content-Length is supplied explicitly, wrapped in a MultiDict.
    request_headers = MultiDict({'Content-Length': str(len(data))})
    resp = await client.post('/', data=data, headers=request_headers)
    assert resp.status == 200
"""Parses a MIME type into its components.
mimetype is a MIME type string.
Returns a MimeType object.
Example:
>>> parse_mimetype('text/html; charset=utf-8')
MimeType(type='text', subtype='html', suffix='',
parameters={'charset': 'utf-8'})
"""
if not mimetype:
return MimeType(type='', subtype='', suffix='',
parameters=MultiDictProxy(MultiDict()))
parts = mimetype.split(';')
params = MultiDict() # type: MultiDict[str]
for item in parts[1:]:
if not item:
continue
key, value = cast(Tuple[str, str],
item.split('=', 1) if '=' in item else (item, ''))
params.add(key.lower().strip(), value.strip(' "'))
fulltype = parts[0].strip().lower()
if fulltype == '*':
fulltype = '*/*'
mtype, stype = (cast(Tuple[str, str], fulltype.split('/', 1))
if '/' in fulltype else (fulltype, ''))
def links(self) -> 'MultiDictProxy[MultiDictProxy[Union[str, URL]]]':
links_str = ", ".join(self.headers.getall("link", []))
if not links_str:
return MultiDictProxy(MultiDict())
links = MultiDict() # type: MultiDict[MultiDictProxy[Union[str, URL]]]
for val in re.split(r",(?=\s*<)", links_str):
match = re.match(r"\s*<(.*)>(.*)", val)
if match is None: # pragma: no cover
# the check exists to suppress mypy error
continue
url, params_str = match.groups()
params = params_str.split(";")[1:]
link = MultiDict() # type: MultiDict[Union[str, URL]]
for param in params:
match = re.match(
r"^\s*(\S*)\s*=\s*(['\"]?)(.*?)(\2)\s*$",
def __init__(self, request, options: Dict, stream: Callable) -> None:
    """Keep the given request, options and stream and set initial state.

    ``limit`` is taken from ``request.cache.cfg.stream_buffer`` and
    ``result`` starts out as a tuple of two empty ``MultiDict`` objects.
    """
    self.request = request
    self.options = options
    self.stream = stream

    cache_cfg = request.cache.cfg
    self.limit = cache_cfg.stream_buffer

    first, second = MultiDict(), MultiDict()
    self.result = (first, second)
def __init__(
    self, url: typing.Union[str, yarl.URL],
    client: ClientSessionType = None,
    loop: asyncio.AbstractEventLoop = None,
    headers: HeadersType = None,
    client_owner: bool = True,
    loads=json.loads, dumps=json.dumps, **kwargs
):
    """Set up headers, target URL, event loop and HTTP client session.

    :param url: endpoint address; stored as ``str(url)``.
    :param client: session to use; when falsy a new
        ``aiohttp.client.ClientSession`` is created with ``loop`` and
        ``**kwargs``.
    :param loop: event loop; falls back to ``asyncio.get_event_loop()``.
    :param headers: initial headers, copied into a ``MultiDict``.
        ``Content-Type`` and ``User-Agent`` are filled in only when absent.
    :param client_owner: stored as ``bool(client_owner)``.
    :param loads: callable stored as ``self.loads``.
    :param dumps: callable stored as ``self.dumps``.
    :param kwargs: forwarded to ``ClientSession`` only when no ``client``
        was supplied.
    """
    self.headers = MultiDict(headers or {})
    # Defaults never override a value the caller already provided.
    header_defaults = (
        ("Content-Type", "application/json"),
        ("User-Agent", self.USER_AGENT),
    )
    for header_name, fallback in header_defaults:
        self.headers.setdefault(header_name, fallback)

    self.url = str(url)
    self.loop = loop or asyncio.get_event_loop()

    if not client:
        client = aiohttp.client.ClientSession(loop=self.loop, **kwargs)
    self.client = client

    self.client_owner = bool(client_owner)
    self.loads, self.dumps = loads, dumps
async def pcap(request, response):
    """Proxy a link's packet-capture stream from its compute to the caller.

    The project and link are looked up from ``project_id`` / ``link_id`` in
    ``request.match_info``. The incoming request (method, headers, body) is
    replayed against ``link.pcap_streaming_url()`` and the reply is streamed
    back chunk by chunk.

    :param request: incoming request; its headers, method and body are
        forwarded.
    :param response: part of the handler signature; not used in this body.
    :raises aiohttp.web.HTTPConflict: if the link has no active capture.
    """
    project = await Controller.instance().get_loaded_project(request.match_info["project_id"])
    link = project.get_link(request.match_info["link_id"])
    if not link.capturing:
        raise aiohttp.web.HTTPConflict(text="This link has no active packet capture")

    compute = link.compute
    pcap_streaming_url = link.pcap_streaming_url()

    # HTTP header names are case-insensitive. Copy into a case-insensitive
    # multidict so that overriding Host replaces the client's value whatever
    # casing it used, instead of adding a second Host header.
    headers = multidict.CIMultiDict(request.headers)
    headers['Host'] = compute.host
    headers['Router-Host'] = request.host
    body = await request.read()

    connector = aiohttp.TCPConnector(limit=None, force_close=True)
    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:
        # Named `upstream` so it does not shadow the `response` parameter.
        async with session.request(request.method, pcap_streaming_url, timeout=None, data=body) as upstream:
            proxied_response = aiohttp.web.Response(headers=upstream.headers, status=upstream.status)
            if upstream.headers.get('Transfer-Encoding', '').lower() == 'chunked':
                proxied_response.enable_chunked_encoding()
            await proxied_response.prepare(request)
            async for data in upstream.content.iter_any():
                # An empty chunk ends the relay loop.
                if not data:
                    break
                await proxied_response.write(data)
def query(self):
    """Return the parsed query parameters as a read-only MultiDictProxy.

    The pairs are parsed from ``self.raw_query_string`` with blank values
    kept. The result is empty when the URL has no query part.
    """
    pairs = parse_qsl(self.raw_query_string, keep_blank_values=True)
    return MultiDictProxy(MultiDict(pairs))
form_parameters=None,
header_parameters=None,
files_parameters=None,
debug=False,
path_url_regex=PATH_URL_REGEX,
) -> None:
super().__init__(default_error_builder=default_error_builder)
self.debug = debug
self._handled_exceptions = [] # type: typing.List[HandledException]
self.app = app
self._exceptions_handler_installed = False
self.path_url_regex = path_url_regex
self.path_parameters = path_parameters or {}
self.query_parameters = query_parameters or MultiDict()
self.body_parameters = body_parameters or {}
self.form_parameters = form_parameters or MultiDict()
self.header_parameters = header_parameters or {}
self.files_parameters = files_parameters or {}
if not self._send_request_done:
raise ProtocolError('Request was not sent yet')
if self._recv_initial_metadata_done:
raise ProtocolError('Initial metadata was already received')
with self._wrapper:
headers = await self._stream.recv_headers()
self._recv_initial_metadata_done = True
headers_map = dict(headers)
self._raise_for_status(headers_map)
self._raise_for_content_type(headers_map)
if 'grpc-status' in headers_map: # trailers-only response
self._trailers_only = True
im = cast(_Metadata, MultiDict())
im, = await self._dispatch.recv_initial_metadata(im)
self.initial_metadata = im
tm = decode_metadata(headers)
tm, = await self._dispatch.recv_trailing_metadata(tm)
self.trailing_metadata = tm
self._raise_for_grpc_status(headers_map)
else:
im = decode_metadata(headers)
im, = await self._dispatch.recv_initial_metadata(im)
self.initial_metadata = im
async def task_command(self, ctx: Context, task_id: str):
"""
Get a task and return information specific to it
"""
if task_id.startswith("#"):
task_id = task_id[1:]
embed = Embed(colour=Colour.blurple())
embed.set_author(
name=f"ClickUp Task: #{task_id}",
icon_url="https://clickup.com/landing/favicons/favicon-32x32.png",
url=f"https://app.clickup.com/{ClickUpConfig.team}/{ClickUpConfig.space}/t/{task_id}"
)
params = MultiDict()
params.add("statuses[]", "Open")
params.add("statuses[]", "in progress")
params.add("statuses[]", "review")
params.add("statuses[]", "Closed")
response = await self.bot.http_session.get(
GET_TASKS_URL.format(team_id=ClickUpConfig.team), headers=HEADERS, params=params
)
result = await response.json()
if "err" in result:
log.error("ClickUp responded to the get task request with an error!\n"
f"error code: '{result['ECODE']}'\n"
f"error: {result['err']}")
embed.description = f"`{result['ECODE']}`: {result['err']}"
embed.colour = Colour.red()