Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif op == ast.ops.MUL:
value = left.value * right.value
elif op == ast.ops.DIV:
if left_type.name == right_type.name == 'std::int':
value = left.value // right.value
else:
value = left.value / right.value
elif op == ast.ops.POW:
value = left.value ** right.value
elif op == ast.ops.MOD:
value = left.value % right.value
else:
value = None
if value is not None:
return irast.Constant(value=value, type=result_type)
def process_none_test(self, expr, schema):
    """Wrap a none-test node according to the kind of its operand.

    The operand is ``expr.expr``:

    * ``irast.AtomicRef``   -> ``expr`` wrapped in ``irast.AtomicRefExpr``
    * ``irast.LinkPropRef`` -> ``expr`` wrapped in ``irast.LinkPropRefExpr``
    * ``irast.EntitySet``   -> the set is converted through
      ``self.entityref_to_idref`` and a fresh ``irast.NoneTest`` over that
      reference is wrapped in ``irast.AtomicRefExpr``
    * ``irast.Constant``    -> ``expr`` wrapped in ``irast.Constant``

    Any other operand kind leaves ``expr`` untouched.

    :param expr: node whose ``expr`` attribute holds the tested operand.
    :param schema: passed through to ``self.entityref_to_idref``.
    :return: the wrapped node, or ``expr`` itself when no rule applies.
    """
    operand = expr.expr

    if isinstance(operand, irast.AtomicRef):
        return irast.AtomicRefExpr(expr=expr)

    if isinstance(operand, irast.LinkPropRef):
        return irast.LinkPropRefExpr(expr=expr)

    if isinstance(operand, irast.EntitySet):
        # The original none-test node is replaced here: a new NoneTest is
        # built over the reference produced from the entity set.
        conjunction = irast.Conjunction(paths=frozenset((operand,)))
        id_ref = self.entityref_to_idref(conjunction, schema)
        return irast.AtomicRefExpr(expr=irast.NoneTest(expr=id_ref))

    if isinstance(operand, irast.Constant):
        return irast.Constant(expr=expr)

    return expr
def visit_EntityLink(self, expr):
if expr.source is not None:
self.visit(expr.source)
if 'lang_rewrite' not in expr.rewrite_flags:
schema = self._context.current.schema
localizable = schema.get('std::localizable', default=None)
link_class = expr.link_class
if (localizable is not None and
link_class.issubclass(localizable)):
cvars = self._context.current.context_vars
lang = irast.Constant(
index='__context_lang', type=schema.get('std::str'))
cvars['lang'] = 'en-US'
propn = sn.Name('std::lang')
for langprop in expr.proprefs:
if langprop.name == propn:
break
else:
lprop_class = link_class.pointers[propn]
langprop = irast.LinkPropRefSimple(
name=propn, ref=expr, ptr_class=lprop_class)
expr.proprefs.add(langprop)
eq_lang = irast.BinOp(
left=langprop, right=lang, op=ast.ops.EQ)
def process_unaryop(self, expr, operator):
    """Apply a unary operator to ``expr`` and wrap the result by operand kind.

    An ``irast.UnaryOp`` over ``expr`` is always created.  It is then wrapped
    in ``irast.AtomicRefExpr``, ``irast.LinkPropRefExpr`` or
    ``irast.Constant`` when ``expr`` is an instance of ``irast.AtomicRef``,
    ``irast.LinkPropRef`` or ``irast.Constant`` respectively.

    For any other operand the paths are extracted from ``expr`` and checked
    with ``self.check_atomic_disjunction``; exactly one atomic reference
    (or, failing that, exactly one link property reference) selects the
    matching wrapper.  Otherwise the bare ``irast.UnaryOp`` is returned.

    :param expr: operand node.
    :param operator: operator stored on the resulting ``irast.UnaryOp``.
    :return: the (possibly wrapped) unary operation node.
    """
    wrapper = None

    if isinstance(expr, irast.AtomicRef):
        wrapper = irast.AtomicRefExpr
    elif isinstance(expr, irast.LinkPropRef):
        wrapper = irast.LinkPropRefExpr
    elif isinstance(expr, irast.Constant):
        wrapper = irast.Constant
    else:
        extracted = irutils.extract_paths(
            expr, reverse=False, resolve_arefs=False)
        multipath = self.get_multipath(extracted)
        arefs = self.check_atomic_disjunction(multipath, irast.AtomicRef)
        proprefs = self.check_atomic_disjunction(
            multipath, irast.LinkPropRef)

        if arefs and len(arefs) == 1:
            wrapper = irast.AtomicRefExpr
        elif proprefs and len(proprefs) == 1:
            wrapper = irast.LinkPropRefExpr

    # Build the operation only after the path analysis above has run.
    unary = irast.UnaryOp(expr=expr, op=operator)
    if wrapper is None:
        return unary
    return wrapper(expr=unary)
