Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def match_prefixes(our, other, ignore_filters):
    """Compare two path prefixes and compute whether they match.

    Each of ``our`` and ``other`` is either an ``irast.EntityLink`` or a
    node-like object exposing ``id``, ``pathvar``, ``filter`` and
    ``conjunction.paths``.

    For a link, the node is the link's ``target``.  When the target is
    ``None`` the link's ``source`` is used as the node instead, and the id
    becomes a copy of the source id extended with the link class, the link
    direction and a ``None`` target.

    ``ignore_filters`` -- when true, the ``filter`` / ``conjunction.paths``
    emptiness checks on both nodes are skipped.

    NOTE(review): the code visible here ends after computing ``ok``;
    ``result`` and ``ok`` are not consumed or returned in this excerpt, so
    the function as shown returns ``None`` -- confirm against the full
    source before relying on a return value.
    """
    result = None

    # Resolve "our" side into (link, node, id).
    if isinstance(our, irast.EntityLink):
        link = our
        our_node = our.target
        if our_node is None:
            # Target-less link: identify it by source id + link step.
            our_id = irutils.LinearPath(our.source.id)
            our_id.add(link.link_class, link.direction, None)
            our_node = our.source
        else:
            our_id = our_node.id
    else:
        link = None
        our_node = our
        our_id = our.id

    # Resolve "other" side the same way.  (The fallback below used to be
    # written out twice in a row; the second copy rebuilt the same values.)
    if isinstance(other, irast.EntityLink):
        other_link = other
        other_node = other.target
        if other_node is None:
            other_node = other.source
            other_id = irutils.LinearPath(other.source.id)
            other_id.add(other_link.link_class, other_link.direction, None)
        else:
            other_id = other_node.id
    else:
        other_link = None
        other_node = other
        other_id = other.id

    # If exactly one id ends in None, blank the last element of a copy of
    # the other id so the equality test below disregards that position.
    if our_id[-1] is None and other_id[-1] is not None:
        other_id = irutils.LinearPath(other_id)
        other_id[-1] = None

    if other_id[-1] is None and our_id[-1] is not None:
        our_id = irutils.LinearPath(our_id)
        our_id[-1] = None

    # ``and`` binds tighter than ``or``: the trailing link comparison is
    # part of the second alternative only, not of the "both None" case.
    ok = (
        (our_node is None and other_node is None)
        or (our_node is not None and other_node is not None
            and (our_id == other_id
                 and our_node.pathvar == other_node.pathvar
                 and (ignore_filters
                      or (not our_node.filter
                          and not other_node.filter
                          and not our_node.conjunction.paths
                          and not other_node.conjunction.paths))))
        and (not link or (link.link_class == other_link.link_class
                          and link.direction == other_link.direction))
    )
