How to use tenacity - 10 common examples

To help you get started, we’ve selected a few tenacity examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github seung-lab / cloud-volume / cloudvolume / storage.py View on Github external
return ret

S3_POOL = None
GC_POOL = None
def reset_connection_pools():
  """
  Rebind the module-level S3_POOL and GC_POOL registries to fresh ones.

  S3_POOL is a two-level keydefaultdict: the outer factory receives a
  service and returns an inner keydefaultdict whose factory receives a
  bucket name and builds S3ConnectionPool(service, bucket_name).
  GC_POOL is a single-level keydefaultdict whose factory receives a
  bucket name and builds GCloudBucketPool(bucket_name).
  """
  global S3_POOL, GC_POOL

  def s3_pools_for(service):
    # Inner registry for one service; its factory takes the bucket name.
    return keydefaultdict(
      lambda bucket_name: S3ConnectionPool(service, bucket_name)
    )

  def gc_pool_for(bucket_name):
    return GCloudBucketPool(bucket_name)

  S3_POOL = keydefaultdict(s3_pools_for)
  GC_POOL = keydefaultdict(gc_pool_for)

reset_connection_pools()

# Shared retry decorator: gives up after 7 attempts and waits a randomized,
# exponentially growing interval between attempts. The positional arguments
# 0.5 and 60.0 are the multiplier and the maximum wait per tenacity's
# wait_random_exponential signature. reraise=True makes the final failure
# surface as the original exception rather than tenacity.RetryError.
retry = tenacity.retry(
  reraise=True, 
  stop=tenacity.stop_after_attempt(7), 
  wait=tenacity.wait_random_exponential(0.5, 60.0),
)

DEFAULT_THREADS = 20

