Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def headers (self):
    """Return the header lines in ``self.header`` parsed into a mapping.

    Each line is split once on ": " (or on a bare ":" when that separator
    is absent) into a name and a value. A name seen once maps to its
    string value; a name seen several times maps to a list of its values
    in order of appearance.

    A non-empty result is cached on ``self.__headerdict`` and returned
    directly on later calls. Because the cache check is a truthiness test,
    an empty result is rebuilt on every call.

    Returns:
        An ``attrdict.CaseInsensitiveDict`` of header names to values.
    """
    if self.__headerdict:
        return self.__headerdict

    headerdict = attrdict.CaseInsensitiveDict ()
    for line in self.header:
        # Prefer the conventional ": " separator, then fall back to ":".
        k, sep, v = line.partition (": ")
        if not sep:
            k, sep, v = line.partition (":")
        if not sep:
            # No colon at all: this is not a header field. Skip it instead
            # of letting a ValueError from tuple unpacking abort the parse.
            continue

        if k in headerdict:
            try:
                # Already a list of values from earlier repeats.
                headerdict [k].append (v)
            except AttributeError:
                # First repeat: the stored value is still a plain string.
                headerdict [k] = [headerdict [k], v]
        else:
            headerdict [k] = v

    self.__headerdict = headerdict
    return headerdict
def normheader (headers):
    """Copy ``headers`` into a new ``attrdict.CaseInsensitiveDict``.

    Args:
        headers: a dict (or dict subclass) of name -> value, an iterable
            of ``(name, value)`` pairs, or any falsy value.

    Returns:
        A new ``attrdict.CaseInsensitiveDict``; empty when ``headers`` is
        falsy.
    """
    nheader = attrdict.CaseInsensitiveDict ({})
    if not headers:
        return nheader

    # isinstance (not an exact type check) so that dict subclasses such as
    # OrderedDict are iterated as (key, value) pairs rather than as keys.
    pairs = headers.items () if isinstance (headers, dict) else headers
    for k, v in pairs:
        nheader [k] = v
    return nheader
def fit_headers (self, headers):
    """Install ``headers`` as ``self.headers``, dropping framing headers.

    Two input shapes are handled:

    * An ``attrdict.CaseInsensitiveDict`` is adopted as-is (not copied).
      "content-length" and "connection" are removed through
      ``self.remove_header`` and "Accept" defaults to "*/*" when it is
      missing or empty.
    * Anything else (a dict, an iterable of ``(name, value)`` pairs, or a
      falsy value) is copied into a fresh ``attrdict.CaseInsensitiveDict``
      that starts with "Accept-Encoding: gzip"; "content-length" and
      "connection" entries are skipped.

    Args:
        headers: the caller-supplied headers, in one of the shapes above.
    """
    if isinstance (headers, attrdict.CaseInsensitiveDict):
        self.headers = headers
        self.remove_header ("content-length", "connection")
        if not self.headers.get ("accept"):
            self.headers ["Accept"] = "*/*"
        return

    # NOTE(review): unlike the branch above, no default "Accept" is set
    # here -- confirm whether that asymmetry is intended.
    self.headers = attrdict.CaseInsensitiveDict ()
    self.headers ["Accept-Encoding"] = "gzip"
    if not headers:
        return

    # isinstance (not an exact type check) so that dict subclasses are
    # iterated as (key, value) pairs rather than as bare keys.
    pairs = headers.items () if isinstance (headers, dict) else headers
    for k, v in pairs:
        if k.lower () in ("content-length", "connection"):
            # Original note: "reanalyze" -- presumably these are
            # recomputed when the request is built; verify against caller.
            continue
        self.headers [k] = v
def __init__ (self, uri, method, params = (), headers = None, auth = None, logger = None, meta = None, http_version = "2.0"):
    """Set up the request state, default gRPC headers and payload.

    Args:
        uri: request URI; ``self.split (uri)`` supplies ``self.address``
            and ``self.path``.
        method: request method; replaced by "GET" when the serialized
            payload is empty.
        params: sequence of call parameters. The class name of the first
            element is sent as the "message-type" header.
        headers: accepted for interface compatibility.
            NOTE(review): this constructor never reads it -- confirm
            whether caller-supplied headers should be merged here.
        auth: ``None``, a tuple, or a "user:password" string, which is
            split once on ":" into a 2-tuple.
        logger: stored on ``self.logger``.
        meta: metadata dict stored on ``self.meta``; a fresh dict is
            created when omitted.
        http_version: stored on ``self.http_version``.
    """
    self.uri = uri
    self.method = method
    self.params = params

    # Accept "user:password" strings as well as ready-made tuples
    # (including tuple subclasses, which must not be split).
    if auth and not isinstance (auth, tuple):
        auth = tuple (auth.split (":", 1))
    self.auth = auth

    self.logger = logger
    # A `{}` default in the signature would be shared by every instance.
    self.meta = {} if meta is None else meta
    self.http_version = http_version
    self.address, self.path = self.split (uri)

    base_headers = {
        "grpc-timeout": "10S",
        "grpc-accept-encoding": "identity,gzip",
        "user-agent": self.user_agent,
    }
    if self.params:
        # Guarded so that the default empty `params` does not raise
        # IndexError; the header is simply omitted in that case.
        base_headers ["message-type"] = self.params [0].__class__.__name__
    base_headers ["te"] = 'trailers'
    self.headers = attrdict.CaseInsensitiveDict (base_headers)

    if self.use_compress:
        # Plain string value: a trailing comma here would store the
        # tuple ("gzip",) instead.
        self.headers ["grpc-encoding"] = "gzip"

    self.payload = self.serialize ()
    if not self.payload:
        self.method = "GET"
    else:
        self.headers ["Content-Type"] = "application/grpc+proto"
def fit_headers (self, headers):
    """Replace ``self.headers`` with a cleaned version of ``headers``.

    When ``headers`` is already an ``attrdict.CaseInsensitiveDict`` it is
    used directly (the same object, not a copy): "content-length" and
    "connection" are removed via ``self.remove_header`` and a missing or
    empty "Accept" is set to "*/*".

    Otherwise a new ``attrdict.CaseInsensitiveDict`` is created, seeded
    with "Accept-Encoding: gzip", and filled from ``headers`` (a dict or
    an iterable of ``(name, value)`` pairs), leaving out "content-length"
    and "connection". A falsy ``headers`` leaves only the seed entry.

    Args:
        headers: caller-supplied headers in one of the shapes above.
    """
    if isinstance (headers, attrdict.CaseInsensitiveDict):
        self.headers = headers
        self.remove_header ("content-length", "connection")
        if not self.headers.get ("accept"):
            self.headers ["Accept"] = "*/*"
        return

    # NOTE(review): this path sets no default "Accept", whereas the
    # path above does -- confirm that the difference is deliberate.
    self.headers = attrdict.CaseInsensitiveDict ()
    self.headers ["Accept-Encoding"] = "gzip"
    if not headers:
        return

    # Use isinstance so dict subclasses yield (key, value) pairs too;
    # an exact type check would iterate them as keys only.
    items = headers.items () if isinstance (headers, dict) else headers
    for name, value in items:
        if name.lower () in ("content-length", "connection"):
            # Original note: "reanalyze" -- presumably recalculated
            # later when the request is assembled; TODO confirm.
            continue
        self.headers [name] = value