import sqlparse
from sqlparse import sql


def test_issue40():
    # make sure identifier lists in subselects are grouped
    p = sqlparse.parse(('SELECT id, name FROM '
                        '(SELECT id, name FROM bar) as foo'))[0]
    assert len(p.tokens) == 7
    assert p.tokens[2].__class__ == sql.IdentifierList
    assert p.tokens[-1].__class__ == sql.Identifier
    assert p.tokens[-1].get_name() == 'foo'
    sp = p.tokens[-1].tokens[0]
    assert sp.tokens[3].__class__ == sql.IdentifierList
    # make sure that formatting works as expected
    s = sqlparse.format('SELECT id == name FROM '
                        '(SELECT id, name FROM bar)', reindent=True)
    assert s == '\n'.join([
        'SELECT id == name',
        'FROM',
        '  (SELECT id,',
        '          name',
        '   FROM bar)'])
    s = sqlparse.format('SELECT id == name FROM '
                        '(SELECT id, name FROM bar) as foo', reindent=True)
    assert s == '\n'.join([
        'SELECT id == name',
        'FROM',
        '  (SELECT id,',
        '          name',
        '   FROM bar) as foo'])
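
# A minimal sketch of the same grouping check outside the test harness
# (assuming sqlparse with its default grouping pass): it parses the query
# used above and pulls the IdentifierList out of the parenthesized subselect.
import sqlparse
from sqlparse import sql

p = sqlparse.parse('SELECT id, name FROM (SELECT id, name FROM bar) as foo')[0]
subquery = p.tokens[-1].tokens[0]  # the Parenthesis inside the "foo" identifier
inner = [t for t in subquery.tokens if isinstance(t, sql.IdentifierList)]
print(inner)  # the grouped "id, name" column list of the subselect
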
bb.geometry,
CAST(Area(Transform(geometry,26946)) AS REAL) AS area,
CAST(b02001001 AS INTEGER) AS total_pop,
FROM d02G003_geofile AS geo
JOIN d024004_b02001_estimates AS b02001e ON geo.stusab = b02001e.stusab AND geo.logrecno = b02001e.logrecno
JOIN blockgroup_boundaries AS bb ON geo.state = bb.state AND geo.county = bb.county AND bb.tract = geo.tract AND bb.blkgrp = geo.blkgrp
WHERE geo.sumlevel = 150 AND geo.state = 6 and geo.county = 73
"""
import sqlparse
import sqlparse.sql
r = sqlparse.parse(sql)
for t in r[0].tokens:
    if isinstance(t, sqlparse.sql.IdentifierList):
        for i in t.get_identifiers():
            pass
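
# For reference, the same iteration on a small self-contained query (the query
# text here is illustrative only, not the census SQL from the snippet above):
# the comma-separated column list arrives as a single IdentifierList token.
import sqlparse
import sqlparse.sql

stmt = sqlparse.parse('SELECT id, name, email FROM users')[0]
for t in stmt.tokens:
    if isinstance(t, sqlparse.sql.IdentifierList):
        for i in t.get_identifiers():
            print(i.get_real_name())  # prints id, name, email (one per line)
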
def is_identifierlist(token):
    return isinstance(token, sqlparse.sql.IdentifierList)
import re

from sqlparse.sql import Identifier, IdentifierList
from sqlparse.tokens import Keyword


def extract_table_identifiers(token_stream):
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                yield identifier.get_name()
        elif isinstance(item, Identifier):
            yield item.get_name()
        # It's a bug to check for Keyword here, but in the example
        # above some table names are identified as keywords...
        elif isinstance(item, str):
            if item.strip() != '' and re.search(r'\w+', item):
                yield item
        elif item.ttype is Keyword:
            yield item.value
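
# A hedged usage sketch: feed the generator only the tokens that follow FROM so
# the SELECT column list is not mixed in. The from_seen filtering below is an
# assumption about how a caller slices the statement, not part of the snippet.
import sqlparse
from sqlparse.tokens import Keyword

stmt = sqlparse.parse('SELECT * FROM foo, bar AS b')[0]
from_seen = False
tables = []
for tok in stmt.tokens:
    if from_seen:
        tables.extend(extract_table_identifiers([tok]))
    elif tok.ttype is Keyword and tok.value.upper() == 'FROM':
        from_seen = True
print(tables)  # get_name() prefers the alias, so this should give ['foo', 'b']
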
    if schema_name and not schema_quoted:
        schema_name = schema_name.lower()
    quote_count = item.value.count('"')
    name_quoted = quote_count > 2 or (quote_count and not schema_quoted)
    alias_quoted = alias and item.value[-1] == '"'
    if alias_quoted or name_quoted and not alias and name.islower():
        alias = '"' + (alias or name) + '"'
    if name and not name_quoted and not name.islower():
        if not alias:
            alias = name
        name = name.lower()
    return schema_name, name, alias

try:
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                # Sometimes Keywords (such as FROM) are classified as
                # identifiers which don't have the get_real_name() method.
                try:
                    schema_name = identifier.get_parent_name()
                    real_name = identifier.get_real_name()
                    is_function = allow_functions and _identifier_is_function(
                        identifier
                    )
                except AttributeError:
                    continue
                if real_name:
                    yield TableReference(
                        schema_name, real_name, identifier.get_alias(), is_function
                    )
        elif isinstance(item, Identifier):
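
# The generator above yields TableReference records and calls
# _identifier_is_function(), neither of which appears in the snippet. If it
# follows the pgcli-style parse utilities it resembles, those helpers look
# roughly like this (an assumption based on the call sites, not the original):
from collections import namedtuple

from sqlparse.sql import Function

TableReference = namedtuple('TableReference',
                            ['schema', 'name', 'alias', 'is_function'])


def _identifier_is_function(identifier):
    # True when the identifier wraps a function call, e.g. "my_func(x) f".
    return any(isinstance(t, Function) for t in identifier.tokens)
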
def group_identifier_list(tlist):
    [group_identifier_list(sgroup) for sgroup in tlist.get_sublists()
     if not isinstance(sgroup, sql.IdentifierList)]
    idx = 0
    # Allowed list items
    fend1_funcs = [lambda t: isinstance(t, (sql.Identifier, sql.Function,
                                            sql.Case)),
                   lambda t: t.is_whitespace(),
                   lambda t: t.ttype == T.Name,
                   lambda t: t.ttype == T.Wildcard,
                   lambda t: t.match(T.Keyword, 'null'),
                   lambda t: t.ttype == T.Number.Integer,
                   lambda t: t.ttype == T.String.Single,
                   lambda t: isinstance(t, sql.Comparison),
                   ]
    tcomma = tlist.token_next_match(idx, T.Punctuation, ',')
    start = None
    while tcomma is not None:
        before = tlist.token_prev(tcomma)
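
# group_identifier_list() is part of sqlparse's internal grouping engine; user
# code rarely calls it directly because sqlparse.parse() runs the grouping pass
# itself. A quick check of that behaviour (a small sketch, assuming a recent
# sqlparse release):
import sqlparse
from sqlparse import sql

stmt = sqlparse.parse('SELECT a, b, c FROM t')[0]
# The comma-separated column list already arrives grouped as one IdentifierList.
print(any(isinstance(t, sql.IdentifierList) for t in stmt.tokens))  # True
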
first_token = self.token_first(skip_cm=True)
if first_token is None:
    # An "empty" statement that either has no tokens at all
    # or only whitespace tokens.
    return 'UNKNOWN'
elif first_token.ttype in (T.Keyword.DML, T.Keyword.DDL):
    return first_token.normalized
elif first_token.ttype == T.Keyword.CTE:
    # The WITH keyword should be followed by either an Identifier or
    # an IdentifierList containing the CTE definitions; the actual
    # DML keyword (e.g. SELECT, INSERT) will follow next.
    fidx = self.token_index(first_token)
    tidx, token = self.token_next(fidx, skip_ws=True)
    if isinstance(token, (Identifier, IdentifierList)):
        _, dml_keyword = self.token_next(tidx, skip_ws=True)
        if dml_keyword is not None \
                and dml_keyword.ttype == T.Keyword.DML:
            return dml_keyword.normalized
# Hmm, probably invalid syntax, so return unknown.
return 'UNKNOWN'
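
# This fragment matches the body of sqlparse's Statement.get_type(). A short
# usage sketch, assuming a sqlparse version that recognizes CTEs (Keyword.CTE):
import sqlparse

print(sqlparse.parse('WITH t AS (SELECT 1) SELECT * FROM t')[0].get_type())      # SELECT
print(sqlparse.parse('-- a comment\nINSERT INTO foo VALUES (1)')[0].get_type())  # INSERT
print(sqlparse.parse('foo bar')[0].get_type())                                   # UNKNOWN
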
from sqlparse.sql import Identifier, IdentifierList


def extract_table_identifiers(token_stream):
    """yields tuples of (schema_name, table_name, table_alias)"""
    for item in token_stream:
        if isinstance(item, IdentifierList):
            for identifier in item.get_identifiers():
                # Sometimes Keywords (such as FROM) are classified as
                # identifiers which don't have the get_real_name() method.
                try:
                    schema_name = identifier.get_parent_name()
                    real_name = identifier.get_real_name()
                except AttributeError:
                    continue
                if real_name:
                    yield (schema_name, real_name, identifier.get_alias())
        elif isinstance(item, Identifier):
            real_name = item.get_real_name()
            schema_name = item.get_parent_name()
            if real_name:
                yield (schema_name, real_name, item.get_alias())
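
# A hedged usage sketch (the query below is made up): in a simple statement the
# table list after FROM is a direct child of the statement, so stmt.tokens can
# be passed straight to the generator defined above.
import sqlparse

stmt = sqlparse.parse('SELECT * FROM public.users u, orders')[0]
print(list(extract_table_identifiers(stmt.tokens)))
# roughly: [('public', 'users', 'u'), (None, 'orders', None)]
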
def group_identifier_list(tlist):
    [group_identifier_list(sgroup) for sgroup in tlist.get_sublists()
     if not isinstance(sgroup, sql.IdentifierList)]
    idx = 0
    # Allowed list items
    fend1_funcs = [lambda t: isinstance(t, (sql.Identifier, sql.Function,
                                            sql.Case)),
                   lambda t: t.is_whitespace(),
                   lambda t: t.ttype == T.Name,
                   lambda t: t.ttype == T.Wildcard,
                   lambda t: t.match(T.Keyword, 'null'),
                   lambda t: t.match(T.Keyword, 'role'),
                   lambda t: t.ttype == T.Number.Integer,
                   lambda t: t.ttype == T.String.Single,
                   lambda t: t.ttype == T.Name.Placeholder,
                   lambda t: isinstance(t, sql.Comparison),
                   lambda t: isinstance(t, sql.Comment),
                   ]
    tcomma = tlist.token_next_match(idx, T.Punctuation, ',')

from sqlparse.sql import Identifier, IdentifierList


def find_the_identifiers(sql_part):
    # logging.debug("sql_part is of type %s", type(sql_part))
    result = []
    for token in sql_part:
        # logging.debug("Token |%s| is of type |%s|", token, type(token))
        if isinstance(token, Identifier):
            # logging.debug("Found an Identifier: |%s|", token)
            result.append(str(token))
        elif isinstance(token, IdentifierList):
            # logging.debug("Found an IdentifierList: |%s|", token)
            result.extend(find_the_identifiers_in_list(token))
    return result
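
# find_the_identifiers_in_list() is referenced above but not shown. Judging from
# the call site it presumably flattens a grouped IdentifierList into its entries,
# roughly like this (an assumption, not the original helper):
def find_the_identifiers_in_list(identifier_list):
    # Collect the string form of every entry in the grouped IdentifierList.
    return [str(identifier) for identifier in identifier_list.get_identifiers()]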