Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def getConnection(self, nodb=False):
    """
    Return a pymysql connection to the mysqld server managed by this object.

    The server is started first via C{self.startMysqld()}. With
    C{nodb=True} a new connection without a selected database is opened on
    every call and is never cached. Otherwise the connection to
    C{self.getDbName()} is created once, stored in C{self._conn}, and
    returned as-is on subsequent calls.

    @param nodb: connect without selecting a database
    @raise RuntimeError: if C{self.startMysqld()} reports failure
    @rtype: L{pymysqlConnection}
    """
    if not self.startMysqld():
        raise RuntimeError("Can't start mysqld server!")

    if nodb:
        conn = pymysql.connect(
            unix_socket=self.getSocket(),
            user=self.getUser(),
            passwd=self.getPassword(),
        )
    else:
        # Reuse the cached connection; it was fully configured when created.
        if self._conn:
            return self._conn
        conv = pymysql.converters.conversions.copy()
        conv[246] = float  # field type 246 (NEWDECIMAL): decimals -> floats
        conv[10] = str  # field type 10 (DATE): dates -> strings
        conn = self._conn = pymysql.connect(
            unix_socket=self.getSocket(),
            user=self.getUser(),
            passwd=self.getPassword(),
            db=self.getDbName(),
            conv=conv,
        )
        self._conn.autocommit(self.getAutocommit())

    # NOTE(review): the original indentation was lost; this setup is placed
    # after the if/else because both branches bind `conn` -- confirm.
    cur = conn.cursor()
    try:
        if not self.getAutocommit():
            cur.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
    finally:
        # Release the cursor even when the statement fails.
        cur.close()
    # The cached path returns a connection, so the fresh path must too.
    return conn
:return converter: Converter object
"""
converter = None
try:
import MySQLdb.converters # pylint: disable=import-error
from MySQLdb.constants import FIELD_TYPE # pylint: disable=import-error
converter = MySQLdb.converters.conversions.copy()
converter[FIELD_TYPE.DECIMAL] = float
converter[FIELD_TYPE.NEWDECIMAL] = float
except ImportError:
try:
from pymysql.constants import FIELD_TYPE
from pymysql.converters import conversions as conv
converter = conv.copy()
converter[FIELD_TYPE.DECIMAL] = float
converter[FIELD_TYPE.NEWDECIMAL] = float
except ImportError:
raise RucioException('Trying to use MySQL without mysql-python or pymysql installed!')
return converter
def connect(self, database=None, user=None, password=None, host=None, port=None, charset=None):
db = (database or self.dbname)
user = (user or self.user)
password = (password or self.password)
host = (host or self.host)
port = (port or self.port)
charset = (charset or self.charset)
conv = conversions.copy()
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_mysql_timestamp(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
})
conn = pymysql.connect(
database=db, user=user, password=password, host=host, port=port,
use_unicode=True, charset=charset,autocommit=True, conv=conv,
client_flag=pymysql.constants.CLIENT.INTERACTIVE,
max_allowed_packet=1024 * 1024 * 1024,
)
if hasattr(self, 'conn'):
'\tuser: %r'
'\thost: %r'
'\tport: %r'
'\tsocket: %r'
'\tcharset: %r'
'\tlocal_infile: %r'
'\tssl: %r'
'\tssh_user: %r'
'\tssh_host: %r'
'\tssh_port: %r'
'\tssh_password: %r'
'\tssh_key_filename: %r',
db, user, host, port, socket, charset, local_infile, ssl,
ssh_user, ssh_host, ssh_port, ssh_password, ssh_key_filename
)
conv = conversions.copy()
conv.update({
FIELD_TYPE.TIMESTAMP: lambda obj: (convert_mysql_timestamp(obj) or obj),
FIELD_TYPE.DATETIME: lambda obj: (convert_datetime(obj) or obj),
FIELD_TYPE.TIME: lambda obj: (convert_timedelta(obj) or obj),
FIELD_TYPE.DATE: lambda obj: (convert_date(obj) or obj),
})
defer_connect = False
if ssh_host:
defer_connect = True
conn = pymysql.connect(
database=db, user=user, password=password, host=host, port=port,
unix_socket=socket, use_unicode=True, charset=charset,
autocommit=True, client_flag=pymysql.constants.CLIENT.INTERACTIVE,