def _process_unlimited_recursion(self):
    """Return an ``irast.Constant`` holding the integer ``0``.

    The constant's type is obtained by normalizing the Python class of ``0``
    against ``self.schema``; its ``index`` is ``None``.

    NOTE(review): judging by the name, this constant stands for "no
    recursion limit" -- confirm against the callers.
    """
    zero = 0
    zero_type = s_types.normalize_type(zero.__class__, self.schema)
    return irast.Constant(value=zero, index=None, type=zero_type)
def visit_Constant(self, expr):
    """Translate a constant node into an ``irast.Constant``.

    The result carries ``expr.value`` unchanged, typed by normalizing the
    value's Python class against the schema of the current context.

    :param expr: node exposing the literal through its ``value`` attribute.
    :return: a new ``irast.Constant``.
    """
    current = self.context.current
    literal = expr.value
    # TODO: visit expr.value?
    literal_type = s_types.normalize_type(literal.__class__, current.schema)
    return irast.Constant(value=literal, type=literal_type)
args=[result, py_ast.PyStr(s=str(attr))])
elif isinstance(expr, caos_ast.EntitySet):
if expr.anchor:
name = expr.anchor
else:
name = 'n{:x}'.format(persistent_hash(str(expr.concept.name)))
result = py_ast.PyName(id=name)
elif isinstance(expr, caos_ast.Disjunction):
if len(expr.paths) == 1:
result = self._process_expr(next(iter(expr.paths)), context)
else:
raise NotImplementedError('multipaths are not supported by this backend')
elif isinstance(expr, caos_ast.Constant):
if expr.expr:
result = self._process_expr(expr.expr, context)
elif expr.index:
result = py_ast.PyName(id=expr.index.replace('.', '__'))
else:
if isinstance(expr.value, numbers.Number):
result = py_ast.PyNum(n=expr.value)
else:
result = py_ast.PyStr(s=expr.value)
elif isinstance(expr, caos_ast.Sequence):
elements = [self._process_expr(el, context) for el in expr.elements]
result = py_ast.PyTuple(elts=elements)
elif isinstance(expr, caos_ast.FunctionCall):
args = [self._process_expr(a, context) for a in expr.args]
def _process_unlimited_recursion(*, ctx):
    """Return an ``irast.Constant`` holding the integer ``0``.

    The constant's type is obtained by normalizing the Python class of ``0``
    against ``ctx.schema``; its ``index`` is ``None``.

    NOTE(review): judging by the name, this constant stands for "no
    recursion limit" -- confirm against the callers.

    :param ctx: keyword-only context object providing ``schema``.
    """
    zero = 0
    zero_type = s_types.normalize_type(zero.__class__, ctx.schema)
    return irast.Constant(value=zero, index=None, type=zero_type)
right = self._process_expr(expr.right, context)
if expr.op == caos_ast.LIKE:
f = caos_ast.FunctionCall(name=('str', 'like'), args=[expr.left, expr.right])
result = self._process_expr(f, context)
elif expr.op == caos_ast.ILIKE:
f = caos_ast.FunctionCall(name=('str', 'ilike'), args=[expr.left, expr.right])
result = self._process_expr(f, context)
elif expr.op == caosql_ast.REMATCH:
f = caos_ast.FunctionCall(name=('re', 'match'), args=[expr.right, expr.left])
result = self._process_expr(f, context)
elif expr.op == caosql_ast.REIMATCH:
flags = caos_ast.Sequence(elements=[caos_ast.Constant(value='i')])
f = caos_ast.FunctionCall(name=('re', 'match'), args=[expr.right, expr.left, flags])
result = self._process_expr(f, context)
else:
op = _operator_map[expr.op]()
if isinstance(expr.op, (ast.ops.ComparisonOperator, ast.ops.MembershipOperator)):
result = py_ast.PyCompare(left=left, ops=[op], comparators=[right])
else:
result = py_ast.PyBinOp(left=left, right=right, op=op)
elif isinstance(expr, caos_ast.UnaryOp):
operand = self._process_expr(expr.expr, context)
op = _operator_map[expr.op]()
result = py_ast.PyUnaryOp(op=op, operand=operand)