Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_tpc_commit(self):
    """Walk a two-phase commit through begin -> prepare -> commit.

    Checks the connection status constant at every stage and that the
    inserted row only becomes visible to other sessions after
    ``tpc_commit()``.
    """
    cnn = self.connect()
    # Transaction id triple: (format_id, gtrid, bqual).
    xid = cnn.xid(1, "gtrid", "bqual")
    self.assertEqual(cnn.status, ext.STATUS_READY)

    cnn.tpc_begin(xid)
    self.assertEqual(cnn.status, ext.STATUS_BEGIN)

    cur = cnn.cursor()
    cur.execute("insert into test_tpc values ('test_tpc_commit');")
    # Not yet prepared: no prepared transaction, row not visible outside.
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    cnn.tpc_prepare()
    self.assertEqual(cnn.status, ext.STATUS_PREPARED)
    # Prepared: one pending xact recorded, row still not visible.
    self.assertEqual(1, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    cnn.tpc_commit()
    self.assertEqual(cnn.status, ext.STATUS_READY)
    # Committed: the prepared xact is consumed and the row is visible.
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(1, self.count_test_records())
def test_init(self):
    """sql.Literal accepts str, unicode, bytes, int and date values."""
    # assertTrue replaces self.assert_, an alias deprecated since
    # Python 2.7/3.2 and removed entirely in Python 3.12.
    self.assertTrue(isinstance(sql.Literal('foo'), sql.Literal))
    self.assertTrue(isinstance(sql.Literal(u'foo'), sql.Literal))
    self.assertTrue(isinstance(sql.Literal(b'foo'), sql.Literal))
    self.assertTrue(isinstance(sql.Literal(42), sql.Literal))
    self.assertTrue(isinstance(
        sql.Literal(dt.date(2016, 12, 31)), sql.Literal))
sql.Identifier('pg_catalog', 'pg_database')
),
# Literal
sql.SQL('''SELECT {}''').format(sql.Literal('foobar')),
# Placeholder
sql.SQL('''SELECT {}''').format(sql.Placeholder())
], ids=('str', 'bytes', 'unicode', 'Composed',
'Identifier', 'Literal', 'Placeholder'))
def test_execute_sql(tracer, engine, connection, method, query):
    # Check that executing with objects of ``sql.Composable`` subtypes doesn't
    # raise any exceptions.
    # NOTE(review): ``method`` and ``query`` come from a parametrize decorator
    # that is cut off above this chunk; ids suggest str/bytes/unicode plus
    # Composed/Identifier/Literal/Placeholder query forms.
    metadata.create_all(engine)
    with tracer.start_active_span('test'):
        cur = connection.cursor()
        # NOTE(review): body looks truncated here — ``params`` is built but
        # never used; confirm the execute call follows in the full file.
        params = ('foobar', )
def cleanup_postgresql(connection, truncate_tables):
    """Truncate each table in *truncate_tables*, skipping missing ones.

    A ``ProgrammingError`` with pgcode ``UNDEFINED_TABLE`` is reported and
    ignored (best-effort cleanup); any other ``ProgrammingError`` is
    re-raised.
    """
    # Fixed typo in the debug message: 'cleanup_postgreql' -> the actual
    # function name 'cleanup_postgresql'.
    print('cleanup_postgresql({!r}, {!r})'.format(connection, truncate_tables))
    for table in truncate_tables:
        with connection.cursor() as cursor:
            try:
                # Identifier() quotes the table name safely, so an odd
                # table name cannot break or inject into the statement.
                cursor.execute(pgsql.SQL('TRUNCATE TABLE {}').format(
                    pgsql.Identifier(table)))
            except psycopg2.ProgrammingError as exc:
                if exc.pgcode != psycopg2.errorcodes.UNDEFINED_TABLE:
                    raise
                # Table does not exist: report and keep cleaning the rest.
                print("Error truncating {!r} table: {}".format(table, exc))
