Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def handle_request (self, handler):
    """Queue the request held by ``handler`` on a newly allocated stream.

    Allocates one stream id, registers ``handler`` under it, then pushes a
    header producer onto ``self.asyncon``. When the handler supplies a
    payload, a frame producer for the body is pushed right after the
    header producer. Each step runs exactly once per call.

    Args:
        handler: Object providing ``request``, ``get_request_header ()``
            and ``get_request_payload ()``.

    Returns:
        None.
    """
    self.request = handler.request
    stream_id = self.get_new_stream_id ()
    self.add_request (stream_id, handler)
    # NOTE(review): presumably marks the connection as free to accept
    # further requests while this one is in flight -- confirm against
    # the asyncon implementation.
    self.asyncon.set_active (False)

    # Only the header list is used; the second returned value is ignored.
    headers, _ = handler.get_request_header ("2.0", False)
    payload = handler.get_request_payload ()

    producer = None
    if payload:
        if isinstance (payload, bytes):
            # Raw bytes need a producer wrapper before globbing.
            body = producers.simple_producer (payload)
        else:
            # Anything else is handed to globbing_producer unchanged
            # (the original comment names multipart and grpc producers).
            body = payload
        producer = producers.globbing_producer (body)

    header_frames = h2header_producer (
        stream_id, headers, producer, self.conn, self._clock
    )
    self.asyncon.push (header_frames)

    if producer:
        # NOTE(review): the literals 0 and 1 are presumably the frame type
        # and flags -- confirm against h2frame_producer.
        # TODO(review): the original author left an open question on
        # whether this should be wrapped in
        # producers.ready_globbing_producer before being pushed.
        body_frames = h2frame_producer (
            stream_id, 0, 1, producer, self.conn, self._clock
        )
        self.asyncon.push (body_frames)