Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connectmysql_select(self, sql):  # query the database
    """Run a SELECT and return one formatted "column : value" line per cell.

    Opens a fresh connection per call, fetches every row, then closes the
    connection before formatting.

    :param sql: SQL text to execute. NOTE(review): executed as-is, so the
        caller must not interpolate untrusted input into it.
    :return: list[str]; for each row, one "<name> : <value>" entry per
        column, with the column name right-justified to the widest name.
    """
    # NOTE(review): no `user=` is passed to connect(); presumably the
    # account comes from server defaults -- confirm against callers.
    self.conn = pymysql.connect(host=self.host, port=self.port,
                                password=self.password, db=self.db,
                                charset='utf8')
    try:
        # SSCursor streams rows server-side; fetchall() below still pulls
        # everything, so this mainly bounds client-side buffering.
        self.cursor = self.conn.cursor(pymysql.cursors.SSCursor)
        self.cursor.execute(sql)
        rows = self.cursor.fetchall()
        self.conn.commit()
    finally:
        # Fix: original leaked the connection when execute() raised.
        self.cursor.close()
        self.conn.close()
    # cursor.description survives close(); it is set at execute() time.
    column_names = [col[0] for col in self.cursor.description]
    column_name_max_size = max(len(name) for name in column_names)
    data = []
    # Fix: original iterated `for result in result`, shadowing the result
    # set; use distinct names and zip() instead of map(lambda x, y: ...).
    for row in rows:
        for name, value in zip(column_names, row):
            data.append(str(name.rjust(column_name_max_size)) + " " + ":" + " " + str(value))
    return data
def mysql_connection(self):
    """
    Creates a MySQL database connection
    """
    # Gather the settings in one mapping so the call site stays compact;
    # SSCursor keeps the result set server-side (unbuffered reads).
    connect_kwargs = {
        "host": DATABASE_HOST,
        "user": DATABASE_USER,
        "password": DATABASE_PASS,
        "db": DATABASE_DB,
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.SSCursor,
    }
    return pymysql.connect(**connect_kwargs)
# NOTE(review): interior fragment of an Excel-export method -- the
# enclosing `def`, the `if` branch paired with the `elif` below, and the
# `else: break` that would end the `while True` loop are all outside this
# view. Lines 58-87 duplicate lines 28-57 (likely a copy artifact in the
# source). Code kept byte-identical; only comments translated/added.
msg = f'正在处理数据'
self.pull_msg(msg)
self.execute_log.append(msg)
for row in rows:
# Strip characters Excel cannot store from string cells
filter_illegal_characters_row = list(
map(
(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x), row
)
)
ws.append(filter_illegal_characters_row)
wb.save(self.file)
elif self.affected_row > 100000:
# When exporting more than 100k rows, use SSCursor to stream row by row
self.conn.cursorclass = pymysql.cursors.SSCursor
with self.conn.cursor() as cursor:
msg = f'正在导出SQL:{self.sql}'
self.pull_msg(msg)
self.execute_log.append(msg)
cursor.execute(self.sql)
while True:
row = cursor.fetchone()
# fetchone() returns None once the result set is drained; the
# branch that breaks out of the loop is outside this view.
if row:
# Strip characters Excel cannot store from string cells
filter_illegal_characters_row = list(
map(
(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x), row
)
)
ws.append(filter_illegal_characters_row)
# NOTE(review): from here the fragment repeats the section above.
msg = f'正在处理数据...'
self.pull_msg(msg)
self.execute_log.append(msg)
for row in rows:
# Strip characters Excel cannot store from string cells
filter_illegal_characters_row = list(
map(
(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x), row
)
)
ws.append(filter_illegal_characters_row)
wb.save(self.file)
elif self.affected_row > 100000:
# When exporting more than 100k rows, use SSCursor to stream row by row
self.conn.cursorclass = pymysql.cursors.SSCursor
with self.conn.cursor() as cursor:
msg = f'正在导出SQL:{self.sql}'
self.pull_msg(msg)
self.execute_log.append(msg)
cursor.execute(self.sql)
while True:
row = cursor.fetchone()
if row:
# Strip characters Excel cannot store from string cells
filter_illegal_characters_row = list(
map(
(lambda x: ILLEGAL_CHARACTERS_RE.sub(r'', x) if isinstance(x, str) else x), row
)
)
ws.append(filter_illegal_characters_row)
return self
async def __anext__(self):
    """Yield the next row; end async iteration once the cursor is drained."""
    row = await async_call_method(self._cursor.fetchone)
    if row is not None:
        return row
    # fetchone returned None: the result set is exhausted.
    raise StopAsyncIteration()
