Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _next_token(tl, i):
    """Pick the next relevant token in ``tl`` after position ``i``.

    Two lookups are made: one for a ``T.String.Symbol`` / ``T.Name`` token
    and one for a ``sql.Function`` group.  When both are found, the one with
    the smaller ``token_index`` wins (the type match wins a tie).  When only
    one is found it is returned; when neither is found, the (falsy) result
    of the function lookup is returned.
    """
    by_type = tl.token_next_by_type(i, (T.String.Symbol, T.Name))
    by_instance = tl.token_next_by_instance(i, sql.Function)

    # Only one candidate (or none): nothing to compare.
    if not by_type:
        return by_instance
    if not by_instance:
        return by_type

    # Both candidates exist: return whichever sits earlier in the list.
    if tl.token_index(by_type) > tl.token_index(by_instance):
        return by_instance
    return by_type
try:
schema_name = identifier.get_parent_name()
real_name = identifier.get_real_name()
is_function = (allow_functions and
_identifier_is_function(identifier))
except AttributeError:
continue
if real_name:
yield TableReference(schema_name, real_name,
identifier.get_alias(), is_function)
elif isinstance(item, Identifier):
schema_name, real_name, alias = parse_identifier(item)
is_function = allow_functions and _identifier_is_function(item)
yield TableReference(schema_name, real_name, alias, is_function)
elif isinstance(item, Function):
schema_name, real_name, alias = parse_identifier(item)
yield TableReference(None, real_name, alias, allow_functions)
def _next_token(tl, i):
    """Pick the next relevant token in ``tl`` after position ``i``.

    Two lookups are made: one for a ``T.String.Symbol`` / ``T.Name`` token
    and one for a ``sql.Function`` group.  When both are found, the one with
    the smaller ``token_index`` wins (the type match wins a tie).  When only
    one is found it is returned; when neither is found, the (falsy) result
    of the function lookup is returned.
    """
    by_type = tl.token_next_by_type(i, (T.String.Symbol, T.Name))
    by_instance = tl.token_next_by_instance(i, sql.Function)

    # Only one candidate (or none): nothing to compare.
    if not by_type:
        return by_instance
    if not by_instance:
        return by_type

    # Both candidates exist: return whichever sits earlier in the list.
    if tl.token_index(by_type) > tl.token_index(by_instance):
        return by_instance
    return by_type
def _identifier_is_function(identifier):
return any(isinstance(t, Function) for t in identifier.tokens)
def is_count_star(projection):
    """Tell whether ``projection`` is a parameterless ``COUNT`` function call.

    The projection must be an ``stypes.Function`` whose first token's name,
    upper-cased, equals ``'COUNT'`` and whose ``get_parameters()`` result is
    empty.
    """
    # presumably an empty parameter list is how ``COUNT(*)`` shows up here;
    # verify against the parser in use.
    if not isinstance(projection, stypes.Function):
        return False
    function_name = projection.tokens[0].get_name().upper()
    if function_name != 'COUNT':
        return False
    return not projection.get_parameters()
def parse(self):
"return a ParsedColumn. This is rudimentary and will fail on hard cases"
# NOTE(review): indentation was lost in this copy and the method looks
# truncated -- no `return success` and no handling of unrecognised tokens
# is visible below. Confirm against the full source before relying on it.
# The first token is taken as the column name; the rest are processed in order.
name, *tokens = self.tokens
# A column name that is not an Identifier is reported as a failed parse.
if not isinstance(name, sqlparse.sql.Identifier):
return ParsedColumn(False)
# The token right after the name is taken as the column type.
type_, *tokens = tokens
# The type must be a Function group or have a ttype ending in
# 'Builtin' / 'Keyword'; anything else is a failed parse.
if not isinstance(type_, sqlparse.sql.Function) and type_.ttype[-1] not in ('Builtin', 'Keyword'):
return ParsedColumn(False)
# Successful result so far: column name and type taken from the token values.
success = ParsedColumn(True, name.value, type_.value)
# A SquareBrackets group directly after the type is appended to the type
# text (presumably an array suffix -- TODO confirm).
if tokens and isinstance(tokens[0], sqlparse.sql.SquareBrackets):
brackets, *tokens = tokens
success.type += brackets.value
# Consume the remaining tokens, recording the column constraints recognised below.
while tokens:
if tokens[0].ttype and tokens[0].ttype[-1] == 'Keyword':
# DEFAULT consumes two tokens: the keyword and the value that follows it.
if tokens[0].normalized.lower() == 'default':
_, val, *tokens = tokens
success.default = val.value
# 'not null' is compared as one normalized keyword token.
elif tokens[0].normalized.lower() == 'not null':
_, *tokens = tokens
success.not_null = True
elif tokens[0].normalized.lower() == 'unique':
_, *tokens = tokens
success.unique = True
try:
schema_name = identifier.get_parent_name()
real_name = identifier.get_real_name()
is_function = (allow_functions and
_identifier_is_function(identifier))
except AttributeError:
continue
if real_name:
yield TableReference(schema_name, real_name,
identifier.get_alias(), is_function)
elif isinstance(item, Identifier):
schema_name, real_name, alias = parse_identifier(item)
is_function = allow_functions and _identifier_is_function(item)
yield TableReference(schema_name, real_name, alias, is_function)
elif isinstance(item, Function):
schema_name, real_name, alias = parse_identifier(item)
yield TableReference(None, real_name, alias, allow_functions)
def _replace_table_entity_name(self, parent, token, table_name, entity_name=None):
if not entity_name:
entity_name = table_name
next_token = self._token_next(parent, token)
if not table_name in self._skip_tables + self._translate_tables:
token_to_replace = parent.tokens[self._token_idx(parent, token)]
if isinstance(token_to_replace, Types.Function):
t = self._token_first(token_to_replace)
if isinstance(t, Types.Identifier):
token_to_replace.tokens[self._token_idx(token_to_replace, t)] = Types.Token(Tokens.Keyword,
self._prefixed_table_entity_name(entity_name))
elif isinstance(token_to_replace, Types.Identifier) or isinstance(token_to_replace, Types.Token):
parent.tokens[self._token_idx(parent, token_to_replace)] = Types.Token(Tokens.Keyword,
self._prefixed_table_entity_name(entity_name))
else:
raise Exception("Internal error, invalid table entity token type")
return next_token
try:
schema_name = identifier.get_parent_name()
real_name = identifier.get_real_name()
is_function = (allow_functions and
_identifier_is_function(identifier))
except AttributeError:
continue
if real_name:
yield TableReference(schema_name, real_name,
identifier.get_alias(), is_function)
elif isinstance(item, Identifier):
schema_name, real_name, alias = parse_identifier(item)
is_function = allow_functions and _identifier_is_function(item)
yield TableReference(schema_name, real_name, alias, is_function)
elif isinstance(item, Function):
schema_name, real_name, alias = parse_identifier(item)
yield TableReference(None, real_name, alias, allow_functions)
def _identifier_is_function(identifier):
return any(isinstance(t, Function) for t in identifier.tokens)