Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def visit_UpdateQuery(self, edgeql_tree):
# Build an irast.UpdateStmt from the given EdgeQL update query node.
# NOTE(review): indentation was lost in this copy and the definition is cut
# off after the final `if is_toplevel:` -- confirm against the full source.
parent_ctx = self.context.current
# The query is top-level when the enclosing context has no statement yet.
is_toplevel = parent_ctx.stmt is None
# Work in a subquery context whose statement is the new UpdateStmt, linked
# back to the enclosing statement through parent_stmt.
with self.context.subquery() as ctx:
stmt = ctx.stmt = irast.UpdateStmt()
stmt.parent_stmt = parent_ctx.stmt
self._visit_with_block(edgeql_tree)
# Visit the subject expression and hand it, together with its alias, to
# _declare_aliased_set.
subject = self._declare_aliased_set(
self.visit(edgeql_tree.subject), edgeql_tree.subject_alias)
# Only subjects whose inferred type is a Concept can be updated.
subj_type = irutils.infer_type(subject, ctx.schema)
if not isinstance(subj_type, s_concepts.Concept):
raise errors.EdgeQLError(
f'cannot update non-Concept objects',
context=edgeql_tree.subject.context
)
stmt.where = self._process_select_where(edgeql_tree.where)
# The shape is processed with require_expressions=True and
# include_implicit=False; the processed shape becomes the statement subject.
stmt.subject = self._process_shape(
subject, edgeql_tree.shape,
require_expressions=True,
include_implicit=False)
# The statement result is the subject set itself, not the processed shape.
stmt.result = subject
if is_toplevel:
def _transform_ifelse(self, condition, if_expr, else_expr, src_context):
    """Translate an if/else expression into a UNION of two filtered branches.

    The "if" branch is turned into a statement whose WHERE clause is
    extended with ``condition``; the "else" branch is turned into a
    statement whose WHERE clause is extended with ``NOT condition``.
    Both statements are then visited and combined with a UNION set
    operation.

    Raises:
        errors.EdgeQLError: when the inferred types of the two branches
            have no nearest common ancestor.
    """
    current = self.context.current

    # "if" branch: statement filtered by the condition.
    then_ql = self._ensure_qlstmt(if_expr)
    then_ql.where = self._extend_qlbinop(then_ql.where, condition)

    # "else" branch: statement filtered by the negated condition.
    negated = qlast.UnaryOp(operand=condition, op=ast.ops.NOT)
    otherwise_ql = self._ensure_qlstmt(else_expr)
    otherwise_ql.where = self._extend_qlbinop(otherwise_ql.where, negated)

    then_ir = self.visit(then_ql)
    otherwise_ir = self.visit(otherwise_ql)

    then_type = irutils.infer_type(then_ir, current.schema)
    otherwise_type = irutils.infer_type(otherwise_ir, current.schema)

    # The common ancestor is only used to validate that the branch types
    # are related; it is not part of the returned node.
    common = s_utils.get_class_nearest_common_ancestor(
        [then_type, otherwise_type])

    if common is not None:
        return irast.SetOp(left=then_ir.expr, right=otherwise_ir.expr,
                           op=qlast.UNION)

    raise errors.EdgeQLError(
        'if/else clauses must be of related types, got: {}/{}'.format(
            then_type.name, otherwise_type.name),
        context=src_context)
def _declare_view(self, expr, alias):
# Visit `expr` as a statement and derive a class named `alias` in the
# '__view__' module from its inferred result type.
# NOTE(review): indentation was lost in this copy and the body stops before
# any return; `path_id` and `real_path_id` are computed but not used in the
# visible lines -- confirm against the full source.
ctx = self.context.current
# Wrap a bare expression in a SelectQuery so a statement is always visited.
if not isinstance(expr, qlast.Statement):
expr = qlast.SelectQuery(result=expr)
# Visit in a new context whose statement is the current statement's parent.
with self.context.new() as subctx:
subctx.stmt = ctx.stmt.parent_stmt
substmt = self.visit(expr)
# When the visit produced a subquery set, continue with its inner expression.
if self._is_subquery_set(substmt):
substmt = substmt.expr
result_type = irutils.infer_type(substmt, ctx.schema)
view_name = sn.Name(module='__view__', name=alias)
# Atom/Concept results get a class of the same kind that lists the result
# type as its base; any other result gets a Concept created with only the
# view name.
if isinstance(result_type, (s_atoms.Atom, s_concepts.Concept)):
c = result_type.__class__(name=view_name, bases=[result_type])
c.acquire_ancestor_inheritance(ctx.schema)
else:
c = s_concepts.Concept(name=view_name)
path_id = irast.PathId([c])
# Take the path id of the statement's result when it is a Set; otherwise
# build one from the inferred result type.
if isinstance(substmt.result, irast.Set):
real_path_id = substmt.result.path_id
else:
real_path_id = irast.PathId([result_type])
substmt.main_stmt = ctx.stmt
ctx = self.context.current
# NOTE(review): the enclosing `def` is not visible in this copy and the
# indentation was lost; `expr` and `funcname` are used below but are not
# defined in the visible lines -- confirm against the full source.
# Collects positional arguments, named arguments and the inferred type of
# every argument of `expr.args`.
args = []
kwargs = {}
arg_types = []
for ai, a in enumerate(expr.args):
# Named arguments are stored under their name, positional ones in order.
# `aname` (the name or the position) identifies the argument in the error
# message below.
if isinstance(a, qlast.NamedArg):
arg = self._ensure_set(self.visit(a.arg))
kwargs[a.name] = arg
aname = a.name
else:
arg = self._ensure_set(self.visit(a))
args.append(arg)
aname = ai
# Every argument must have a resolvable type.
arg_type = irutils.infer_type(arg, ctx.schema)
if arg_type is None:
raise errors.EdgeQLError(
f'could not resolve the type of argument '
f'${aname} of function {funcname}',
context=a.context)
arg_types.append(arg_type)
return args, kwargs, arg_types
def _transform_ifelse(self, condition, if_expr, else_expr, src_context):
    """Translate an if/else expression into a UNION of two filtered branches.

    The "if" branch is turned into a statement whose WHERE clause is
    extended with ``condition``; the "else" branch is turned into a
    statement whose WHERE clause is extended with ``NOT condition``.
    Both statements are then visited and combined with a UNION set
    operation.

    Raises:
        errors.EdgeQLError: when the inferred types of the two branches
            have no nearest common ancestor.
    """
    current = self.context.current

    # "if" branch: statement filtered by the condition.
    then_ql = self._ensure_qlstmt(if_expr)
    then_ql.where = self._extend_qlbinop(then_ql.where, condition)

    # "else" branch: statement filtered by the negated condition.
    negated = qlast.UnaryOp(operand=condition, op=ast.ops.NOT)
    otherwise_ql = self._ensure_qlstmt(else_expr)
    otherwise_ql.where = self._extend_qlbinop(otherwise_ql.where, negated)

    then_ir = self.visit(then_ql)
    otherwise_ir = self.visit(otherwise_ql)

    then_type = irutils.infer_type(then_ir, current.schema)
    otherwise_type = irutils.infer_type(otherwise_ir, current.schema)

    # The common ancestor is only used to validate that the branch types
    # are related; it is not part of the returned node.
    common = s_utils.get_class_nearest_common_ancestor(
        [then_type, otherwise_type])

    if common is not None:
        return irast.SetOp(left=then_ir.expr, right=otherwise_ir.expr,
                           op=qlast.UNION)

    raise errors.EdgeQLError(
        'if/else clauses must be of related types, got: {}/{}'.format(
            then_type.name, otherwise_type.name),
        context=src_context)
