# Secure your code as it's written: use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def test_from_subquery():
    """Regression test for issue 446.

    A FROM keyword directly followed by a parenthesized subquery must
    still be tokenized as a keyword, with or without an intervening
    whitespace token.
    """
    # No whitespace between FROM and the subquery: keyword + parenthesis
    # group, i.e. exactly two top-level tokens.
    s = 'from(select 1)'
    stmts = sqlparse.parse(s)
    assert len(stmts) == 1
    assert len(stmts[0].tokens) == 2
    assert stmts[0].tokens[0].value == 'from'
    assert stmts[0].tokens[0].ttype == T.Keyword

    # With whitespace: keyword, whitespace, parenthesis group — three
    # top-level tokens.
    s = 'from (select 1)'
    stmts = sqlparse.parse(s)
    assert len(stmts) == 1
    assert len(stmts[0].tokens) == 3
    assert stmts[0].tokens[0].value == 'from'
    assert stmts[0].tokens[0].ttype == T.Keyword
    assert stmts[0].tokens[1].ttype == T.Whitespace
# NOTE(review): headless fragment — this appears to be part of the body of an
# output-filter generator (sqlparse-style `_process(stream, varname, has_nl)`
# for PHP output, given the `"` quoting and the `.=` append operator); the
# enclosing `def` is outside this view and the original indentation has been
# stripped. Code is left byte-identical — TODO confirm against the upstream
# sqlparse filters/output module before restoring structure.
yield sql.Token(T.Whitespace, '\n')
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
if has_nl:
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '=')
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, '"')
# Emit the statement's tokens inside the double-quoted PHP string.
for token in stream:
# A whitespace token containing a newline splits the output line.
if token.is_whitespace and '\n' in token.value:
# Close the current quote, end the statement, start a new line.
yield sql.Token(T.Text, ' ";')
yield sql.Token(T.Whitespace, '\n')
# Re-open the assignment on continuation lines with `.=` (PHP
# string append) instead of plain `=`.
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '.=')
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, '"')
# Preserve any indentation that followed the newline.
after_lb = token.value.split('\n', 1)[1]
if after_lb:
yield sql.Token(T.Whitespace, after_lb)
continue
# Token contains characters that need escaping inside the quotes.
# NOTE(review): fragment is cut off here — the elif body is not visible.
elif '"' in token.value:
# NOTE(review): headless fragment — appears to be the body of an
# output-filter generator producing Python source (single-quoted strings,
# parenthesized multi-line continuation); the enclosing `def` is outside
# this view and indentation has been stripped. Code left byte-identical.
yield sql.Token(T.Name, varname)
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Operator, '=')
yield sql.Token(T.Whitespace, ' ')
if has_nl:
# Multi-line output is wrapped in parentheses for implicit joining.
yield sql.Token(T.Operator, '(')
yield sql.Token(T.Text, "'")
cnt = 0
for token in stream:
cnt += 1
# NOTE(review): `is_whitespace` is called as a method here but used as a
# property elsewhere in this file (see the `token.is_whitespace` test in
# the PHP fragment) — likely different sqlparse vintages; verify which
# API this code targets.
if token.is_whitespace() and '\n' in token.value:
# Skip a leading newline before any content has been emitted.
if cnt == 1:
continue
after_lb = token.value.split('\n', 1)[1]
# Close the current string literal and continue on the next line,
# indented past the variable name (`varname = (` is len+4 wide).
yield sql.Token(T.Text, " '")
yield sql.Token(T.Whitespace, '\n')
for i in range(len(varname) + 4):
yield sql.Token(T.Whitespace, ' ')
yield sql.Token(T.Text, "'")
if after_lb: # it's the indentation that followed the newline
yield sql.Token(T.Whitespace, after_lb)
continue
# Escape single quotes so the emitted literal stays valid.
elif token.value and "'" in token.value:
token.value = token.value.replace("'", "\\'")
yield sql.Token(T.Text, token.value or '')
# Close the final string literal (and the wrapping paren, if any).
yield sql.Token(T.Text, "'")
if has_nl:
yield sql.Token(T.Operator, ')')
def nl(self, offset=0):
    """Return a whitespace token holding a newline plus indentation.

    The indentation is ``self.char`` repeated ``self.leading_ws +
    offset`` times, clamped at zero so a negative *offset* can never
    produce a negative repeat count.
    """
    pad_width = max(0, self.leading_ws + offset)
    return sql.Token(T.Whitespace, self.n + self.char * pad_width)
def nl(self, offset=1):
    """Return a newline token padded to align with the keyword column.

    *offset* defaults to 1 — the single space that follows SELECT.  A
    non-int *offset* is treated as a token-like string: it is converted
    to the negative of its length so the padding backs up over it.
    """
    if not isinstance(offset, int):
        offset = -len(offset)
    # Each indent level is the longest keyword plus two characters
    # (one for the space and one for the paren).
    indent_width = (2 + self._max_kwd_len) * self.indent
    total = self._max_kwd_len + offset + indent_width + self.offset
    return sql.Token(T.Whitespace, self.n + self.char * total)
