Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pagination(self):
    """Seed the app state with one worker and two succeeded tasks.

    After the state is installed on the app, a paging/sorting parameter
    dict is assembled (ordering on the ``name`` column).

    NOTE(review): the visible snippet ends before ``params`` is used in a
    request or any assertion is made -- presumably truncated; confirm
    against the full test module.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')

    history = [Event('worker-online', hostname='worker1')]
    history.extend(
        task_succeeded_events(worker='worker1', name='task1', id='123'))
    history.extend(
        task_succeeded_events(worker='worker1', name='task2', id='456'))

    # The list position doubles as the event's logical clock value.
    for tick, evt in enumerate(history):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state

    # Key names look like DataTables server-side parameters -- TODO confirm.
    params = {
        'draw': 1,
        'start': 0,
        'length': 10,
        'search[value]': '',
        'order[0][column]': 0,
        'columns[0][data]': 'name',
    }
def test_tasks(self):
    """Feed a large batch of task events for several workers into a state.

    Three workers are registered, two of them are reported online, and
    then 100 succeeded-task batches for ``worker1`` plus 10 succeeded and
    13 failed batches for ``worker3`` are replayed in order.

    NOTE(review): the visible snippet stops after the replay loop; no
    request or assertion is shown here.
    """
    worker_state = EventsState()
    for hostname in ('worker1', 'worker2', 'worker3'):
        worker_state.get_or_create_worker(hostname)

    history = [
        Event('worker-online', hostname='worker1'),
        Event('worker-online', hostname='worker2'),
    ]
    for _ in range(100):
        history.extend(task_succeeded_events(worker='worker1'))
    for _ in range(10):
        history.extend(task_succeeded_events(worker='worker3'))
    for _ in range(13):
        history.extend(task_failed_events(worker='worker3'))

    # The list position doubles as the event's logical clock value.
    for tick, evt in enumerate(history):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
def test_succeeded_task(self):
    """Seed the app state with one worker and a single succeeded task.

    A parameter dict requesting the first ten rows, ordered ascending on
    the ``name`` column, is then assembled.

    NOTE(review): the visible snippet ends before ``params`` is used --
    presumably truncated; confirm against the full test module.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')

    history = [Event('worker-online', hostname='worker1')]
    history.extend(
        task_succeeded_events(worker='worker1', name='task1', id='123'))

    # The list position doubles as the event's logical clock value.
    for tick, evt in enumerate(history):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state

    # Key names look like DataTables server-side parameters -- TODO confirm.
    params = {
        'draw': 1,
        'start': 0,
        'length': 10,
        'search[value]': '',
        'order[0][column]': 0,
        'columns[0][data]': 'name',
        'order[0][dir]': 'asc',
    }
def test_task_started(self):
    """Replay a received-then-started task and fetch the dashboard.

    Two workers come online; ``worker1`` receives task ``123`` and starts
    it. The resulting state is installed on the app before ``/dashboard``
    is requested.

    NOTE(review): no assertion on the response is visible in this
    snippet -- presumably truncated.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')
    worker_state.get_or_create_worker('worker2')

    online_one = Event('worker-online', hostname='worker1')
    online_two = Event('worker-online', hostname='worker2')
    received = Event(
        'task-received', uuid='123', name='task1',
        args='(2, 2)', kwargs="{'foo': 'bar'}",
        retries=0, eta=None, hostname='worker1',
    )
    started = Event('task-started', uuid='123', hostname='worker1')

    # The position in this sequence doubles as the logical clock value.
    for tick, evt in enumerate([online_one, online_two, received, started]):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state

    response = self.get('/dashboard')
# Dashboard test for a single worker that comes online and then goes offline.
# NOTE(review): this snippet has lost its indentation and its last statement
# is cut off mid-call, so the code is left exactly as found.
def test_single_workers_offline(self):
state = EventsState()
state.get_or_create_worker('worker1')
# Feed an online event followed by an offline event for the same worker.
state.event(Event('worker-online', hostname='worker1',
local_received=time.time()))
state.event(Event('worker-offline', hostname='worker1',
local_received=time.time()))
# Install the prepared state on the app, then request the dashboard page.
self.app.events.state = state
r = self.get('/dashboard')
# Parse the response body with HtmlTableParser.
table = HtmlTableParser()
table.parse(str(r.body))
# Expect HTTP 200 and exactly one table row, which must be worker1's.
self.assertEqual(200, r.code)
self.assertEqual(1, len(table.rows()))
self.assertTrue(table.get_row('worker1'))
# NOTE(review): the assertion below is truncated in the visible source; its
# second argument (presumably the parsed row for worker1) is missing.
self.assertEqual(['worker1', 'False', '0', '0', '0', '0', '0', None],
def test_sort_runtime(self):
    """Seed two succeeded tasks with different runtimes for a sort test.

    Tasks ``123`` (runtime 10.0) and ``456`` (runtime 2.0) are replayed
    for ``worker1``; a parameter dict ordering on the ``runtime`` column
    is then assembled.

    NOTE(review): the visible snippet ends before ``params`` is used --
    presumably truncated; confirm against the full test module.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')

    history = [Event('worker-online', hostname='worker1')]
    history.extend(task_succeeded_events(
        worker='worker1', name='task1', id='123', runtime=10.0))
    history.extend(task_succeeded_events(
        worker='worker1', name='task1', id='456', runtime=2.0))

    # The list position doubles as the event's logical clock value.
    for tick, evt in enumerate(history):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state

    # Key names look like DataTables server-side parameters -- TODO confirm.
    params = {
        'draw': 1,
        'start': 0,
        'length': 10,
        'search[value]': '',
        'order[0][column]': 0,
        'columns[0][data]': 'runtime',
    }