async def __aenter__(self):
    """Enter `async with`, yielding this cursor wrapper as the context value."""
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the cursor on `async with` exit; returns None, so exceptions propagate."""
    await self.close()
""")
setattr(OriginSSCursor, "__tormysql_class__", SSCursor)
class SSDictCursor(SSCursor):
__delegate_class__ = OriginSSDictCursor
setattr(OriginSSDictCursor, "__tormysql_class__", SSDictCursor)
"""
The method creates a new connection to the mysql database.
The connection is made using the unbuffered cursor factory.
"""
db_conn = self.source_config["db_conn"]
db_conn = {key:str(value) for key, value in db_conn.items()}
db_conn["port"] = int(db_conn["port"])
db_conn["connect_timeout"] = int(db_conn["connect_timeout"])
self.conn_unbuffered=pymysql.connect(
host = db_conn["host"],
user = db_conn["user"],
port = db_conn["port"],
password = db_conn["password"],
charset = db_conn["charset"],
connect_timeout = db_conn["connect_timeout"],
cursorclass=pymysql.cursors.SSCursor
)
self.charset = db_conn["charset"]
self.cursor_unbuffered = self.conn_unbuffered.cursor()
"""
sql_conn = get_db_connection(self.data)
exp_query = 'SELECT num_hierarchy_levels FROM experiment WHERE id = %s'
job_query_args = dict(col=col, exp=exp, chan=chan)
job_query = (
'SELECT id, resolution FROM ingest_job ' +
'WHERE collection_id = %(col)s AND ' +
'experiment_id = %(exp)s AND ' +
'channel_id = %(chan)s'
)
num_hierarchy_levels = 1
res_job_ids = []
try:
with sql_conn.cursor(pymysql.cursors.SSCursor) as cursor:
num_rows = cursor.execute(exp_query, str(exp))
if num_rows > 0:
exp_row = cursor.fetchone()
num_hierarchy_levels = exp_row[0]
num_rows = cursor.execute(job_query, job_query_args)
if num_rows == 0:
return res_job_ids
for row in cursor.fetchall_unbuffered():
res_job_ids.append(ResJobId(job_id=row[0], res=row[1]))
for i in range(row[1]+1, num_hierarchy_levels):
res_job_ids.append(ResJobId(job_id=row[0], res=i))
finally:
sql_conn.close()
return res_job_ids
def _open(self):
""" DO NOT USE THIS UNLESS YOU close() FIRST"""
# NOTE(review): the method continues past this view -- the trailing
# `if self.readonly:` has its body outside it. Code kept byte-identical.
try:
# coalesce(): first non-null wins, so `username`/`user` and
# `password`/`passwd` etc. are alternative setting names.
self.db = connect(
host=self.settings.host,
port=self.settings.port,
user=coalesce(self.settings.username, self.settings.user),
passwd=coalesce(self.settings.password, self.settings.passwd),
db=coalesce(self.settings.schema, self.settings.db),
# Default read timeout: 10s under the global execute timeout when one
# is configured, otherwise 5 minutes.
read_timeout=coalesce(self.settings.read_timeout, (EXECUTE_TIMEOUT / 1000) - 10 if EXECUTE_TIMEOUT else None, 5*60),
charset=u"utf8",
use_unicode=True,
ssl=coalesce(self.settings.ssl, None),
# Unbuffered cursor: rows stream from the server rather than loading at once.
cursorclass=cursors.SSCursor
)
except Exception as e:
# A "://" in the host suggests a URL was passed where a bare hostname belongs.
if self.settings.host.find("://") == -1:
Log.error(
u"Failure to connect to {{host}}:{{port}}",
host=self.settings.host,
port=self.settings.port,
cause=e
)
else:
Log.error(u"Failure to connect. PROTOCOL PREFIX IS PROBABLY BAD", e)
self.cursor = None
self.partial_rollback = False
self.transaction_level = 0
self.backlog = []  # accumulate the write commands so they are sent at once
if self.readonly: