# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_engine(minsize=1, maxsize=10, loop=None,
                  dialect=_dialect, pool_recycle=-1, compiled_cache=None,
                  **kwargs):
    """A coroutine for Engine creation.

    Returns Engine instance with embedded connection pool.
    The pool has *minsize* opened connections to MySQL server.
    """
    # Specialised cursor classes are rejected up front — only plain
    # ``Cursor`` subclasses outside this set are accepted.
    unsupported = (
        DeserializationCursor, DictCursor, SSCursor, SSDictCursor,
    )
    cursorclass = kwargs.get('cursorclass', Cursor)
    acceptable = issubclass(cursorclass, Cursor) and not any(
        issubclass(cursorclass, klass) for klass in unsupported
    )
    if not acceptable:
        raise ArgumentError('SQLAlchemy engine does not support '
                            'this cursor class')
    coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop,
                          dialect=dialect, pool_recycle=pool_recycle,
                          compiled_cache=compiled_cache, **kwargs)
    return _EngineContextManager(coro)
def cursor(self, *cursors):
    """Instantiates and returns a cursor

    By default, :class:`Cursor` is returned. It is possible to also give a
    custom cursor through the cursor_class parameter, but it needs to
    be a subclass of :class:`Cursor`

    :param cursor: custom cursor class.
    :returns: instance of cursor, by default :class:`Cursor`
    :raises TypeError: cursor_class is not a subclass of Cursor.
    """
    self._ensure_alive()
    self._last_usage = self._loop.time()
    try:
        invalid = cursors and any(
            not issubclass(klass, Cursor) for klass in cursors)
    except TypeError:
        # A non-class argument makes issubclass() itself raise.
        invalid = True
    if invalid:
        raise TypeError('Custom cursor must be subclass of Cursor')

    if len(cursors) == 1:
        cur = cursors[0](self, self._echo)
    elif cursors:
        # Synthesize a combined cursor type named after its parts,
        # e.g. (SSCursor, DictCursor) -> 'SSDictCursor'.
        combined_name = ''.join(
            klass.__name__ for klass in cursors
        ).replace('Cursor', '') + 'Cursor'
        combined_class = type(combined_name, cursors, {})
        cur = combined_class(self, self._echo)
    else:
        cur = self.cursorclass(self, self._echo)

    fut = self._loop.create_future()
    fut.set_result(cur)
    return _ContextManager(fut)
self._fields = fields
if fields and self._rows:
self._rows = [self._conv_row(r) for r in self._rows]
def _conv_row(self, row):
if row is None:
return None
return self.dict_type(zip(self._fields, row))
class DictCursor(_DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
class SSCursor(Cursor):
    """Unbuffered Cursor, mainly useful for queries that return a lot of
    data, or for connections to remote servers over a slow network.

    Instead of copying every row of data into a buffer, this will fetch
    rows as needed. The upside of this, is the client uses much less memory,
    and rows are returned much faster when traveling over a slow network,
    or if the result set is very big.

    There are limitations, though. The MySQL protocol doesn't support
    returning the total number of rows, so the only way to tell how many rows
    there are is to iterate over every row returned. Also, it currently isn't
    possible to scroll backwards, as only the current row is held in memory.
    """

    @asyncio.coroutine
    def close(self):
        # NOTE(review): this body references ``name``, ``dict_flag``,
        # ``index`` and ``row``, none of which are defined in this scope —
        # it looks like a fragment of a JSON-deserialization helper pasted
        # into ``close``. Confirm against the upstream source before
        # relying on it.
        point = name if dict_flag else index
        # Best-effort JSON decode: leave the value untouched when it is
        # not valid JSON (ValueError) or not a string/bytes (TypeError).
        with contextlib.suppress(ValueError, TypeError):
            row[point] = json.loads(row[point])
        if dict_flag:
            return row
        else:
            return tuple(row)

    def _conv_row(self, row):
        # Delegate base conversion first, then run the (project-defined)
        # deserialization pass over the converted row.
        if row is None:
            return None
        row = super()._conv_row(row)
        return self._deserialization_row(row)
class DeserializationCursor(_DeserializationCursorMixin, Cursor):
    """A cursor with automatic deserialization of json type fields"""
class _DictCursorMixin:
# You can override this to use OrderedDict or other dict-like types.
dict_type = dict
async def _do_get_result(self):
await super()._do_get_result()
fields = []
if self._description:
for f in self._result.fields:
name = f.name
if name in fields:
name = f.table_name + '.' + name
fields.append(name)
if fields and self._rows:
self._rows = [self._conv_row(r) for r in self._rows]
def _conv_row(self, row):
if row is None:
return None
row = super()._conv_row(row)
return self.dict_type(zip(self._fields, row))
class DictCursor(_DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
class SSCursor(Cursor):
    """Unbuffered Cursor, mainly useful for queries that return a lot of
    data, or for connections to remote servers over a slow network.

    Instead of copying every row of data into a buffer, this will fetch
    rows as needed. The upside of this, is the client uses much less memory,
    and rows are returned much faster when traveling over a slow network,
    or if the result set is very big.

    There are limitations, though. The MySQL protocol doesn't support
    returning the total number of rows, so the only way to tell how many rows
    there are is to iterate over every row returned. Also, it currently isn't
    possible to scroll backwards, as only the current row is held in memory.
    """

    async def close(self):
        conn = self._connection
        # NOTE(review): from here on the body references ``name``,
        # ``fields`` and ``f``, none of which are defined in this scope —
        # it appears to be a fragment of ``_do_get_result`` spliced into
        # ``close``. Verify against the upstream source.
        if name in fields:
            name = f.table_name + '.' + name
        fields.append(name)
        self._fields = fields
        if fields and self._rows:
            self._rows = [self._conv_row(r) for r in self._rows]

    def _conv_row(self, row):
        # Base conversion first, then map to dict_type keyed by the
        # recorded field names; None rows pass through unchanged.
        if row is None:
            return None
        row = super()._conv_row(row)
        return self.dict_type(zip(self._fields, row))
class DictCursor(_DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
class SSCursor(Cursor):
    """Unbuffered Cursor, mainly useful for queries that return a lot of
    data, or for connections to remote servers over a slow network.

    Instead of copying every row of data into a buffer, this will fetch
    rows as needed. The upside of this, is the client uses much less memory,
    and rows are returned much faster when traveling over a slow network,
    or if the result set is very big.

    There are limitations, though. The MySQL protocol doesn't support
    returning the total number of rows, so the only way to tell how many rows
    there are is to iterate over every row returned. Also, it currently isn't
    possible to scroll backwards, as only the current row is held in memory.
    """
def create_engine(minsize=1, maxsize=10, loop=None,
                  dialect=_dialect, pool_recycle=-1, compiled_cache=None,
                  **kwargs):
    """A coroutine for Engine creation.

    Returns Engine instance with embedded connection pool.
    The pool has *minsize* opened connections to MySQL server.
    """
    rejected = [DeserializationCursor, DictCursor, SSCursor, SSDictCursor]
    cursorclass = kwargs.get('cursorclass', Cursor)
    # A custom cursor must derive from Cursor ...
    if not issubclass(cursorclass, Cursor):
        raise ArgumentError('SQLAlchemy engine does not support '
                            'this cursor class')
    # ... and must not be one of the specialised cursor classes.
    for klass in rejected:
        if issubclass(cursorclass, klass):
            raise ArgumentError('SQLAlchemy engine does not support '
                                'this cursor class')
    coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop,
                          dialect=dialect, pool_recycle=pool_recycle,
                          compiled_cache=compiled_cache, **kwargs)
    return _EngineContextManager(coro)
def __init__(self, host="localhost", user=None, password="",
db=None, port=3306, unix_socket=None,
charset='', sql_mode=None,
read_default_file=None, conv=decoders, use_unicode=None,
client_flag=0, cursorclass=Cursor, init_command=None,
connect_timeout=None, read_default_group=None,
no_delay=None, autocommit=False, echo=False,
local_infile=False, loop=None, ssl=None, auth_plugin='',
program_name='', server_public_key=None):
"""
Establish a connection to the MySQL database. Accepts several
arguments:
:param host: Host where the database server is located
:param user: Username to log in as
:param password: Password to use.
:param db: Database to use, None to not use a particular one.
:param port: MySQL port to use, default is usually OK.
:param unix_socket: Optionally, you can use a unix socket rather
than TCP/IP.
:param charset: Charset you want to use.
name = f.name
if name in fields:
name = f.table_name + '.' + name
fields.append(name)
self._fields = fields
if fields and self._rows:
self._rows = [self._conv_row(r) for r in self._rows]
def _conv_row(self, row):
if row is None:
return None
return self.dict_type(zip(self._fields, row))
class DictCursor(_DictCursorMixin, Cursor):
    """A cursor which returns results as a dictionary"""
class SSCursor(Cursor):
"""Unbuffered Cursor, mainly useful for queries that return a lot of
data, or for connections to remote servers over a slow network.
Instead of copying every row of data into a buffer, this will fetch
rows as needed. The upside of this, is the client uses much less memory,
and rows are returned much faster when traveling over a slow network,
or if the result set is very big.
There are limitations, though. The MySQL protocol doesn't support
returning the total number of rows, so the only way to tell how many rows
there are is to iterate over every row returned. Also, it currently isn't
possible to scroll backwards, as only the current row is held in memory.