def test_task_failed(self):
    """Replay a task that is received, started and then fails.

    Two workers come online; task ``123`` runs on ``worker1`` and fails
    with ``KeyError('foo')``. The resulting state is installed on the app.

    NOTE(review): no request or assertion is visible in this snippet --
    presumably truncated.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')
    worker_state.get_or_create_worker('worker2')

    online_one = Event('worker-online', hostname='worker1')
    online_two = Event('worker-online', hostname='worker2')
    received = Event(
        'task-received', uuid='123', name='task1',
        args='(2, 2)', kwargs="{'foo': 'bar'}",
        retries=0, eta=None, hostname='worker1',
    )
    started = Event('task-started', uuid='123', hostname='worker1')
    failed = Event(
        'task-failed', uuid='123', exception="KeyError('foo')",
        traceback='line 1 at main', hostname='worker1',
    )
    timeline = [online_one, online_two, received, started, failed]

    # The position in the timeline doubles as the logical clock value.
    for tick, evt in enumerate(timeline):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state
def test_task_retried(self):
    """Replay a task that is received, started, retried and then fails.

    Two workers come online; task ``123`` on ``worker1`` is retried after
    ``KeyError('bar')`` and finally fails with ``KeyError('foo')``.

    NOTE(review): the visible snippet stops after the replay loop; no
    request or assertion is shown here.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')
    worker_state.get_or_create_worker('worker2')

    online_one = Event('worker-online', hostname='worker1')
    online_two = Event('worker-online', hostname='worker2')
    received = Event(
        'task-received', uuid='123', name='task1',
        args='(2, 2)', kwargs="{'foo': 'bar'}",
        retries=0, eta=None, hostname='worker1',
    )
    started = Event('task-started', uuid='123', hostname='worker1')
    retried = Event(
        'task-retried', uuid='123', exception="KeyError('bar')",
        traceback='line 2 at main', hostname='worker1',
    )
    failed = Event(
        'task-failed', uuid='123', exception="KeyError('foo')",
        traceback='line 1 at main', hostname='worker1',
    )
    timeline = [online_one, online_two, received, started, retried, failed]

    # The position in the timeline doubles as the logical clock value.
    for tick, evt in enumerate(timeline):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
def test_sort_incomparable(self):
    """Seed two succeeded tasks, one with ``runtime=None``, for a sort test.

    Task ``123`` uses the helper's default runtime while task ``456`` is
    given ``runtime=None``; a parameter dict ordering on the ``runtime``
    column is then assembled.

    NOTE(review): the visible snippet ends before ``params`` is used --
    presumably truncated; confirm against the full test module.
    """
    worker_state = EventsState()
    worker_state.get_or_create_worker('worker1')

    history = [Event('worker-online', hostname='worker1')]
    history.extend(
        task_succeeded_events(worker='worker1', name='task1', id='123'))
    history.extend(task_succeeded_events(
        worker='worker1', name='task1', id='456', runtime=None))

    # The list position doubles as the event's logical clock value.
    for tick, evt in enumerate(history):
        evt['clock'] = tick
        evt['local_received'] = time.time()
        worker_state.event(evt)
    self.app.events.state = worker_state

    # Key names look like DataTables server-side parameters -- TODO confirm.
    params = {
        'draw': 1,
        'start': 0,
        'length': 10,
        'search[value]': '',
        'order[0][column]': 0,
        'columns[0][data]': 'runtime',
    }
def event(self, event):
    """Count the event, forward it to API subscribers, then store it.

    The per-worker, per-type counter is incremented first. If
    ``api.events`` exposes a class under the name returned by
    ``api.events.getClassName`` for this event type, the event is passed
    to that class's ``send_message``. Finally the event is handed to the
    parent class's ``event`` method.

    :param event: mapping that provides at least ``'hostname'`` and
        ``'type'`` keys.
    """
    hostname, kind = event['hostname'], event['type']
    self.counter[hostname][kind] += 1

    # Forward the event to API subscribers (via websockets).
    handler_name = api.events.getClassName(kind)
    handler = getattr(api.events, handler_name, None)
    if handler:
        handler.send_message(event)

    # Persist the event through the parent state implementation.
    super(EventsState, self).event(event)