Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
node=_sortexpr, dir=sortexpr.direction,
nulls=sortexpr.nones_order))
partition = []
if expr.partition:
for partition_expr in expr.partition:
_pexpr = self.visit(partition_expr)
partition.append(_pexpr)
if funcobj.from_function:
name = (funcobj.from_function,)
else:
name = (
common.edgedb_module_name_to_schema_name(
funcobj.shortname.module),
common.edgedb_name_to_pg_name(
funcobj.shortname.name)
)
if expr.window:
window_sort = agg_sort
agg_sort = None
result = pgast.FuncCall(
name=name, args=args,
agg_order=agg_sort, agg_filter=agg_filter,
agg_distinct=expr.agg_set_modifier == irast.SetModifier.DISTINCT)
if expr.window:
result.over = pgast.WindowDef(
orderby=window_sort, partition=partition)
async def code(self, context):
    """Build the ``REASSIGN OWNED BY ... TO ...`` statement text.

    Both ``self.old_role`` and ``self.new_role`` are passed through
    ``common.quote_ident`` before being interpolated.  ``context`` is not
    used by this method.
    """
    old_owner = common.quote_ident(self.old_role)
    new_owner = common.quote_ident(self.new_role)
    return f'REASSIGN OWNED BY {old_owner} TO {new_owner}'
def visit_CTENode(self, node):
    """Write the quoted alias of *node* to the output.

    ``node.alias`` is used as-is when it is a ``str``; otherwise its own
    ``alias`` attribute supplies the name.
    """
    alias = node.alias
    if not isinstance(alias, str):
        # Non-string alias object: the actual name lives one level down.
        alias = alias.alias
    self.write(common.quote_ident(alias))
async def code(self, context):
    """Build the ``ALTER TABLE ... DISABLE TRIGGER ...`` statement text.

    The trigger name is quoted with ``common.quote_ident`` and the table
    name is rendered with ``common.qname``.  `` ONLY`` is inserted after
    ``ALTER TABLE`` when ``self.self_only`` is truthy.  ``context`` is not
    used by this method.
    """
    template = (
        'ALTER TABLE{only} {table_name} '
        'DISABLE TRIGGER {trigger_name}'
    )
    trigger_ident = common.quote_ident(self.trigger.name)
    table_ident = common.qname(*self.trigger.table_name)
    only_clause = ' ONLY' if self.self_only else ''
    return template.format(
        only=only_clause,
        table_name=table_ident,
        trigger_name=trigger_ident,
    )
async def code(self, context):
    """Build the ``COMMENT ON <type> <id> IS <text>`` statement text.

    The type and id come from ``self.object.get_type()`` and
    ``self.object.get_id()``; ``self.text`` is passed through
    ``common.quote_literal``.  ``context`` is not used by this method.
    """
    template = 'COMMENT ON {type} {id} IS {text}'
    target_kind = self.object.get_type()
    target_id = self.object.get_id()
    comment_literal = common.quote_literal(self.text)
    return template.format(
        type=target_kind,
        id=target_id,
        text=comment_literal,
    )
async def code(self, context):
    """Build the ``CREATE EXTENSION ... WITH SCHEMA ...`` statement text.

    Both the extension name (from ``self.get_extension_name()``) and
    ``self.schema`` are quoted with ``common.quote_ident``.  ``context`` is
    not used by this method.
    """
    template = 'CREATE EXTENSION {} WITH SCHEMA {}'
    extension_ident = common.quote_ident(self.get_extension_name())
    schema_ident = common.quote_ident(self.schema)
    return template.format(extension_ident, schema_ident)
def get_scalar_base(schema, scalar):
    """Return the backend base type name for *scalar*.

    A direct entry for ``scalar.name`` in ``base_type_name_map`` wins.
    Otherwise the first non-abstract ancestor in ``scalar.get_mro()[1:]``
    decides the result: its mapped base type if it has one, else the name
    produced by ``common.scalar_name_to_domain_name``.

    ``schema`` is accepted but not used by this function.

    Raises:
        ValueError: if the scalar has no direct mapping and no non-abstract
            ancestor.
    """
    own_base = base_type_name_map.get(scalar.name)
    if own_base is not None:
        return own_base

    for parent in scalar.get_mro()[1:]:
        if parent.is_abstract:
            continue

        # Check if base is fundamental, if not, then it is
        # another domain.
        try:
            return base_type_name_map[parent.name]
        except KeyError:
            return common.scalar_name_to_domain_name(parent.name)

    raise ValueError(f'cannot determine backend type for scalar type '
                     f'{scalar.name}')
async def code(self, context):
    """Build the ``ALTER SEQUENCE ... RENAME TO ...`` statement text.

    The current name is rendered with ``common.qname(*self.name)`` and the
    new name is quoted with ``common.quote_ident``.  ``context`` is not
    used by this method.
    """
    current_name = common.qname(*self.name)
    renamed_to = common.quote_ident(self.new_name)
    return 'ALTER SEQUENCE {} RENAME TO {}'.format(current_name, renamed_to)
def get_id(self):
    """Return ``self.name`` quoted via ``common.quote_ident``."""
    quoted_name = common.quote_ident(self.name)
    return quoted_name
async def code(self, context):
    """Build the ``DROP <term> <attribute>`` clause text.

    The attribute name is rendered with ``common.qname`` and the term comes
    from ``self.get_attribute_term()``.  ``context`` is not used by this
    method.
    """
    # Name is resolved before the term, matching the original call order.
    qualified_attr = common.qname(self.attribute.name)
    term = self.get_attribute_term()
    return 'DROP {} {}'.format(term, qualified_attr)