class SimpleStorage(object):
  """
  Access files stored in Google Storage (gs), Amazon S3 (s3), 
  or the local Filesystem (file).

  e.g. with Storage('gs://bucket/dataset/layer') as stor:
      files = stor.get_file('filename')

  Required:
    layer_path (str): A protocol prefixed path of the above format.
      Accepts s3:// gs:// and file://. File paths are absolute.
github jd / tenacity / test_tenacity.py View on Github external
    retry=tenacity.retry_if_exception_type(CustomError))
def _retryable_test_with_exception_type_custom_attempt_limit(thing):
    return thing.go()
github jd / tenacity / test_tenacity.py View on Github external
def test_random_sleep(self):
        # wait_random(min=1000, max=2000) should produce varying waits that
        # all fall inside the closed range [1000, 2000].
        retrying = Retrying(wait=tenacity.wait_random(min=1000, max=2000))

        # Draw four waits; the set keeps only the distinct values.
        samples = {retrying.wait(1, 6546) for _ in range(4)}

        # Not strictly deterministic: all four draws could coincide.
        self.assertTrue(len(samples) > 1)
        for sample in samples:
            self.assertTrue(sample >= 1000)
            self.assertTrue(sample <= 2000)
github jd / tenacity / test_tenacity.py View on Github external
def test_incrementing_sleep(self):
        # wait_incrementing(start=500, increment=100): attempt 1 waits 500,
        # and each following attempt waits 100 more.
        retrying = Retrying(
            wait=tenacity.wait_incrementing(start=500, increment=100))
        expected_waits = (500, 600, 700)
        for attempt_number, expected in enumerate(expected_waits, start=1):
            self.assertEqual(expected, retrying.wait(attempt_number, 6546))
github openstack-charmers / zaza / unit_tests / utilities / test_zaza_utilities_openstack.py View on Github external
def test_neutron_bgp_speaker_appears_on_agent(self):
        # Cap the decorated function's retry controller at a single attempt
        # before calling it, then compare the result with the fixture's
        # expected speakers.
        openstack_utils.neutron_bgp_speaker_appears_on_agent.retry.stop = (
            tenacity.stop_after_attempt(1))
        speakers = openstack_utils.neutron_bgp_speaker_appears_on_agent(
            self.neutronclient, 'FAKE_AGENT_ID')
        self.assertEqual(speakers, self.bgp_speakers)
github kiwicom / konfetti / test / test_vault.py View on Github external
def test_retry_object(vault_prefix, mocker):
    """Check that a custom Retrying object drives the Vault backend's retries.

    The policy retries only on KonfettiError, stops after two attempts and
    re-raises the original error, so ``_call`` is expected to run twice
    before KonfettiError reaches the caller.
    """
    retry_policy = Retrying(
        retry=retry_if_exception_type(KonfettiError),
        reraise=True,
        stop=stop_after_attempt(2),
    )
    backend = VaultBackend(vault_prefix, retry=retry_policy)
    config = Konfig(vault_backend=backend)

    # Make every HTTP send fail so each attempt raises KonfettiError.
    mocker.patch("requests.adapters.HTTPAdapter.send", side_effect=KonfettiError)
    # Wrap (not replace) _call so the real code runs while calls are counted.
    call_spy = mocker.patch.object(
        config.vault_backend, "_call", wraps=config.vault_backend._call
    )

    with pytest.raises(KonfettiError):
        config.SECRET

    assert call_spy.called is True
    assert call_spy.call_count == 2
github CloudVE / cloudbridge / tests / helpers / standard_interface_tests.py View on Github external
                wait=tenacity.wait_fixed(10),
                reraise=True)
def check_delete(test, service, obj, perform_delete=False):
    """Assert that ``obj`` no longer appears in ``service.list()``.

    Args:
        test: Object exposing ``assertTrue(condition, message)``.
        service: Object exposing ``list()``, which returns items with an
            ``id`` attribute.
        obj: The object expected to be gone; needs ``id`` and, when
            ``perform_delete`` is true, ``delete()``.
        perform_delete: When true, call ``obj.delete()`` before checking.
    """
    if perform_delete:
        obj.delete()

    # Anything still listed under the same id counts as "not deleted".
    remaining = [item for item in service.list() if item.id == obj.id]
    failure_message = (
        "Object %s in service %s should have been deleted but still exists."
        % (remaining, type(service).__name__))
    test.assertTrue(not remaining, failure_message)
github openstack-charmers / zaza / zaza / openstack / charm_tests / ceph / tests.py View on Github external
        # Retried up to 12 times with exponential waits (multiplier 1,
        # capped at 60); reraise=True surfaces the original exception once
        # the attempts are exhausted.
        @tenacity.retry(
            reraise=True,
            stop=tenacity.stop_after_attempt(12),
            wait=tenacity.wait_exponential(multiplier=1, max=60),
        )
        def _target_get_object():
            # Fetch 'testfile' from the container via the target client.
            fetched = target_client.get_object(_container, 'testfile')
            return fetched
        _, target_content = _target_get_object()
github jd / tenacity / test_tenacity.py View on Github external
def test_stop_after_delay(self):
        # stop_after_delay(1000) must not stop at a delay of 999 and must
        # stop at exactly 1000 and beyond.
        retrying = Retrying(stop=tenacity.stop_after_delay(1000))
        self.assertFalse(retrying.stop(2, 999))
        for delay in (1000, 1001):
            self.assertTrue(retrying.stop(2, delay))
github cosmicpython / code / tests / e2e / test_external_events.py View on Github external
api_client.post_to_add_batch(later_batch, sku, qty=10, eta='2011-01-03')
    r = api_client.post_to_allocate(orderid, sku, 10)
    assert r.ok
    response = api_client.get_allocation(orderid)
    assert response.json()[0]['batchref'] == earlier_batch

    subscription = redis_client.subscribe_to('line_allocated')

    # change quantity on allocated batch so it's less than our order
    redis_client.publish_message('change_batch_quantity', {
        'batchref': earlier_batch, 'qty': 5
    })

    # wait until we see a message saying the order has been reallocated
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]['data'])
            assert data['orderid'] == orderid
            assert data['batchref'] == later_batch