Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
yield sql.Token(T.Operator, '.=')
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, '"')
# Indentation
after_lb = token.value.split('\n', 1)[1]
if after_lb:
yield sql.Token(T.Whitespace, after_lb)
continue
# Token has escape chars
elif '"' in token.value:
token.value = token.value.replace('"', '\\"')
# Put the token
yield sql.Token(T.Text, token.value)
# Close quote
yield sql.Token(T.Text, '"')
yield sql.Token(T.Punctuation, ';')
def is_with_query_cols(tlist):
    """Return True if ``tlist`` is the column-name parenthesis of a WITH query.

    The check walks upwards from ``tlist``:

    * its parent must satisfy ``tu.is_identifier``;
    * the token that ``tu.token_prev_enable`` reports before ``tlist`` inside
      that parent must exist and be either an identifier or a ``T.Name``
      token;
    * the parent's parent, after hopping over an identifier list if there
      is one, must satisfy ``tu.is_with``.

    Any failed step yields False.
    """
    # NOTE(review): the source this was restored from had lost its
    # indentation. The identifier-list hop is treated as optional here
    # (the WITH check runs whether or not the hop happened) -- confirm
    # against the upstream file.
    identifier = tlist.parent
    if not (identifier and tu.is_identifier(identifier)):
        return False

    name_token = tu.token_prev_enable(identifier, tlist)
    if not name_token:
        return False
    if not tu.is_identifier(name_token) and name_token.ttype not in T.Name:
        return False

    ancestor = identifier.parent
    if ancestor and tu.is_identifier_list(ancestor):
        ancestor = ancestor.parent
    return bool(ancestor and tu.is_with(ancestor))
def process_where_comparison_value(self, value):
    """Render the value side of a WHERE comparison as text.

    When ``value`` is an instance of a class named ``Identifier`` or its
    ``ttype`` equals ``sqlparse.tokens.String``, any leading/trailing
    single or double quote characters are stripped from its unicode form
    and the result is wrapped in single quotes. Every other value is
    returned as ``value.to_unicode()`` unchanged.
    """
    # The class-name test comes first so that Identifier values never need
    # their ttype inspected (same evaluation order as the original chain).
    # NOTE(review): the ttype test is an equality check, so String
    # sub-types are not matched -- confirm that is intended.
    is_identifier = value.__class__.__name__ == 'Identifier'
    if is_identifier or value.ttype == sqlparse.tokens.String:
        return "'" + str(value.to_unicode()).strip('"\'') + "'"
    return value.to_unicode()
(r'END(\s+IF|\s+LOOP|\s+WHILE)?\b', tokens.Keyword),
(r'NOT\s+NULL\b', tokens.Keyword),
(r'NULLS\s+(FIRST|LAST)\b', tokens.Keyword),
(r'UNION\s+ALL\b', tokens.Keyword),
(r'CREATE(\s+OR\s+REPLACE)?\b', tokens.Keyword.DDL),
(r'DOUBLE\s+PRECISION\b', tokens.Name.Builtin),
(r'GROUP\s+BY\b', tokens.Keyword),
(r'ORDER\s+BY\b', tokens.Keyword),
(r'(LATERAL\s+VIEW\s+)'
r'(EXPLODE|INLINE|PARSE_URL_TUPLE|POSEXPLODE|STACK)\b',
tokens.Keyword),
(r"(AT|WITH')\s+TIME\s+ZONE\s+'[^']+'", tokens.Keyword.TZCast),
(r'[0-9_A-ZÀ-Ü][_$#\w]*', is_keyword),
(r'[;:()\[\],\.]', tokens.Punctuation),
(r'[<>=~!]+', tokens.Operator.Comparison),
(r'[+/@#%^&|`?^-]+', tokens.Operator),
]}
# Flags applied to every lexer pattern: case-insensitive, Unicode-aware.
FLAGS = re.IGNORECASE | re.UNICODE

# Rebind SQL_REGEX from the raw {'root': [...]} table to a flat list of
# (bound ``match`` method of the compiled pattern, token type) pairs.
SQL_REGEX = [
    (re.compile(pattern, FLAGS).match, token_type)
    for pattern, token_type in SQL_REGEX['root']
]
KEYWORDS = {
'ABORT': tokens.Keyword,
'ABS': tokens.Keyword,
'ABSOLUTE': tokens.Keyword,
'ACCESS': tokens.Keyword,
'ADA': tokens.Keyword,
'ADD': tokens.Keyword,
'ADMIN': tokens.Keyword,
'AFTER': tokens.Keyword,
'AGGREGATE': tokens.Keyword,
def is_whitespace(self):
    """Return ``True`` if this token is a whitespace token, else ``False``."""
    # bool() guarantees a real boolean: a bare ``and`` would hand back the
    # falsy ttype itself (e.g. None) for tokens that carry no type.
    return bool(self.ttype and self.ttype in T.Whitespace)
lambda y: (y.match(T.Punctuation, '.')
or y.ttype is T.Operator),
lambda y: (y.ttype in (T.String.Symbol,
def process(self, stream):
    """Yield the stream's tokens, wrapping keywords in ``<strong>`` tags.

    ``stream`` is iterated as ``(token_type, value)`` pairs. Every value is
    passed through ``escape()`` before being yielded with its original
    type; pairs whose type is in ``T.Keyword`` are additionally surrounded
    by ``T.Text`` tokens carrying the opening and closing tag.
    """
    for ttype, text in stream:
        if ttype in T.Keyword:
            yield T.Text, '<strong>'
            yield ttype, escape(text)
            yield T.Text, '</strong>'
        else:
            yield ttype, escape(text)
"""Returns a list of 2-tuples (condition, value).
If an ELSE exists condition is None.
"""
CONDITION = 1
VALUE = 2
ret = []
mode = CONDITION
for token in self.tokens:
# Set mode from the current statement
if token.match(T.Keyword, 'CASE'):
continue
elif skip_ws and token.ttype in T.Whitespace:
continue
elif token.match(T.Keyword, 'WHEN'):
ret.append(([], []))
mode = CONDITION
elif token.match(T.Keyword, 'THEN'):
mode = VALUE
elif token.match(T.Keyword, 'ELSE'):
ret.append((None, []))
mode = VALUE
elif token.match(T.Keyword, 'END'):
mode = None
def _process(self, tlist):
token = self._get_next_comment(tlist)
while token:
tidx = tlist.token_index(token)
prev = tlist.token_prev(tidx, False)
next_ = tlist.token_next(tidx, False)
# Replace by whitespace if prev and next exist and if they're not
# whitespaces. This doesn't apply if prev or next is a paranthesis.
if (prev is not None and next_ is not None
and not prev.is_whitespace() and not next_.is_whitespace()
and not (prev.match(T.Punctuation, '(')
or next_.match(T.Punctuation, ')'))):
tlist.tokens[tidx] = sql.Token(T.Whitespace, ' ')
else:
tlist.tokens.pop(tidx)
token = self._get_next_comment(tlist)