Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if mysql_module_name == 'MySQLdb':
conv[FIELD_TYPE.BLOB] = [(FLAG.BINARY, buffer)]
else:
if PY2:
def encode_buffer(val, encoders=None):
return string_literal(str(val), encoders)
conv[buffer] = encode_buffer
def encode_timedelta(val, encoders=None):
return string_literal(timedelta2str(val), encoders)
conv[timedelta] = encode_timedelta
conv[FIELD_TYPE.TIMESTAMP] = str2datetime
conv[FIELD_TYPE.DATETIME] = str2datetime
conv[FIELD_TYPE.TIME] = str2timedelta
kwargs['conv'] = conv
if 'charset' not in kwargs:
kwargs['charset'] = 'utf8'
kwargs['client_flag'] = kwargs.get('client_flag', 0) | CLIENT.FOUND_ROWS
return Pool(mysql_module, *args, **kwargs)
def __eq__(self, other):
from sets import BaseSet
if isinstance(other, BaseSet):
return super(DBAPISet, self).__eq__(self, other)
else:
return other in self
STRING = DBAPISet([FIELD_TYPE.ENUM, FIELD_TYPE.STRING,
FIELD_TYPE.VAR_STRING])
BINARY = DBAPISet([FIELD_TYPE.BLOB, FIELD_TYPE.LONG_BLOB,
FIELD_TYPE.MEDIUM_BLOB, FIELD_TYPE.TINY_BLOB])
NUMBER = DBAPISet([FIELD_TYPE.DECIMAL, FIELD_TYPE.DOUBLE, FIELD_TYPE.FLOAT,
FIELD_TYPE.INT24, FIELD_TYPE.LONG, FIELD_TYPE.LONGLONG,
FIELD_TYPE.TINY, FIELD_TYPE.YEAR])
DATE = DBAPISet([FIELD_TYPE.DATE, FIELD_TYPE.NEWDATE])
TIME = DBAPISet([FIELD_TYPE.TIME])
TIMESTAMP = DBAPISet([FIELD_TYPE.TIMESTAMP, FIELD_TYPE.DATETIME])
DATETIME = TIMESTAMP
ROWID = DBAPISet()
def Binary(x):
"""Return x as a binary type."""
return str(x)
def Connect(*args, **kwargs):
from connections import Connection
return Connection(*args, **kwargs)
connect = Connection = Connect
def get_connection(self):
warnings.filterwarnings('ignore', category=pymysql.Warning)
usessl = 0
if frappe.conf.db_ssl_ca and frappe.conf.db_ssl_cert and frappe.conf.db_ssl_key:
usessl = 1
ssl_params = {
'ca':frappe.conf.db_ssl_ca,
'cert':frappe.conf.db_ssl_cert,
'key':frappe.conf.db_ssl_key
}
conversions.update({
FIELD_TYPE.NEWDECIMAL: float,
FIELD_TYPE.DATETIME: get_datetime,
UnicodeWithAttrs: conversions[text_type]
})
if PY2:
conversions.update({
TimeDelta: conversions[binary_type]
})
if usessl:
conn = pymysql.connect(self.host, self.user or '', self.password or '',
port=self.port, charset='utf8mb4', use_unicode = True, ssl=ssl_params,
conv = conversions, local_infile = frappe.conf.local_infile)
else:
conn = pymysql.connect(self.host, self.user or '', self.password or '',
port=self.port, charset='utf8mb4', use_unicode = True, conv = conversions,
def convert_bytes_to_date(value) -> str:
return convert_bytes_to_str(value)
def convert_bytes_to_time(value) -> str:
return convert_bytes_to_str(value)
def convert_bytes_to_datetime(value) -> str:
return convert_bytes_to_str(value)
def convert_bytes_to_timedelta(value) -> str:
return convert_bytes_to_str(value)
MYSQL_DATATYPE_READER_MAP = {
FIELD_TYPE.BIT: convert_bytes_to_int,
FIELD_TYPE.TINY: convert_bytes_to_int,
FIELD_TYPE.SHORT: convert_bytes_to_int,
FIELD_TYPE.LONG: convert_bytes_to_int,
FIELD_TYPE.FLOAT: convert_bytes_to_float,
FIELD_TYPE.DOUBLE: convert_bytes_to_float,
FIELD_TYPE.LONGLONG: convert_bytes_to_long_long,
FIELD_TYPE.INT24: convert_bytes_to_int,
FIELD_TYPE.YEAR: convert_bytes_to_int,
FIELD_TYPE.TIMESTAMP: convert_bytes_to_datetime,
FIELD_TYPE.DATETIME: convert_bytes_to_datetime,
FIELD_TYPE.TIME: convert_bytes_to_time,
FIELD_TYPE.DATE: convert_bytes_to_date,
FIELD_TYPE.NEWDATE: convert_bytes_to_date,
FIELD_TYPE.SET: convert_bytes_to_str,
FIELD_TYPE.BLOB: convert_bytes_to_str,
FIELD_TYPE.TINY_BLOB: convert_bytes_to_str,
def connect(self, database=None, user=None, password=None, host=None, port=None, charset=None):
db = (database or self.dbname)
user = (user or self.user)
password = (password or self.password)
host = (host or self.host)
port = (port or self.port)
charset = (charset or self.charset)
conv = conversions.copy()
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_mysql_timestamp(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
})
conn = pymysql.connect(
database=db, user=user, password=password, host=host, port=port,
use_unicode=True, charset=charset,autocommit=True, conv=conv,
client_flag=pymysql.constants.CLIENT.INTERACTIVE,
max_allowed_packet=1024 * 1024 * 1024,
)
if hasattr(self, 'conn'):
self.conn.close()
self.conn = conn
self.dbname = db
self.user = user
encode_struct_time,
encode_timedelta,
encode_time,
TimeDelta_or_None,
)
encoders = {
time.struct_time: encode_struct_time,
datetime.timedelta: encode_timedelta,
datetime.time: encode_time,
}
decoders = {
FIELD_TYPE.TIME: TimeDelta_or_None,
FIELD_TYPE.NEWDECIMAL: float,
}
def notouch(x):
return x
class ResultSet(object):
def __init__(self, affected_rows=None, insert_id=None, description=None,
rows=None):
self.affected_rows = affected_rows
self.insert_id = insert_id
self.description = description
self.rows = rows
def connect(self, database=None, user=None, password=None, host=None, port=None, charset=None):
db = (database or self.dbname)
user = (user or self.user)
password = (password or self.password)
host = (host or self.host)
port = (port or self.port)
charset = (charset or self.charset)
conv = conversions.copy()
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_mysql_timestamp(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
})
conn = pymysql.connect(
database=db, user=user, password=password, host=host, port=port,
use_unicode=True, charset=charset,autocommit=True, conv=conv,
client_flag=pymysql.constants.CLIENT.INTERACTIVE,
max_allowed_packet=1024 * 1024 * 1024,
)
if hasattr(self, 'conn'):
self.conn.close()
self.conn = conn
def get_pool(provider, *args, **kwargs):
if 'conv' not in kwargs:
conv = mysql_converters.conversions.copy()
if mysql_module_name == 'MySQLdb':
conv[FIELD_TYPE.BLOB] = [(FLAG.BINARY, buffer)]
else:
if PY2:
def encode_buffer(val, encoders=None):
return string_literal(str(val), encoders)
conv[buffer] = encode_buffer
def encode_timedelta(val, encoders=None):
return string_literal(timedelta2str(val), encoders)
conv[timedelta] = encode_timedelta
conv[FIELD_TYPE.TIMESTAMP] = str2datetime
conv[FIELD_TYPE.DATETIME] = str2datetime
conv[FIELD_TYPE.TIME] = str2timedelta
kwargs['conv'] = conv
if 'charset' not in kwargs:
'\tport: %r'
'\tsocket: %r'
'\tcharset: %r'
'\tlocal_infile: %r'
'\tssl: %r'
'\tssh_user: %r'
'\tssh_host: %r'
'\tssh_port: %r'
'\tssh_password: %r'
'\tssh_key_filename: %r',
db, user, host, port, socket, charset, local_infile, ssl,
ssh_user, ssh_host, ssh_port, ssh_password, ssh_key_filename
)
conv = conversions.copy()
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_mysql_timestamp(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
})
defer_connect = False
if ssh_host:
defer_connect = True
conn = pymysql.connect(
database=db, user=user, password=password, host=host, port=port,
unix_socket=socket, use_unicode=True, charset=charset,
autocommit=True, client_flag=pymysql.constants.CLIENT.INTERACTIVE,
local_infile=local_infile, conv=conv, ssl=ssl, program_name="mycli",
defer_connect=defer_connect