Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _split_kwds(self, tlist):
    """Insert a newline token before every split keyword found in *tlist*."""
    tidx, token = self._next_token(tlist)
    while token:
        # Joins are a special case: align on the first word only
        # (e.g. "LEFT OUTER JOIN" aligns on "LEFT").
        if token.match(T.Keyword, self.join_words, regex=True):
            indent_word = token.value.split()[0]
        else:
            indent_word = text_type(token)
        tlist.insert_before(token, self.nl(indent_word))
        # The insert shifted the keyword one slot right; skip past it.
        tidx, token = self._next_token(tlist, tidx + 1)
# NOTE(review): orphan fragment — the enclosing `def` (this resembles
# sqlparse's StatementSplitter._change_splitlevel) is not visible in this
# chunk, `ttype`/`value` are defined elsewhere, and indentation appears to
# have been stripped. Code kept byte-identical.
# ANSI
# if a normal (non-keyword) token, return
# wouldn't parentheses increase/decrease a level?
# no, inside a parenthesis a new statement can't start
if ttype not in T.Keyword:
return 0
# Everything after here is ttype = T.Keyword
# Also to note, once entered an If statement you are done and basically
# returning
unified = value.upper()
# three keywords begin with CREATE, but only one of them is DDL
# DDL CREATE may contain more words, e.g. "CREATE OR REPLACE"
if ttype is T.Keyword.DDL and unified.startswith('CREATE'):
self._is_create = True
return 0
# can have nested DECLARE inside of BEGIN...
if unified == 'DECLARE' and self._is_create and self._begin_depth == 0:
self._in_declare = True
return 1
if unified == 'BEGIN':
self._begin_depth += 1
if self._is_create:
# FIXME(andi): This makes no sense.
return 1
return 0
# Should this respect a preceding BEGIN?
def _next_token(self, tlist, idx=-1):
    """Return (index, token) of the next split keyword after *idx*.

    "BETWEEN x AND y" must be kept on a single line, so when a BETWEEN
    is found both it and its pairing AND are skipped over recursively.
    """
    matcher = (T.Keyword, self.split_words, True)
    tidx, token = tlist.token_next_by(m=matcher, idx=idx)
    if token and token.normalized == 'BETWEEN':
        # Skip the BETWEEN itself ...
        tidx, token = self._next_token(tlist, tidx)
        if token and token.normalized == 'AND':
            # ... and the AND that closes the BETWEEN range.
            tidx, token = self._next_token(tlist, tidx)
    return tidx, token
# NOTE(review): orphan fragment — duplicate of an earlier chunk; the
# enclosing `def` (resembles sqlparse's StatementSplitter._change_splitlevel)
# is not visible, `ttype`/`value` are defined elsewhere, and indentation
# appears stripped. Code kept byte-identical.
# ANSI
# if a normal (non-keyword) token, return
# wouldn't parentheses increase/decrease a level?
# no, inside a parenthesis a new statement can't start
if ttype not in T.Keyword:
return 0
# Everything after here is ttype = T.Keyword
# Also to note, once entered an If statement you are done and basically
# returning
unified = value.upper()
# three keywords begin with CREATE, but only one of them is DDL
# DDL CREATE may contain more words, e.g. "CREATE OR REPLACE"
if ttype is T.Keyword.DDL and unified.startswith('CREATE'):
self._is_create = True
return 0
# can have nested DECLARE inside of BEGIN...
if unified == 'DECLARE' and self._is_create and self._begin_depth == 0:
self._in_declare = True
return 1
if unified == 'BEGIN':
self._begin_depth += 1
if self._is_create:
# FIXME(andi): This makes no sense.
return 1
return 0
# Should this respect a preceding BEGIN?
def _create(self, parent, start_token):
# Handle a CREATE statement: rewrite the table name for CREATE TABLE, or
# validate/locate names for CREATE [UNIQUE] INDEX.
# NOTE(review): this chunk is truncated — the body of the final
# `if not on_token:` (and anything after it) is not visible here, and
# indentation appears stripped. Code kept byte-identical.
token = self._token_next(parent, start_token)
if token.match(Tokens.Keyword, 'TABLE'):
token = self._token_next(parent, token)
# Skip an optional "IF NOT EXIST(S)" clause and any whitespace.
# NOTE(review): 'EXIST' looks like it should be 'EXISTS' — confirm
# against what the tokenizer actually emits for this clause.
while token.match(Tokens.Keyword, ['IF', 'NOT', 'EXIST']) or \
token.is_whitespace():
token = self._token_next(parent, token)
table_name = self._get_entity_name_from_token(parent, token)
if not table_name:
raise Exception("Invalid CREATE TABLE statement, expected table name")
self._replace_table_entity_name(parent, token, table_name)
elif token.match(Tokens.Keyword, ['UNIQUE', 'INDEX']):
# UNIQUE is optional; step past it to reach INDEX.
if token.match(Tokens.Keyword, 'UNIQUE'):
token = self._token_next(parent, token)
if token.match(Tokens.Keyword, 'INDEX'):
index_token = self._token_next(parent, token)
index_name = self._get_entity_name_from_token(parent, index_token)
if not index_name:
raise Exception("Invalid CREATE INDEX statement, expected index name")
on_token = self._token_next_match(parent, index_token, Tokens.Keyword, 'ON')
if not on_token:
def __init__(self, schema, query, me):
    """Parse an FQL query and extract its table and WHERE clause.

    Args:
      schema: the schema the query is evaluated against
      query: FQL statement
      me: integer, the user id that me() should return
    """
    # Lazy %-style logging args: the message is only formatted if the
    # DEBUG level is actually enabled.
    logging.debug('parsing %s', query)
    self.schema = schema
    self.query = query
    self.me = me
    self.statement = stmt = sqlparse.parse(query)[0]

    # extract table and WHERE clause, if any
    self.table = None
    self.where = None
    from_ = stmt.token_next_match(0, tokens.Keyword, 'FROM')
    if from_:
        index = stmt.token_index(from_)
        self.table = stmt.token_next(index)
        # a grouped token (e.g. an aliased table) — unwrap its first child
        if self.table.is_group():
            self.table = self.table.token_first()
    self.where = stmt.token_next_by_instance(0, sql.Where)
    logging.debug('table %s, where %s', self.table, self.where)
