Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def visit_AtomicRefExpr(self, expr):
if self._context.current.location == 'generator' and expr.inline:
expr.ref.filter = irutils.extend_binop(expr.ref.filter, expr.expr)
self.visit(expr.ref)
arefs = ast.find_children(
expr, lambda i: isinstance(i, irast.AtomicRefSimple))
for aref in arefs:
self.visit(aref)
expr = irast.InlineFilter(expr=expr.ref.filter, ref=expr.ref)
else:
self.visit(expr.expr)
return expr
def _process_expr(self, expr, context):
if isinstance(expr, caos_ast.BinOp):
left = self._process_expr(expr.left, context)
right = self._process_expr(expr.right, context)
if expr.op == caos_ast.LIKE:
f = caos_ast.FunctionCall(name=('str', 'like'), args=[expr.left, expr.right])
result = self._process_expr(f, context)
elif expr.op == caos_ast.ILIKE:
f = caos_ast.FunctionCall(name=('str', 'ilike'), args=[expr.left, expr.right])
result = self._process_expr(f, context)
elif expr.op == caosql_ast.REMATCH:
f = caos_ast.FunctionCall(name=('re', 'match'), args=[expr.right, expr.left])
result = self._process_expr(f, context)
elif expr.op == caosql_ast.REIMATCH:
def compile_ir_to_sql_tree(
ir_expr: irast.Base, *,
schema, backend=None,
output_format: typing.Optional[OutputFormat]=None,
ignore_shapes: bool=False,
singleton_mode: bool=False) -> pgast.Base:
try:
# Transform to sql tree
ctx_stack = context.CompilerContext()
ctx = ctx_stack.current
expr_is_stmt = isinstance(ir_expr, irast.Statement)
if expr_is_stmt:
views = ir_expr.views
ctx.scope_map = ir_expr.scope_map
ctx.scope_tree = ir_expr.scope_tree
ir_expr = ir_expr.expr
else:
views = {}
ctx.env = context.Environment(
schema=schema, output_format=output_format,
backend=backend, singleton_mode=singleton_mode,
views=views)
if ignore_shapes:
ctx.expr_exposed = False
qtree = dispatch.compile(ir_expr, ctx=ctx)
except Exception as e: # pragma: no cover
def _class_set(self, scls):
path_id = irast.PathId([scls])
ir_set = irast.Set(path_id=path_id, scls=scls)
self._register_path_scope(ir_set.path_id)
return ir_set
def intersect_paths(context, left, right, merge_filters=False):
if isinstance(left, (irast.EntityLink, irast.EntitySet)):
if isinstance(right, (irast.EntityLink, irast.EntitySet)):
# Both operands are sets -- simply merge them
result = intersect_sets(context, left, right, merge_filters)
elif isinstance(right, irast.Disjunction):
result = intersect_with_disjunction(context, right, left)
elif isinstance(right, irast.Conjunction):
result = intersect_with_conjunction(context, right, left)
elif isinstance(left, irast.Disjunction):
if isinstance(right, (irast.EntityLink, irast.EntitySet)):
result = intersect_with_disjunction(context, left, right)
elif isinstance(right, irast.Disjunction):
result = intersect_disjunctions(context, left, right)
elif isinstance(right, irast.Conjunction):
result = intersect_disjunction_with_conjunction(
context, left, right)
def _is_exists_expr_set(self, ir_expr):
return (
isinstance(ir_expr, irast.Set) and
isinstance(ir_expr.expr, irast.ExistPred)
)
srcset = irast.Set(scls=ctx.source, path_id=path_id)
result = irutils.extend_path(ctx.schema, srcset, pointer)
else:
if ctx.source.generic():
propname = ctx.attmap[expr.field][0]
else:
ptr_info = pg_types.get_pointer_storage_info(
ctx.source, resolve_type=False)
if ptr_info.table_type == 'concept':
# Singular pointer promoted into source table
propname = sn.Name('std::target')
else:
propname = ctx.attmap[expr.field][0]
path = irast.Set()
path.scls = ctx.schema.get('std::Object')
path.path_id = irutils.LinearPath([path.scls])
tgt_path = irutils.extend_path(ctx.schema, path, ctx.source)
propcls = ctx.source.resolve_pointer(ctx.schema, propname)
result = irutils.extend_path(ctx.schema, tgt_path, propcls)
return result
ir, lambda n: isinstance(n, irast.EntitySet) and n.id == path_id)
raise errors.EdgeQLError(
'only references to link properties are allowed '
'in nested UPDATE shapes', context=subel.context)
ptr_node = targetstep.rptr
el = compile_shape(
targetstep,
elements,
rptr=ptr_node,
_recurse=True,
require_expressions=True,
include_implicit=False,
ctx=ctx)
substmt = irast.SelectStmt(
result=el,
)
result = setgen.generated_set(substmt, ctx=ctx)
result.rptr = ptr_node
return result
def _populate_anchors(self, context, anchors):
for anchor, proto in anchors.items():
if isinstance(proto, s_obj.ProtoNode):
step = irast.EntitySet()
step.concept = proto
step.id = irutils.LinearPath([step.concept])
step.anchor = anchor
step.show_as_anchor = anchor
# XXX
# step.users =
elif isinstance(proto, s_links.Link):
if proto.source:
src = irast.EntitySet()
src.concept = proto.source
src.id = irutils.LinearPath([src.concept])
else:
src = None
step = irast.EntityLink(link_proto=proto, source=src)
step.anchor = anchor