Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def _execute(self, query: str, *args) -> AsyncGenerator[Record, None]:
query_type = self.query_type(query)
if query_type == QueryTypes.FETCH:
query += " FORMAT TSVWithNamesAndTypes"
if args:
if query_type != QueryTypes.INSERT:
raise ChClientError(
"It is possible to pass arguments only for INSERT queries"
)
params = {**self.params, "query": query}
data = rows2ch(*args)
else:
params = self.params
data = query.encode()
async with self._session.post(
self.url, params=params, data=data
) as resp: # type: client.ClientResponse
if resp.status != 200:
raise ChClientError((await resp.read()).decode(errors='replace'))
if query_type == QueryTypes.FETCH:
rf = RecordsFabric(
def query_type(cls, query: str) -> QueryTypes:
    """Classify a query by its leading keyword.

    Leading whitespace is ignored and the comparison is case-insensitive.
    Only the first eight characters after stripping are inspected, which
    is enough to hold the longest keyword checked here (``DESCRIBE``).

    :param query: raw query text.
    :return: ``QueryTypes.FETCH`` when the query starts with ``SELECT``,
        ``SHOW``, ``DESCRIBE`` or ``EXISTS``; ``QueryTypes.INSERT`` when
        it starts with ``INSERT``; ``QueryTypes.OTHER`` otherwise.
    """
    head = query.lstrip()[:8].upper()
    # str.startswith accepts a tuple and matches if any prefix matches.
    if head.startswith(("SELECT", "SHOW", "DESCRIBE", "EXISTS")):
        return QueryTypes.FETCH
    if head.startswith("INSERT"):
        return QueryTypes.INSERT
    return QueryTypes.OTHER