def end_match(token):
    """Return True if *token* terminates the clause being scanned."""
    stopwords = ('ORDER', 'GROUP', 'LIMIT', 'UNION', 'EXCEPT', 'HAVING',
                 'WHEN',  # for Oracle10g merge
                 'CONNECT',  # for Oracle connect by
                 )
    # DELETE / START are DML tokens but also end the clause.
    return bool(token.match(T.Keyword, stopwords)
                or token.match(T.DML, ('DELETE'))   # for Oracle10g merge
                or token.match(T.DML, ('START')))   # for Oracle connect by
# NOTE(review): fragment of a keyword -> token-type mapping (resembles
# sqlparse's KEYWORDS dict). The dict's opening and closing braces are
# outside this chunk, so the entries are kept byte-identical.
'USER_DEFINED_TYPE_SCHEMA': tokens.Keyword,
'USING': tokens.Keyword,
'VACUUM': tokens.Keyword,
'VALID': tokens.Keyword,
'VALIDATE': tokens.Keyword,
'VALIDATOR': tokens.Keyword,
'VALUES': tokens.Keyword,
'VARIABLE': tokens.Keyword,
'VERBOSE': tokens.Keyword,
'VERSION': tokens.Keyword,
'VIEW': tokens.Keyword,
'VOLATILE': tokens.Keyword,
'WHENEVER': tokens.Keyword,
'WITH': tokens.Keyword.CTE,
'WITHOUT': tokens.Keyword,
'WORK': tokens.Keyword,
'WRITE': tokens.Keyword,
'YEAR': tokens.Keyword,
'ZONE': tokens.Keyword,
# Name.Builtin — built-in SQL type names start here
'ARRAY': tokens.Name.Builtin,
'BIGINT': tokens.Name.Builtin,
'BINARY': tokens.Name.Builtin,
'BIT': tokens.Name.Builtin,
'BLOB': tokens.Name.Builtin,
'BOOLEAN': tokens.Name.Builtin,
'CHAR': tokens.Name.Builtin,
# NOTE(review): orphan fragment of a CTE-injection helper — it starts
# mid-loop (the `break` below implies an enclosing `for` over the parsed
# tokens that is not visible here), `parsed`/`ctes`/`with_stmt` are bound
# elsewhere, and indentation appears stripped. Code kept byte-identical.
if token.is_keyword and token.normalized == 'WITH':
with_stmt = token
break
if with_stmt is None:
# no WITH found: create one and inject the CTEs right at the beginning
first_token = parsed.token_first()
with_stmt = sqlparse.sql.Token(sqlparse.tokens.Keyword, 'with')
parsed.insert_before(first_token, with_stmt)
else:
# a WITH already exists: add a comma (it will follow the injected CTEs)
trailing_comma = sqlparse.sql.Token(sqlparse.tokens.Punctuation, ',')
parsed.insert_after(with_stmt, trailing_comma)
# splice all CTE definitions in as a single comma-joined keyword token
token = sqlparse.sql.Token(
sqlparse.tokens.Keyword,
", ".join(c.sql for c in ctes)
)
parsed.insert_after(with_stmt, token)
return str(parsed)
def __call__(self, stream):
# Scan (token_type, value) pairs and, at the first non-whitespace token,
# report whether it is the keyword this filter was configured with.
# NOTE(review): this is the last visible line of the chunk — the function
# may continue past it, and it implicitly yields None for an empty or
# all-whitespace stream; confirm callers treat None as falsy. Indentation
# appears stripped. Code kept byte-identical.
for token_type, value in stream:
if token_type not in Whitespace:
return token_type in Keyword and value == self.type