Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handler(request):
self.assertEqual(len(data), request.content_length)
val = yield from request.read()
self.assertEqual(data, val)
return web.Response()
# read command and args
el_request = ET.fromstring(body)
el_body = el_request.find('envelope:Body', NS)
el_command = el_body.find('./')
command = el_command.tag.split('}')[1]
args = {el_arg.tag: el_arg.text for el_arg in el_command.findall('./')}
LOGGER.debug('Command: %s', command)
LOGGER.debug('Args: %s', args)
# do command
result = {}
for state_var in state_variables.values():
if state_var.matches_action(command):
result = yield from state_var.do_command(command, **args)
if result is None:
return web.Response(status=404)
LOGGER.debug('Result: %s', result)
# return result
response_base = '' \
'' \
'' \
''
el_envelope = ET.fromstring(response_base)
el_body = el_envelope.find('./envelope:Body', NS)
el_response = ET.SubElement(el_body, '{' + xml_ns + '}' + command + 'Response')
for key, value in result.items():
el_arg = ET.SubElement(el_response, key)
el_arg.text = str(value)
text = ET.tostring(el_envelope)
async def rpc_endpoint(self, request: aiohttp.web.Request):
self.events_in.put_nowait(await request.json())
return web.Response(text='{"Success": true}', content_type='application/json')
async def post_identity_import(self, request):
try:
content = await request.content.read()
request_dict = json.loads(content.decode())
except:
return web.Response(status=400, text='Need JSON request body.')
if not 'path' in request_dict:
return web.Response(status=400, text='Missing parameter "path".')
path = request_dict['path']
await self.app.import_user_key(path)
return JSONResponse({'status': 'ok'})
async def subscriptions(self, request):
if not self._accepting:
return web.Response(status=503)
web_sock = web.WebSocketResponse()
await web_sock.prepare(request)
async for msg in web_sock:
if msg.type == aiohttp.WSMsgType.TEXT:
await self._handle_message(web_sock, msg.data)
elif msg.type == aiohttp.WSMsgType.ERROR:
LOGGER.warning(
'Web socket connection closed with exception %s',
web_sock.exception())
await web_sock.close()
await self._handle_unsubscribe(web_sock)
return web_sock
import aiohttp.web
import mimetypes
import asyncio
import logging
import jinja2
import sys
import os
from ..utils.get_resource import get_resource
from ..version import __version__
log = logging.getLogger(__name__)
renderer = jinja2.Environment(loader=jinja2.FileSystemLoader(get_resource('templates')))
class Response(aiohttp.web.Response):
def __init__(self, request=None, route=None, output_schema=None, headers={}, **kwargs):
self._route = route
self._output_schema = output_schema
self._request = request
headers['Connection'] = "close" # Disable keep alive because create trouble with old Qt (5.2, 5.3 and 5.4)
headers['X-Route'] = self._route
headers['Server'] = "Python/{0[0]}.{0[1]} GNS3/{1}".format(sys.version_info, __version__)
super().__init__(headers=headers, **kwargs)
def enable_chunked_encoding(self):
# Very important: do not send a content length otherwise QT closes the connection (curl can consume the feed)
if self.content_length:
self.content_length = None
super().enable_chunked_encoding()
def response_string(request, text: str, encoding='utf-8'):
response = web.Response()
response.content_type = 'text/html'
response.charset = encoding
response.text = text
return response
async def sort(request):
global idx
print("SORT: ")
image_idx = np.argsort(df_final.COLOR1.values)
temp = np.zeros(NX * NY, dtype="int") - 1
temp[: len(image_idx)] = image_idx
temp = temp.reshape((NY, NX))
idx = np.pad(
temp, ((0, NTILES - NY), (0, NTILES - NX)), "constant", constant_values=-1
)
print(idx[0][0:10])
response = web.Response(text="", status=200)
return response
dnsr = dns.message.make_response(dnsq)
dnsr.set_rcode(dns.rcode.SERVFAIL)
elif len(dnsr.answer):
ttl = min(r.ttl for r in dnsr.answer)
headers['cache-control'] = 'max-age={}'.format(ttl)
clientip = request.transport.get_extra_info('peername')[0]
interval = int((time.time() - self.time_stamp) * 1000)
self.logger.info('[HTTPS] {} (Original IP: {}) {} {}ms'.format(
clientip, request.remote, utils.dnsans2log(dnsr), interval))
if request.method == 'HEAD':
body = b''
else:
body = dnsr.to_wire()
return aiohttp.web.Response(
status=200,
body=body,
content_type=constants.DOH_MEDIA_TYPE,
headers=headers,
)
async def dispatch(request):
github = request.app.plugins["github"]
payload = await request.read()
try:
event = Event.from_http(request.headers, payload, secret=github.verify)
await github.router.dispatch(event, app=request.app)
except ValidationFailure:
LOG.debug(
"Github webhook failed verification: %s, %s", request.headers, payload
)
return Response(status=401)
except Exception as e:
LOG.exception(e)
return Response(status=500)
else:
return Response(status=200)