def _rewrite_with_edgeql_expr(self, expr, edgeql_expr, anchors):
from edgedb.lang import edgeql
schema = self._context.current.schema
ir = edgeql.compile_fragment_to_ir(
edgeql_expr, schema, anchors=anchors)
node = expr.source
rewrite_target = expr.target
path_id = irutils.LinearPath([node.concept])
nodes = ast.find_children(
ir, lambda n: isinstance(n, irast.EntitySet) and n.id == path_id)
for expr_node in nodes:
expr_node.reference = node
ptrname = expr.link_class.shortname
expr_ref = irast.SubgraphRef(
name=ptrname,
force_inline=True,
rlink=expr,
is_rewrite_product=True,
rewrite_original=rewrite_target)
self._context.current.graph.replace_refs(
if proto.source.source:
src = irast.EntitySet()
src.concept = proto.source.source
src.id = irutils.LinearPath([src.concept])
else:
src = None
link = irast.EntityLink(link_proto=proto.source, source=src,
direction=s_pointers.PointerDirection.Outbound)
if src:
src.disjunction.update(link)
pref_id = irutils.LinearPath(src.id)
else:
pref_id = irutils.LinearPath([])
pref_id.add(proto.source, s_pointers.PointerDirection.Outbound, proto.target)
step = irast.LinkPropRefSimple(name=ptr_name, ref=link, id=pref_id,
ptr_proto=proto)
step.anchor = anchor
step.show_as_anchor = anchor
else:
step = proto
context.current.anchors[anchor] = step
our_id = irutils.LinearPath(our.source.id)
our_id.add(link.link_proto, link.direction, None)
our_node = our.source
else:
our_id = our_node.id
else:
link = None
our_node = our
our_id = our.id
if isinstance(other, irast.EntityLink):
other_link = other
other_node = other.target
if other_node is None:
other_node = other.source
other_id = irutils.LinearPath(other.source.id)
other_id.add(other_link.link_proto, other_link.direction, None)
else:
other_id = other_node.id
else:
other_link = None
other_node = other
other_id = other.id
if our_id[-1] is None and other_id[-1] is not None:
other_id = irutils.LinearPath(other_id)
other_id[-1] = None
if other_id[-1] is None and our_id[-1] is not None:
our_id = irutils.LinearPath(our_id)
our_id[-1] = None
if isinstance(other, irast.EntityLink):
other_link = other
other_node = other.target
if other_node is None:
other_node = other.source
other_id = irutils.LinearPath(other.source.id)
other_id.add(other_link.link_class, other_link.direction, None)
else:
other_id = other_node.id
else:
other_link = None
other_node = other
other_id = other.id
if our_id[-1] is None and other_id[-1] is not None:
other_id = irutils.LinearPath(other_id)
other_id[-1] = None
if other_id[-1] is None and our_id[-1] is not None:
our_id = irutils.LinearPath(our_id)
our_id[-1] = None
ok = (
(our_node is None and other_node is None)
or (our_node is not None and other_node is not None
and (our_id == other_id
and our_node.pathvar == other_node.pathvar
and (ignore_filters
or (not our_node.filter
and not other_node.filter
and not our_node.conjunction.paths
and not other_node.conjunction.paths))))
expr=qlast.LinkNode(
namespace=tip.namespace,
name=tip.expr
)
)
path_tip = context.current.local_link_source
if isinstance(tip, qlast.PathStepNode):
proto = self._normalize_concept(
context, tip.expr, tip.namespace)
if isinstance(proto, s_obj.ProtoNode):
step = irast.EntitySet()
step.concept = proto
step.id = irutils.LinearPath([step.concept])
step.users.add(context.current.location)
else:
step = irast.EntityLink(link_proto=proto)
path_tip = step
elif isinstance(tip, qlast.TypeRefNode):
typeref = self._process_path(context, tip.expr)
if isinstance(typeref, irast.PathCombination):
if len(typeref.paths) > 1:
msg = "type() argument must not be a path combination"
raise errors.EdgeQLError(msg)
typeref = next(iter(typeref.paths))
elif isinstance(tip, qlast.LinkExprNode) and typeref:
path_tip = irast.MetaRef(ref=typeref, name=tip.expr.name)
def _populate_anchors(self, context, anchors):
for anchor, proto in anchors.items():
if isinstance(proto, s_obj.ProtoNode):
step = irast.EntitySet()
step.concept = proto
step.id = irutils.LinearPath([step.concept])
step.anchor = anchor
step.show_as_anchor = anchor
# XXX
# step.users =
elif isinstance(proto, s_links.Link):
if proto.source:
src = irast.EntitySet()
src.concept = proto.source
src.id = irutils.LinearPath([src.concept])
else:
src = None
step = irast.EntityLink(link_proto=proto, source=src)
step.anchor = anchor
step.show_as_anchor = anchor
if src:
src.disjunction.update(step)
elif isinstance(proto, s_lprops.LinkProperty):
ptr_name = proto.normal_name()
if proto.source.source:
src = irast.EntitySet()
src.concept = proto.source.source