# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def is_root_keyword(self) -> bool:
    """Return True when this token's last word is a root-level SQL keyword.

    A trailing "FROM" is ignored when the token sits inside a function
    call (e.g. ``EXTRACT(day FROM ts)``), since that FROM does not start
    a new clause.
    """
    if not self.is_keyword:
        return False
    last_word = self.value.split()[-1].upper()
    from_inside_function = (
        last_word == "FROM"
        and isinstance(self._token.parent.parent, sqlparse.sql.Function)
    )
    if from_inside_function:
        return False
    return last_word in ROOT_KEYWORDS
# NOTE(review): fragment — the enclosing def/loop header is outside this view,
# and the original indentation has been flattened; code left byte-identical.
# Maintains self.line (text since the last newline) to wrap output at self.width.
if token.is_whitespace and '\n' in token.value:
if token.value.endswith('\n'):
# whitespace ends in a newline: the running line buffer starts empty
self.line = ''
else:
# keep whatever text follows the last newline as the current line
self.line = token.value.splitlines()[-1]
elif token.is_group and type(token) not in self.keep_together:
# recurse into grouped tokens unless this group type must stay on one line
token.tokens = self._process(token, token.tokens)
else:
val = text_type(token)
# If appending this token would exceed the target width, emit a newline
# token that repeats the current line's leading indentation.
if len(self.line) + len(val) > self.width:
match = re.search(r'^ +', self.line)
if match is not None:
indent = match.group()
else:
indent = ''
yield sql.Token(T.Whitespace, '\n{0}'.format(indent))
self.line = indent
self.line += val
yield token
def process_join_key(token):
    """Tally join-key type and constraint pairings for one sqlparse token.

    For a Comparison node, both sides are looked up (quotes stripped) in
    the surrounding ``column_map`` / ``constraint_map``; each hit bumps a
    counter in ``stats`` keyed by the sorted "left-right" pair.
    """
    if not isinstance(token, sqlparse.sql.TokenList):
        return
    if not isinstance(token, sqlparse.sql.Comparison):
        return

    def _unquote(name):
        # drop double quotes and backticks so keys match the maps
        return name.replace('"', '').replace('`', '')

    left_key = _unquote(str(token.left))
    right_key = _unquote(str(token.right))

    if left_key in column_map and right_key in column_map:
        lo, hi = sorted((column_map[left_key], column_map[right_key]))
        pair = lo + '-' + hi
        bucket = stats['join_key_type'][project_type_name]
        bucket[pair] = bucket.get(pair, 0) + 1

    if left_key in constraint_map and right_key in constraint_map:
        lo, hi = sorted((constraint_map[left_key], constraint_map[right_key]))
        pair = lo + '-' + hi
        bucket = stats['join_key_constraint'][project_type_name]
        bucket[pair] = bucket.get(pair, 0) + 1
