Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_creates_without_config(self):
subscriptions = (sub_stub,)
worker = Worker(subscriptions)
worker.setup()
assert worker._subscriber._ack_deadline == 60
assert worker._subscriber._gc_project_id == "rele-test"
def worker(config):
subscriptions = (sub_stub,)
return Worker(
subscriptions,
config.gc_project_id,
config.credentials,
default_ack_deadline=60,
threads_per_subscription=10,
)
def test_creates_subscription_with_custom_ack_deadline_from_environment(
self, config
):
subscriptions = (sub_stub,)
custom_ack_deadline = 234
worker = Worker(
subscriptions,
config.gc_project_id,
config.credentials,
custom_ack_deadline,
threads_per_subscription=10,
)
worker.setup()
assert worker._subscriber._ack_deadline == custom_ack_deadline
def worker_wait_forever(self):
with patch.object(Worker, "_wait_forever", return_value=None) as p:
yield p
@patch.object(Worker, "_wait_forever")
def test_run_sets_up_and_creates_subscriptions_when_called(
self, mock_wait_forever, mock_consume, mock_create_subscription, worker
):
worker.run_forever()
topic = "some-cool-topic"
subscription = "rele-some-cool-topic"
mock_create_subscription.assert_called_once_with(subscription, topic)
mock_consume.assert_called_once_with(
subscription_name="rele-some-cool-topic", callback=ANY, scheduler=ANY,
)
scheduler = mock_consume.call_args_list[0][1]["scheduler"]
assert isinstance(scheduler, ThreadScheduler)
assert isinstance(scheduler._executor, futures.ThreadPoolExecutor)
mock_wait_forever.assert_called_once()