Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
user=self.user,
password=self.password,
host=self.host,
port=self.port,
loop=self.loop,
**kwargs)
# TODO: fix this, should autocommit be enabled by default?
await conn.autocommit(True)
engine = mock.Mock()
engine.dialect = sa.engine._dialect
def release(*args):
return
engine.release = release
ret = sa.SAConnection(conn, engine)
return ret
host=self.host,
loop=self.loop,
port=self.port,
**kwargs)
await conn.autocommit(True)
cur = await conn.cursor()
await cur.execute("DROP TABLE IF EXISTS sa_tbl")
await cur.execute("CREATE TABLE sa_tbl "
"(id serial, name varchar(255))")
await cur.execute("INSERT INTO sa_tbl (name)"
"VALUES ('first')")
await cur._connection.commit()
# yield from cur.close()
engine = mock.Mock()
engine.dialect = sa.engine._dialect
return sa.SAConnection(conn, engine)
def make_engine(self, use_loop=True, **kwargs):
if use_loop:
return (yield from sa.create_engine(db=self.db,
user=self.user,
password=self.password,
host=self.host,
loop=self.loop,
minsize=10,
**kwargs))
else:
return (yield from sa.create_engine(db=self.db,
user=self.user,
password=self.password,
host=self.host,
minsize=10,
**kwargs))
async def test_async_iter_over_sa_result(mysql_params, table, loop):
ret = []
engine = await sa.create_engine(**mysql_params, loop=loop)
conn = await engine.acquire()
async for i in (await conn.execute(tbl.select())):
ret.append(i)
assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret
engine.terminate()
def make_engine(self, use_loop=True, **kwargs):
if use_loop:
return (yield from sa.create_engine(db=self.db,
user=self.user,
password=self.password,
host=self.host,
loop=self.loop,
minsize=10,
**kwargs))
else:
return (yield from sa.create_engine(db=self.db,
user=self.user,
password=self.password,
host=self.host,
minsize=10,
**kwargs))
async def init_mysql(conf, loop):
engine = await aiomysql.sa.create_engine(
db=conf['database'],
user=conf['user'],
password=conf['password'],
host=conf['host'],
port=conf['port'],
minsize=1,
maxsize=2,
loop=loop)
return engine
engine = loop.run_until_complete(init_mysql(mysql_params, loop))
def go():
conn = yield from self.engine.acquire()
tr = yield from conn.begin()
with self.assertRaises(sa.InvalidRequestError):
self.engine.release(conn)
del tr
self.loop.run_until_complete(go())
async def test_sa_connection(table, mysql_params, loop):
async with sa.create_engine(loop=loop, **mysql_params) as engine:
connection = await engine.acquire()
assert not connection.closed
async with connection:
ret = []
async for i in connection.execute(tbl.select()):
ret.append(i)
assert [(1, 'a'), (2, 'b'), (3, 'c')] == ret
assert connection.closed
async def test_incompatible_cursor_fails(loop, mysql_params):
mysql_params['cursorclass'] = DictCursor
with pytest.raises(sa.ArgumentError) as ctx:
await sa.create_engine(loop=loop, **mysql_params)
msg = 'SQLAlchemy engine does not support this cursor class'
assert str(ctx.value) == msg
async def init_db(app):
db_url = app['config']['database_url']
if db_url.startswith('mysql'):
engine = await aiomysql.sa.create_engine(db_url, loop=app.loop)
else:
engine = sa.create_engine(db_url)
if db_url.startswith('sqlite'):
users.table.create(engine)
wwwpushsubs.table.create(engine)
emailsubs.table.create(engine)
app['config']['db'] = engine