# NOTE(review): fragment — part of an if/elif chain inside a method that maps
# SQL aggregate function calls to Elasticsearch pipeline aggregations; the
# enclosing def is outside this view and indentation has been flattened.
if 'serial_diff' == sql_function_name:
self.parent_pipeline_aggs[projection_name] = {
# SECURITY NOTE(review): eval() on text taken from the SQL statement is
# dangerous — prefer ast.literal_eval for parsing literal parameters.
sql_function_name: {'buckets_path': buckets_path, 'lag': eval(params[1].value)}}
elif 'moving_avg' == sql_function_name:
if len(params) == 2:
# double eval: the parameter appears to be a quoted string holding a dict
# literal. SECURITY NOTE(review): nested eval() — same concern as above.
moving_avg = eval(eval(params[1].value))
else:
moving_avg = {}
moving_avg['buckets_path'] = buckets_path
self.parent_pipeline_aggs[projection_name] = {sql_function_name: moving_avg}
else:
# any other pipeline function only needs its buckets_path
self.parent_pipeline_aggs[projection_name] = {
sql_function_name: {'buckets_path': buckets_path}}
else:
tokens = projection.tokens
# strip surrounding parentheses, if present, before translating the script
if isinstance(tokens[0], stypes.Parenthesis):
tokens = tokens[0].tokens[1:-1]
bucket_script_agg = bucket_script_translator.translate_script(
sql_select, tokens,
include_sub_aggregation=True)
self.parent_pipeline_aggs[projection_name] = {'bucket_script': bucket_script_agg}
i += length
# NOTE(review): fragment — middle of a token-dispatch loop that folds sqlparse
# tokens into a query-builder object `m`; the enclosing def/loop header is
# outside this view and indentation has been flattened.
elif type(tok) is S.Where:
# tok.tokens[2:] skips the "WHERE" keyword and its trailing whitespace
subtokens = remove_whitespace(tok.tokens[2:])
LOG.debug("WHERE <%s tokens>", len(subtokens))
clause, _ = comparison_to_sqla(subtokens)
m = m.Where(clause)
elif type(tok) is S.IdentifierList:
# after FROM a comma-list means cross-joined tables; otherwise columns
if prev_tok.normalized == "FROM":
for x in tok.get_identifiers():
m = m.CrossJoin(M.Table(x.normalized))
else:
cols = []
for x in tok.get_identifiers():
cols.append(M.Field(x.normalized, alias=x.get_alias()))
m = m.Columns(cols)
elif type(tok) is S.Identifier:
# lone identifier: a column right after SELECT/DISTINCT, else a table name
if prev_tok is not None and prev_tok.normalized in ["SELECT", "DISTINCT"]:
m = m.Columns([M.Field(tok.normalized, alias=tok.get_alias())])
else:
m = m.Table(tok.normalized)
elif type(tok) is S.Comparison:
# comparisons are only valid inside WHERE/ON, handled elsewhere
raise Exception("misplaced comparison %s" % tok)
elif type(tok) is S.Parenthesis:
# drop the opening and closing parens themselves
subtokens = remove_whitespace(tok.tokens[1:-1])
# whole expression has parens - "(select * from thing)"
if prev_tok is None:
m = tokens_to_sqla(subtokens)
# "join (select id, name from ...)"
elif prev_tok.normalized == "JOIN":
sub = tokens_to_sqla(subtokens)
m = m.Join(sub)
# "on (foo.val > 1 or foo.thing = 'whatever') and ..."
def nl(self, offset=0):
    """Build a whitespace token: newline followed by the current indent.

    ``offset`` adjusts the indent width; the width never goes below zero.
    """
    width = max(0, self.leading_ws + offset)
    return sql.Token(T.Whitespace, self.n + self.char * width)
def parse(cls, raw):
# NOTE(review): truncated in this view — the try block below has no visible
# except/else, and indentation has been flattened; code left byte-identical.
# Classifies a DDL statement of the form "CREATE|DROP <TYPE> ..." and
# returns None when the statement does not match that shape.
# get non-whitespace non-comment tokens
tokens = [t for t in raw.tokens if not t.is_whitespace and not isinstance(t, sql.Comment)]
if len(tokens) < 3:
return None
# check statement is of form "CREATE|DROP TYPE ..."
if tokens[0].ttype != sql_tokens.DDL or tokens[1].ttype != sql_tokens.Keyword:
return None
if tokens[0].value.upper() in ('CREATE', 'CREATE OR REPLACE'):
is_create = True
elif tokens[0].value.upper() in ('DROP',):
is_create = False
else:
return None
try:
# map the keyword (e.g. TABLE, VIEW) onto the SqlType enum by name
sql_type = SqlType[tokens[1].value.upper()]
def _get_primary_key(self, def_tokens):
EXPECT_PRIMARY = 0
EXPECT_KEY = 1
EXPECT_COLUMN = 2
state = EXPECT_PRIMARY
for token in def_tokens:
if state == EXPECT_PRIMARY and token.match(T.Keyword, 'PRIMARY'):
state = EXPECT_KEY
elif state == EXPECT_KEY and token.value.upper() == 'KEY':
state = EXPECT_COLUMN
elif state == EXPECT_COLUMN and isinstance(token, sql.Parenthesis):
return [
self._clean_identifier_quotes(t.value)
for t in token.tokens[1:-1]
if t.ttype in (T.Name, T.Literal.String.Symbol)
]
return []
def _select_expression_tokens(self, parent, first_token, end_words):
# NOTE(review): runs past the end of this view — the final return statement
# (presumably returning current_token and tokens) is not visible, and
# indentation has been flattened; code left byte-identical.
# Collects comma-separated SELECT expression token groups until a keyword
# in end_words is reached.
if isinstance(first_token, Types.IdentifierList):
# sqlparse already grouped the list: flatten it into a single group
return first_token, [list(first_token.flatten())]
tokens = list()
current_list = list()
current_token = first_token
while current_token and not current_token.match(Tokens.Keyword, end_words):
if current_token.match(Tokens.Punctuation, ','):
# comma closes the current expression group
if current_list:
tokens.append(current_list)
current_list = list()
# NOTE(review): is_whitespace is called here; in recent sqlparse releases
# it is a property, not a method — confirm the pinned sqlparse version.
elif current_token.is_whitespace():
pass
else:
current_list.append(current_token)
current_token = self._token_next(parent, current_token)
if current_list:
tokens.append(current_list)