Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_issue193_splitting_function():
    """Split a CREATE FUNCTION definition followed by a SELECT (issue193).

    The BEGIN ... END body carries its own semicolons; the script is
    expected to come back as exactly two statements.
    """
    script = (
        ' CREATE FUNCTION a(x VARCHAR(20)) RETURNS VARCHAR(20)\n'
        'BEGIN\n'
        'DECLARE y VARCHAR(20);\n'
        'RETURN x;\n'
        'END;\n'
        'SELECT * FROM a.b;'
    )
    pieces = sqlparse.split(script)
    assert len(pieces) == 2
def test_psql_quotation_marks():
    """Split scripts whose function bodies are dollar-quoted (issue83).

    Each script holds two CREATE OR REPLACE FUNCTION statements and must
    split into exactly two statements.
    """
    # Regression: plain ``$$`` delimiters have to work.
    plain_dollar_script = (
        '\n'
        'CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $$\n'
        '....\n'
        '$$ LANGUAGE plpgsql;\n'
        'CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $$\n'
        '....\n'
        '$$ LANGUAGE plpgsql;'
    )
    assert len(sqlparse.split(plain_dollar_script)) == 2

    # Tagged ``$SOMETHING$`` delimiters have to work as well.
    tagged_dollar_script = (
        '\n'
        'CREATE OR REPLACE FUNCTION testfunc1(integer) RETURNS integer AS $PROC_1$\n'
        '....\n'
        '$PROC_1$ LANGUAGE plpgsql;\n'
        'CREATE OR REPLACE FUNCTION testfunc2(integer) RETURNS integer AS $PROC_2$\n'
        '....\n'
        '$PROC_2$ LANGUAGE plpgsql;'
    )
    assert len(sqlparse.split(tagged_dollar_script)) == 2
def test_split_cursor_declare():
    """A DECLARE CURSOR statement followed by a SELECT splits in two."""
    script = 'DECLARE CURSOR "foo" AS SELECT 1;\nSELECT 2;'
    assert len(sqlparse.split(script)) == 2
def test_duplicate_linebreaks(self):
    """Regression test for issue3: reindented output has one break per clause.

    Each case formats a statement with ``reindent=True`` and compares the
    result with the expected clauses joined by single newlines.
    """

    def reindent(statement):
        # Named helper instead of a lambda bound to a name (PEP 8).
        return sqlparse.format(statement, reindent=True)

    # A trailing line comment is kept on its own line.
    s = 'select c1 -- column1\nfrom foo'
    assert reindent(s) == '\n'.join([
        'select c1 -- column1',
        'from foo'])

    # Same input, but with the comment stripped during formatting.
    s = 'select c1 -- column1\nfrom foo'
    r = sqlparse.format(s, reindent=True, strip_comments=True)
    assert r == '\n'.join([
        'select c1',
        'from foo'])

    # Input that already has one clause per line.
    s = 'select c1\nfrom foo\norder by c1'
    assert reindent(s) == '\n'.join([
        'select c1',
        'from foo',
        'order by c1'])

    # Single-line input is broken up clause by clause.
    s = 'select c1 from t1 where (c1 = 1) order by c1'
    assert reindent(s) == '\n'.join([
        'select c1',
        'from t1',
        'where (c1 = 1)',
        'order by c1'])
INSTALL build.example.com-casters-simple;
INSTALL build.example.com-casters-simple_stats;
MATERIALIZE build.example.com-casters-integers;
MATERIALIZE build.example.com-casters-simple_stats;
SELECT t1.uuid AS t1_uuid, t2.float_a AS t2_float_a, t3.a AS t3_a
FROM build.example.com-casters-simple AS t1
JOIN build.example.com-casters-simple_stats AS t2 ON t1.id = t2.index
JOIN build.example.com-casters-integers AS t3 ON t3_a = t2.index;
CREATE VIEW view1 AS SELECT col1 as c1, col2 as c2 FROM table1 WHERE foo is None and bar is baz;
"""
statements = sqlparse.parse(sqlparse.format(sql, strip_comments=True))
rec_keys = ['statement', 'install', 'materialize', 'tables', 'drop', 'indexes', 'joins']
expected = [
[u'INSTALL p00casters006003', set([u'p00casters006003']), None, None, None, None, None],
[u'INSTALL p00casters002003', set([u'p00casters002003']), None, None, None, None, None],
[u'MATERIALIZE p00casters004003', None, set([u'p00casters004003']), None, None, None, None],
[u'MATERIALIZE p00casters002003', None, set([u'p00casters002003']), None, None, None, None],
[u'SELECT t1.uuid AS t1_uuid, t2.float_a AS t2_float_a, t3.a AS t3_a FROM p00casters006003 AS t1 JOIN p00casters002003 AS t2 ON t1.id = t2.index JOIN p00casters004003 AS t3 ON t3_a = t2.index',
None, set([u'p00casters004003', u'p00casters006003', u'p00casters002003']), None, None,
set([(u'p00casters006003', (u'id',)), (u'p00casters002003', (u'index',))]), None],
[u'CREATE VIEW view1 AS SELECT col1 as c1, col2 as c2 FROM table1 WHERE foo is None and bar is baz',
None, None, None, ['DROP VIEW IF EXISTS view1;'], None, None],
[None, None, None, None, None, None, None]
]
def test_identifiercase_quotes(self):
    """Quoted identifiers are left untouched by ``identifier_case``."""
    statement = 'select * from "foo"."bar"'
    formatted = sqlparse.format(statement, identifier_case="upper")
    # The expected output is the input, unchanged.
    assert formatted == 'select * from "foo"."bar"'
def test_issue212_py2unicode():
    """A TokenList mixing a unicode and a plain string token stringifies."""
    unicode_token = sql.Token(T.String, u'schöner ')
    plain_token = sql.Token(T.String, 'bug')
    combined = sql.TokenList([unicode_token, plain_token])
    assert str(combined) == 'schöner bug'
def test_token_str():
    """``str()`` of a Token returns its value with the case preserved."""
    assert str(sql.Token(None, 'FoO')) == 'FoO'
def test_tokenlist_first():
    """Exercise ``token_first`` with and without whitespace skipping."""
    statement = sqlparse.parse(' select foo')[0]

    # By default the leading whitespace token is skipped.
    assert statement.token_first().value == 'select'

    # With skip_ws=False the leading space itself is the first token.
    leading = statement.token_first(skip_ws=False)
    assert leading.value == ' '

    # An empty token list has no first token.
    empty = sql.TokenList([])
    assert empty.token_first() is None
def test_issue40():
# make sure identifier lists in subselects are grouped
p = sqlparse.parse(('SELECT id, name FROM '
'(SELECT id, name FROM bar) as foo'))[0]
assert len(p.tokens) == 7
assert p.tokens[2].__class__ == sql.IdentifierList
assert p.tokens[-1].__class__ == sql.Identifier
assert p.tokens[-1].get_name() == 'foo'
sp = p.tokens[-1].tokens[0]
assert sp.tokens[3].__class__ == sql.IdentifierList
# make sure that formatting works as expected
s = sqlparse.format('SELECT id == name FROM '
'(SELECT id, name FROM bar)', reindent=True)
assert s == '\n'.join([
'SELECT id == name',
'FROM',
' (SELECT id,',
' name',
' FROM bar)'])
s = sqlparse.format('SELECT id == name FROM '
'(SELECT id, name FROM bar) as foo', reindent=True)
assert s == '\n'.join([
'SELECT id == name',
'FROM',
' (SELECT id,',