# NOTE(review): headless fragment — comma/whitespace handling from an
# identifier-list reindent pass; the enclosing function and the loop that
# binds `token` in the first branch are outside this view, and indentation
# has been stripped. The stray `position = 0` before `else:` suggests the
# fragment was stitched from non-adjacent lines — TODO confirm against the
# original source. Code left byte-identical.
if self.comma_first:
# Look at the token right after the comma (whitespace included).
_, ws = tlist.token_next(
tlist.token_index(token), skip_ws=False)
if (ws is not None
and ws.ttype is not T.Text.Whitespace):
tlist.insert_after(
token, sql.Token(T.Whitespace, ' '))
position = 0
else:
# ensure whitespace: every ',' must be followed by a whitespace token
for token in tlist:
_, next_ws = tlist.token_next(
tlist.token_index(token), skip_ws=False)
if token.value == ',' and not next_ws.is_whitespace:
tlist.insert_after(
token, sql.Token(T.Whitespace, ' '))
# NOTE(review): headless fragment — wrap_after handling from an
# identifier-list processing method (`identifiers`, `tlist`, `offset`,
# `indent` are bound outside this view); indentation has been stripped and
# the fragment is cut off mid-condition at the end. Code left byte-identical.
# Estimated column where the identifier list would end (+1 per "," separator).
end_at = self.offset + sum(len(i.value) + 1 for i in identifiers)
adjusted_offset = 0
# If the list would overrun wrap_after and a function call precedes it,
# back the offset up over that function's name (and one separator).
if (self.wrap_after > 0
and end_at > (self.wrap_after - self.offset)
and self._last_func):
adjusted_offset = -len(self._last_func.value) - 1
with offset(self, adjusted_offset), indent(self):
if adjusted_offset < 0:
tlist.insert_before(identifiers[0], self.nl())
position = 0
for token in identifiers:
# Add 1 for the "," separator
position += len(token.value) + 1
# NOTE(review): fragment truncated here — condition body not visible.
if (self.wrap_after > 0
def get_alias(self):
    """Return the alias of this identifier, or ``None`` if there is none.

    Handles both the explicit ``name AS alias`` form and the implicit
    whitespace-separated ``name alias`` form.
    """
    # Explicit form: take the first name after the AS keyword.
    as_idx, as_kw = self.token_next_by(m=(T.Keyword, 'AS'))
    if as_kw is not None:
        return self._get_first_name(as_idx + 1, keywords=True)
    # Implicit form: with more than two tokens and some whitespace,
    # the alias is the last name in the list.
    _, whitespace = self.token_next_by(t=T.Whitespace)
    if whitespace is not None and len(self.tokens) > 2:
        return self._get_first_name(reverse=True)
    return None
# NOTE(review): statement-splitter generator; indentation has been stripped
# and the fragment is truncated — the body of the final `if` (which should
# flag the end of a statement at a top-level ';') is not visible. Code left
# byte-identical; restore structure against the original before editing.
def process(self, stack, stream):
# splitlevel tracks parenthesis/BEGIN nesting; a ';' only ends a
# statement at level <= 0.
splitlevel = 0
stmt = None
# When True, trailing whitespace/single-line comments are still being
# attached to the statement that just ended.
consume_ws = False
stmt_tokens = []
for ttype, value in stream:
# Before appending the token: a non-whitespace, non-comment token
# after a finished statement flushes that statement.
if (consume_ws and ttype is not T.Whitespace
and ttype is not T.Comment.Single):
consume_ws = False
stmt.tokens = stmt_tokens
yield stmt
self._reset()
stmt = None
splitlevel = 0
# Lazily start a new statement for the incoming token.
if stmt is None:
stmt = Statement()
stmt_tokens = []
splitlevel += self._change_splitlevel(ttype, value)
# Append the token
stmt_tokens.append(Token(ttype, value))
# After appending the token: detect a statement-terminating ';'.
# NOTE(review): truncated here — the if-body is outside this view.
if (splitlevel <= 0 and ttype is T.Punctuation
and value == ';'):
def get_alias(self):
    """Return the alias for this identifier or ``None``."""
    # "name AS alias": the alias is the first name following AS.
    idx, as_token = self.token_next_by(m=(T.Keyword, 'AS'))
    if as_token is None:
        # "name alias" / "<expression> alias": needs at least three
        # tokens and a whitespace separator; alias is the last name.
        _, separator = self.token_next_by(t=T.Whitespace)
        if separator is None or len(self.tokens) <= 2:
            return None
        return self._get_first_name(reverse=True)
    return self._get_first_name(idx + 1, keywords=True)
def strip_whitespace(self, tokenlist):
    """Return the tokens of *tokenlist* with whitespace tokens removed.

    The input token list is not modified; a new plain ``list`` of the
    remaining tokens is returned.
    """
    # Comprehension replaces the original if/pass/else append loop.
    return [token for token in tokenlist.tokens
            if token.ttype != sqlparse.tokens.Whitespace]