# NOTE(review): this span is the interior of a setup method whose ``def``
# line is outside this chunk (it assigns to ``self``); comments only.
config_manager = ConfigurationManager(
    [required_config],
    app_name='middleware',
    app_description=__doc__,
    # Later sources override earlier ones: the mocked logger first, then
    # the test environment; argv_source is emptied so real command-line
    # arguments cannot leak into the test configuration.
    values_source_list=[
        {'logger': mock_logging},
        environment,
    ],
    argv_source=[]
)
config = config_manager.get_config()
# Open a database connection through the configured database class.
self.conn = config.database.database_class(
    config.database
).connection()
# A fresh connection must start outside any transaction.
assert self.conn.get_transaction_status() == \
    psycopg2.extensions.TRANSACTION_STATUS_IDLE
def test_async_cancel(self):
    """cancel() interrupts a running query on an async connection."""
    async_conn = psycopg2.connect(dsn, async_=True)
    # Cancelling before the async connection is established must fail.
    self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
    extras.wait_select(async_conn)
    cur = async_conn.cursor()
    # Start a long-running query, give it a moment to begin executing.
    cur.execute("select pg_sleep(10)")
    time.sleep(1)
    self.assertTrue(async_conn.isexecuting())
    async_conn.cancel()
    # Waiting on the cancelled query surfaces QueryCanceledError.
    self.assertRaises(psycopg2.extensions.QueryCanceledError,
                      extras.wait_select, async_conn)
    # The connection must remain usable after the cancel.
    cur.execute("select 1")
    extras.wait_select(async_conn)
    self.assertEqual(cur.fetchall(), [(1, )])
def test_inet_cast(self):
    """register_ipaddress() maps inet columns to ipaddress interface objects."""
    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    # NULL still comes back as None.
    cur.execute("select null::inet")
    # assertTrue/assertEqual replace the assert_/assertEquals aliases,
    # deprecated since Python 2.7/3.2 and removed in Python 3.12; this
    # also matches the assertEqual style used elsewhere in this file.
    self.assertTrue(cur.fetchone()[0] is None)

    # IPv4 values become IPv4Interface (address plus netmask).
    cur.execute("select '127.0.0.1/24'::inet")
    obj = cur.fetchone()[0]
    self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
    self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))

    # IPv6 values become IPv6Interface.
    cur.execute("select '::ffff:102:300/128'::inet")
    obj = cur.fetchone()[0]
    self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
    self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
o['type'] = self.schema + '.' + self.name
return o
## create type casters for whole list
##
# NOTE(review): interior of a registration helper; its ``def`` line and the
# origin of ``types``/``conn``/``casters`` are outside this chunk.
for t in types:
    if type(t) == str:
        # Bare type name: introspect the composite type from the database.
        caster = DictComposite._from_db(t, conn)
    elif type(t) in [tuple, list]:
        # Pre-built argument tuple/list: construct the caster directly.
        caster = CompositeCaster(*t)
    else:
        raise Exception("invalid type %s in flexmap type list" % type(t))

    ## register item and array casters
    ##
    register_type(caster.typecaster, conn)
    if caster.array_typecaster is not None:
        register_type(caster.array_typecaster, conn)

    ## remember caster under 'schema.typename'
    ##
    casters['%s.%s' % (caster.schema, caster.name)] = caster
class DictAdapter(object):
    """
    A dictionary adapter converting Python dicts to PostgreSQL
    JSON, Hstore or Composite Types depending on the dict field 'type'.
    """
    # NOTE(review): the adapter protocol methods (e.g. getquoted/prepare)
    # are not visible in this chunk; only the constructor appears here.

    def __init__(self, adapted):
        ## remember value to be adapted - a Python dict
        self.adapted = adapted
def test_order_iter(self):
    """DictRow iter* methods yield columns in SELECT order, surviving pickle."""
    cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cursor.execute("select 5 as foo, 4 as bar, 33 as baz, 2 as qux")
    row = cursor.fetchone()

    expected = [('foo', 5), ('bar', 4), ('baz', 33), ('qux', 2)]
    self.assertEqual(list(row.iterkeys()), [k for k, _ in expected])
    self.assertEqual(list(row.itervalues()), [v for _, v in expected])
    self.assertEqual(list(row.iteritems()), expected)

    # A pickle round-trip must preserve the same column ordering.
    clone = pickle.loads(pickle.dumps(row))
    self.assertEqual(list(clone.iterkeys()), list(row.iterkeys()))
    self.assertEqual(list(clone.itervalues()), list(row.itervalues()))
    self.assertEqual(list(clone.iteritems()), list(row.iteritems()))