def _try_fold_arithmetic_binop(self, op, left: irast.BinOp,
right: irast.BinOp):
# Try to evaluate `left <op> right` directly from the operands' `.value`
# attributes for the ADD, SUB and MUL operators.
# NOTE(review): indentation was lost in this copy and the definition is cut
# off at the final `else:`, so the handling of other operators and the
# return value are not visible -- confirm against the full source.
# NOTE(review): `left`/`right` are annotated as irast.BinOp, but `.value` is
# read from them and the visible call in _try_fold_binop passes operands it
# has checked to be irast.Constant -- the annotation looks inaccurate.
ctx = self.context.current
left_type = irutils.infer_type(left, ctx.schema)
right_type = irutils.infer_type(right, ctx.schema)
# Give up (return None) unless both operands are std::int or std::float.
if (left_type.name not in {'std::int', 'std::float'} or
right_type.name not in {'std::int', 'std::float'}):
return
# The result takes the left operand's type unless the right one is a float.
result_type = left_type
if right_type.name == 'std::float':
result_type = right_type
if op == ast.ops.ADD:
value = left.value + right.value
elif op == ast.ops.SUB:
value = left.value - right.value
elif op == ast.ops.MUL:
value = left.value * right.value
else:
with ctx.new() as shape_expr_ctx:
# NOTE(review): the enclosing `def` is not visible in this copy and the
# indentation was lost; `source_expr`, `ptrcls`, `ptr_direction`, `shape_el`,
# `schema`, `source_ctx`, `ptrname` and `ptrsource` are used below but are
# not defined in the visible lines -- confirm against the full source.
# Put current pointer class in context, so
# that references to link properties in sub-SELECT
# can be resolved. This is necessary for proper
# evaluation of link properties on computable links,
# most importantly, in INSERT/UPDATE context.
shape_expr_ctx.toplevel_shape_rptr = irast.Pointer(
source=source_expr,
ptrcls=ptrcls,
direction=ptr_direction
)
# Compile the shape element's computable expression, as a statement, in the
# new context.
qlexpr = astutils.ensure_qlstmt(shape_el.compexpr)
compexpr = dispatch.compile(qlexpr, ctx=shape_expr_ctx)
# The compiled expression must have an inferable result type.
target_class = irutils.infer_type(compexpr, schema)
if target_class is None:
msg = 'cannot determine expression result type'
raise errors.EdgeQLError(msg, context=source_ctx)
# With no known pointer class, a mutating statement rejects the reference
# unless the current clause is 'result'.
if ptrcls is None:
if (isinstance(ctx.stmt, irast.MutatingStmt) and
ctx.clause != 'result'):
raise errors.EdgeQLError(
'reference to unknown pointer',
context=source_ctx)
# Module fallback order: first element of `ptrname` (presumably its module
# part -- verify), then the context's derived target module, then the module
# of the pointer source's name.
ptr_module = (
ptrname[0] or
ctx.derived_target_module or
ptrsource.name.module
)
def _try_fold_binop(self, binop: irast.BinOp):
ctx = self.context.current
result_type = irutils.infer_type(binop, ctx.schema)
folded = None
left = binop.left
if isinstance(left, irast.Set) and left.expr is not None:
left = left.expr
right = binop.right
if isinstance(right, irast.Set) and right.expr is not None:
right = right.expr
op = binop.op
if (isinstance(left, irast.Constant) and
isinstance(right, irast.Constant) and
result_type.name in {'std::int', 'std::float'}):
# Left and right nodes are constants.
folded = self._try_fold_arithmetic_binop(op, left, right)