Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def what_py_type(name: str, container: bool = False) -> BaseType:
""" Returns needed type class from clickhouse type name """
name = name.strip()
try:
return CH_TYPES_MAPPING[name.split("(")[0]](name, container=container)
except KeyError:
raise ChClientError(f"Unrecognized type name: '{name}'")
def py2ch(value):
try:
return PY_TYPES_MAPPING[type(value)](value)
except KeyError:
raise ChClientError(
f"Unrecognized type: '{type(value)}'. "
f"The value type should be exactly one of "
if args:
if query_type != QueryTypes.INSERT:
raise ChClientError(
"It is possible to pass arguments only for INSERT queries"
)
params = {**self.params, "query": query}
data = rows2ch(*args)
else:
params = self.params
data = query.encode()
async with self._session.post(
self.url, params=params, data=data
) as resp: # type: client.ClientResponse
if resp.status != 200:
raise ChClientError((await resp.read()).decode(errors='replace'))
if query_type == QueryTypes.FETCH:
rf = RecordsFabric(
names=await resp.content.readline(),
tps=await resp.content.readline(),
)
async for line in resp.content:
yield rf.new(line)
async def _execute(self, query: str, *args) -> AsyncGenerator[Record, None]:
query_type = self.query_type(query)
if query_type == QueryTypes.FETCH:
query += " FORMAT TSVWithNamesAndTypes"
if args:
if query_type != QueryTypes.INSERT:
raise ChClientError(
"It is possible to pass arguments only for INSERT queries"
)
params = {**self.params, "query": query}
data = rows2ch(*args)
else:
params = self.params
data = query.encode()
async with self._session.post(
self.url, params=params, data=data
) as resp: # type: client.ClientResponse
if resp.status != 200:
raise ChClientError((await resp.read()).decode(errors='replace'))
if query_type == QueryTypes.FETCH:
rf = RecordsFabric(
names=await resp.content.readline(),