Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def suggest_based_on_last_token(token, stmt):
if isinstance(token, string_types):
token_v = token.lower()
elif isinstance(token, Comparison):
# If 'token' is a Comparison type such as
# 'select * FROM abc a JOIN def d ON a.id = d.'. Then calling
# token.value on the comparison type will only return the lhs of the
# comparison. In this case a.id. So we need to do token.tokens to get
# both sides of the comparison and pick the last token out of that
# list.
token_v = token.tokens[-1].value.lower()
elif isinstance(token, Where):
# sqlparse groups all tokens from the where clause into a single token
# list. This means that token.value may be something like
# 'where foo > 5 and '. We need to look "inside" token.tokens to handle
# suggestions in complicated where clauses correctly
prev_keyword = stmt.reduce_to_prev_keyword()
return suggest_based_on_last_token(prev_keyword, stmt)
elif isinstance(token, Identifier):
# If the previous token is an identifier, we can suggest datatypes if
# we're in a parenthesized column/field list, e.g.:
# CREATE TABLE foo (Identifier
# CREATE FUNCTION foo (Identifier
# If we're not in a parenthesized list, the most likely scenario is the
# user is about to specify an alias, e.g.:
# SELECT Identifier
# SELECT foo FROM Identifier
prev_keyword, _ = find_prev_keyword(stmt.text_before_cursor)
def group_where(tlist):
[group_where(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Where)]
idx = 0
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
stopwords = ('ORDER', 'GROUP', 'LIMIT', 'UNION')
while token:
tidx = tlist.token_index(token)
end = tlist.token_next_match(tidx + 1, T.Keyword, stopwords)
if end is None:
end = tlist._groupable_tokens[-1]
else:
end = tlist.tokens[tlist.token_index(end) - 1]
group = tlist.group_tokens(sql.Where,
tlist.tokens_between(token, end),
ignore_ws=True)
idx = tlist.token_index(group)
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
def group_where(tlist):
[group_where(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Where)]
idx = 0
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
stopwords = ('ORDER', 'GROUP', 'LIMIT', 'UNION')
while token:
tidx = tlist.token_index(token)
end = tlist.token_next_match(tidx + 1, T.Keyword, stopwords)
if end is None:
end = tlist._groupable_tokens[-1]
else:
end = tlist.tokens[tlist.token_index(end) - 1]
group = tlist.group_tokens(sql.Where,
tlist.tokens_between(token, end),
ignore_ws=True)
idx = tlist.token_index(group)
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
self._add_to_lines(indent)
continue
if token.value == 'LIMIT':
self._add_to_lines(indent + 4)
self._line.append(token.value)
for token in tokens[i + 1:]:
i += 1
s = _val(tokens[i])
s = s.replace('\n', ' ')
self._line.append(s)
self._add_to_lines(indent)
continue
if isinstance(token, Where):
self._add_to_lines(indent + 4)
self._format(_filter(token.tokens))
continue
in_break, k, s = self._in_break_tokens(tokens[i:])
if in_break:
self._add_to_lines(indent + 4)
self._line.append(s)
i += k
if s in ('GROUP BY', 'HAVING', 'ORDER BY'):
self._add_to_lines(indent)
continue
self._line.append(_val(token, indent + 4))
self._add_to_lines(indent + 4)
def group_where(tlist):
[group_where(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Where)]
idx = 0
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
stopwords = ('ORDER', 'GROUP', 'LIMIT', 'UNION')
while token:
tidx = tlist.token_index(token)
end = tlist.token_next_match(tidx + 1, T.Keyword, stopwords)
if end is None:
end = tlist._groupable_tokens[-1]
else:
end = tlist.tokens[tlist.token_index(end) - 1]
group = tlist.group_tokens(sql.Where,
tlist.tokens_between(token, end),
ignore_ws=True)
idx = tlist.token_index(group)
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
def end_match(token):
stopwords = ('ORDER', 'GROUP', 'LIMIT', 'UNION', 'EXCEPT', 'HAVING',
'WHEN', # for Oracle10g merge
'CONNECT', # for Oracle connect by
)
if token.match(T.Keyword, stopwords):
return True
if token.match(T.DML, ('DELETE')): # for Oracle10g merge
return True
if token.match(T.DML, ('START')): # for Oracle connect by
return True
return False
[group_where(sgroup) for sgroup in tlist.get_sublists()
if not isinstance(sgroup, sql.Where)]
idx = 0
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
while token:
tidx = tlist.token_index(token)
end = tlist.token_matching(tidx + 1, (end_match, ))
if end is None:
end = tlist._groupable_tokens[-1]
else:
end = tlist.tokens[tlist.token_index(end) - 1]
group = tlist.group_tokens(sql.Where,
tlist.tokens_between(token, end),
ignore_ws=True)
idx = tlist.token_index(group)
token = tlist.token_next_match(idx, T.Keyword, 'WHERE')
self.query = query
self.me = me
self.statement = stmt = sqlparse.parse(query)[0]
# extract table and WHERE clause, if any
self.table = None
self.where = None
from_ = stmt.token_next_match(0, tokens.Keyword, 'FROM')
if from_:
index = stmt.token_index(from_)
self.table = stmt.token_next(index)
if self.table.is_group():
self.table = self.table.token_first()
self.where = stmt.token_next_by_instance(0, sql.Where)
logging.debug('table %s, where %s' % (self.table, self.where))
# suggestions in complicated where clauses correctly
prev_keyword, text_before_cursor = find_prev_keyword(text_before_cursor)
return suggest_based_on_last_token(
prev_keyword, text_before_cursor, full_text, identifier
)
else:
token_v = token.value.lower()
is_operand = lambda x: x and any([x.endswith(op) for op in ["+", "-", "*", "/"]])
if not token:
return [{"type": "keyword"}, {"type": "special"}]
elif token_v.endswith("("):
p = sqlparse.parse(text_before_cursor)[0]
if p.tokens and isinstance(p.tokens[-1], Where):
# Four possibilities:
# 1 - Parenthesized clause like "WHERE foo AND ("
# Suggest columns/functions
# 2 - Function call like "WHERE foo("
# Suggest columns/functions
# 3 - Subquery expression like "WHERE EXISTS ("
# Suggest keywords, in order to do a subquery
# 4 - Subquery OR array comparison like "WHERE foo = ANY("
# Suggest columns/functions AND keywords. (If we wanted to be
# really fancy, we could suggest only array-typed columns)
column_suggestions = suggest_based_on_last_token(
"where", text_before_cursor, full_text, identifier
)
# Check for a subquery expression (cases 3 & 4)