Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
sources = []
def f():
while True:
source, msg = receive()
if not msg:
break
if source.ref not in sources:
sources.append(source.ref)
messages.append(msg)
def f1(ref):
msg = ['hello', ' ', 'world']
for s in msg:
send(ref, s)
pid0 = spawn(f)
pid1 = spawn(f1, pid0)
core.run()
assert messages == ['hello', ' ', 'world']
assert sources == [3]
def test_registered(self):
    """Check that names registered for an actor are reported by the registry.

    Two names are registered for one spawned actor. They must be visible
    through ``registry`` membership, through ``registered(pid)`` from the
    outside, and through ``registered()`` called from inside the actor.
    """
    r_list = []

    def f():
        # Debug output kept from the original test.
        print("ici %s" % registered())
        print(registry._by_ref)
        # Collect the names seen from inside the actor. ``extend`` avoids
        # building a throwaway list purely for its side effect.
        r_list.extend(registered())

    pid = spawn(f)
    # Registered as "b" then "a"; the assertions below expect ['a', 'b'].
    register("b", pid)
    register("a", pid)

    assert 'a' in registry
    assert 'b' in registry
    assert registered(pid) == ['a', 'b']

    # Switch to the actor so that ``f`` runs and fills ``r_list``.
    pid.actor.switch()
    assert r_list == ['a', 'b']
def test_send_after(self):
    """Check that ``send_after`` delivers its message after the given delay.

    The actor blocks in ``receive()`` and records the wall-clock time at
    which it wakes up; that time must be 0.3s (+/- 0.01s) after the call.
    """
    received_at = []

    def waiter():
        receive()
        received_at.append(time.time())

    target = spawn(waiter)
    started = time.time()
    send_after(0.3, target, None)
    core.run()

    # Elapsed time between scheduling the send and the actor waking up.
    elapsed = received_at[0] - started
    assert 0.29 <= elapsed <= 0.31
def test_simple(self):
    """Register one name for an actor, look it up both ways, then remove it."""
    def noop():
        return

    actor_ref = spawn(noop)
    register("test", actor_ref)

    # Membership works for the actor reference and for the name.
    assert actor_ref in registry
    assert "test" in registry

    # Name -> reference, and reference -> list of names.
    assert registry["test"] == actor_ref
    assert registry[actor_ref] == ["test"]

    # After deleting by reference, the name lookup yields None.
    del registry[actor_ref]
    assert registry["test"] is None

    core.run()
sources.append(source.ref)
messages.append(msg)
def f1(ref):
msg = ['hello', 'world']
for s in msg:
send(ref, s)
def f2(ref):
msg = ['brave', 'new', 'world', '']
for s in msg:
send(ref, s)
pid0 = spawn(f)
pid1 = spawn(f1, pid0)
pid2 = spawn(f2, pid0)
core.run()
assert len(messages) == 5
assert sources == [5, 6]
def test_share_registry(self):
    """Check that a new ``Registry()`` sees names registered via ``register``.

    The instance is created before the name is registered, and must still
    report the same entry as the ``registry`` object.
    """
    other = Registry()

    def noop():
        return

    actor_ref = spawn(noop)
    register("test1", actor_ref)

    # Visible through the shared ``registry`` object...
    assert "test1" in registry
    assert registry["test1"] is actor_ref

    # ...and through the separately constructed instance.
    assert "test1" in other
    assert other["test1"] == registry["test1"]
if source.ref not in sources:
sources.append(source.ref)
messages.append(msg)
def f1(ref):
msg = ['hello', 'world']
for s in msg:
send(ref, s)
def f2(ref):
msg = ['brave', 'new', 'world', '']
for s in msg:
send(ref, s)
pid0 = spawn(f)
pid1 = spawn(f1, pid0)
pid2 = spawn(f2, pid0)
core.run()
assert len(messages) == 5
assert sources == [5, 6]
break
if source.ref not in sources:
sources.append(source.ref)
messages.append(msg)
def f1(ref):
msg = ['hello', 'world']
for s in msg:
send(ref, s)
def f2(ref):
msg = ['brave', 'new', 'world', '']
for s in msg:
send(ref, s)
pid0 = spawn(f)
pid1 = spawn(f1, pid0)
pid2 = spawn(f2, pid0)
core.run()
assert len(messages) == 5
assert sources == [5, 6]
def f():
while True:
source, msg = receive()
if not msg:
break
if source.ref not in sources:
sources.append(source.ref)
messages.append(msg)
def f1(ref):
msg = ['hello', ' ', 'world']
for s in msg:
send(ref, s)
pid0 = spawn(f)
pid1 = spawn(f1, pid0)
core.run()
assert messages == ['hello', ' ', 'world']
assert sources == [3]
def test_simple(self):
    """Spawn one actor, run it to completion, and check its reference state."""
    ran = []

    def body():
        ran.append(True)

    actor_ref = spawn(body)

    # Before running: a proper ActorRef with ref 0 and a live actor that
    # has a mailbox attribute.
    assert isinstance(actor_ref, ActorRef)
    assert actor_ref.ref == 0
    assert hasattr(actor_ref.actor, 'mailbox')

    sleep(0.1)
    core.run()

    # After running: the body executed once and the reference no longer
    # points at a live actor.
    assert ran == [True]
    assert actor_ref.actor is None
    assert actor_ref.is_alive is False