Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_gets_local_agent(self):
    """Connecting to the address of an agent held in ``self.agents`` yields a ``LocalProxy``."""
    addr = self.agents[0].addr
    agent_proxy = aiomas.run(self.node.connect(addr))
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)), which only says "False is not true".
    self.assertIsInstance(agent_proxy, LocalProxy)
def test_agent_update(self, mock_method):
    """Running ``node.update_agents()`` calls the mocked method exactly twice.

    NOTE(review): ``mock_method`` is presumably injected by a patch decorator
    that is not visible in this excerpt -- confirm against the full file.
    """
    aiomas.run(self.node.update_agents())
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(mock_method.call_count, 2)
def test_multiple_behaviors_called(self):
    """An agent declaring two behaviors ends with ``cash == 20`` after one decide/apply cycle.

    The agent starts with ``cash == 100``; the expected final value of 20
    depends on ``behavior`` and ``behavior_with_params``, which are defined
    outside this block.
    """
    class NewAgent(syd.Agent):
        state_vars = ['cash']
        behaviors = [behavior,
                     partial(behavior_with_params, param=2)]

    agent = NewAgent(self.node, {'cash': 100})
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(agent.state.cash, 100)
    aiomas.run(agent.decide())
    agent.apply_updates()
    self.assertEqual(agent.state.cash, 20)
def irun(self, steps, reports=None):
    """Run the simulation lazily (as an iterator), yielding one report per step.

    Args:
        steps: number of simulation steps to run.
        reports: optional mapping ``{name: (fn, mod_step)}``. ``fn`` is
            called with the agent states on every step index ``i`` for
            which ``i % mod_step == 0``, so ``mod_step`` must be non-zero
            (a zero value raises ``ZeroDivisionError`` at the first step).

    Yields:
        A dict mapping each report name that was due on the step to the
        value returned by its ``fn``; empty when no report was due.

    Because this is a generator, nothing runs (including agent setup)
    until the first item is requested.
    """
    # Group the report functions by their step interval.
    aggs = defaultdict(dict)
    for name, (fn, mod_step) in (reports or {}).items():
        aggs[mod_step][name] = fn

    aiomas.run(self.container.setup_agents())
    for i in range(steps):
        aiomas.run(self._step())
        # TODO any way to make this more efficient?
        report = {}
        states = None
        for mod_step, agg_fns in aggs.items():
            if i % mod_step != 0:
                continue
            for name, fn in agg_fns.items():
                if states is None:
                    # Fetch at most once per step, and materialize the
                    # result: if states() returns a one-shot iterator,
                    # sharing it directly would leave every report
                    # function after the first with nothing to consume.
                    states = list(self.container.states())
                report[name] = fn(states)
        yield report
def states(self):
    """Iterate over all agent states across the cluster.

    Requests the states from every manager in ``self._managers``, waits
    for all of the requests to complete, then yields the individual items
    of each manager's result in manager order.
    """
    pending = []
    for manager in self._managers:
        pending.append(asyncio.ensure_future(manager.states()))

    per_manager = aiomas.run(asyncio.gather(*pending))
    for batch in per_manager:
        for state in batch:
            yield state
def irun(self, steps, reports=None):
    """Run the simulation lazily (as an iterator), yielding one report per step.

    Args:
        steps: number of simulation steps to run.
        reports: optional mapping ``{name: (fn, mod_step)}``. ``fn`` is
            called with the agent states on every step index ``i`` for
            which ``i % mod_step == 0``, so ``mod_step`` must be non-zero
            (a zero value raises ``ZeroDivisionError`` at the first step).

    Yields:
        A dict mapping each report name that was due on the step to the
        value returned by its ``fn``; empty when no report was due.

    Because this is a generator, nothing runs (including agent setup)
    until the first item is requested.
    """
    # Group the report functions by their step interval.
    aggs = defaultdict(dict)
    for name, (fn, mod_step) in (reports or {}).items():
        aggs[mod_step][name] = fn

    aiomas.run(self.container.setup_agents())
    for i in range(steps):
        aiomas.run(self._step())
        # TODO any way to make this more efficient?
        report = {}
        states = None
        for mod_step, agg_fns in aggs.items():
            if i % mod_step != 0:
                continue
            for name, fn in agg_fns.items():
                if states is None:
                    # Fetch at most once per step, and materialize the
                    # result: if states() returns a one-shot iterator,
                    # sharing it directly would leave every report
                    # function after the first with nothing to consume.
                    states = list(self.container.states())
                report[name] = fn(states)
        yield report
def start_node(port):
    """Start a node bound to ``0.0.0.0`` on ``port`` and block until it finishes.

    The node is started with the ``MsgPackBlosc`` codec and the numpy
    serializer registered as an extra serializer.
    """
    listen_addr = ('0.0.0.0', port)
    node_task = Node.start(
        listen_addr,
        codec=aiomas.codecs.MsgPackBlosc,
        extra_serializers=[serializers.get_np_serializer],
    )
    # Blocks here; terminates when the node's manager is given the
    # 'stop' command.
    aiomas.run(until=node_task)
def shutdown(self):
    """Shut down the cluster.

    Asks every manager in ``self._managers`` to stop, waits for all of
    them, then shuts down the local container. The container is shut down
    even if stopping a manager raises; the exception still propagates to
    the caller afterwards.
    """
    tasks = [asyncio.ensure_future(manager.stop())
             for manager in self._managers]
    try:
        aiomas.run(asyncio.gather(*tasks))
    finally:
        # Always release the local container, otherwise a single failing
        # manager would leave it running.
        self._container.shutdown()