Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_random_sleep(self):
    """Check that ``wait_random`` yields varying waits within its bounds.

    Builds a ``Retrying`` whose wait strategy is
    ``tenacity.wait_random(min=1000, max=2000)``, samples the computed wait
    four times, and asserts that the samples are not all identical and that
    every sample lies in the closed interval [1000, 2000].
    """
    lower, upper = 1000, 2000
    r = Retrying(wait=tenacity.wait_random(min=lower, max=upper))
    # A set collapses duplicates, so its size tells us whether the strategy
    # produced more than one distinct value across the four samples.
    # NOTE(review): the (1, 6546) arguments are presumably the attempt number
    # and the delay since the first attempt -- confirm against the tenacity
    # version under test.
    times = {r.wait(1, 6546) for _ in range(4)}
    # this is kind of non-deterministic...
    self.assertGreater(len(times), 1)
    for t in times:
        # Dedicated comparison asserts report the offending value on failure.
        self.assertGreaterEqual(t, lower)
        self.assertLessEqual(t, upper)
wait=tenacity.wait_random(max=2),
reraise=True)
def set_in_progress():
    """Move the resource into the IN_PROGRESS state for ``action``.

    On the first call (``first_iter[0]`` is truthy) the flag is simply
    cleared. On every later call the resource object is re-read through
    ``resource_objects.Resource.get_obj`` and its ``atomic_key`` is copied
    onto ``self._atomic_key`` before the state change is attempted.

    NOTE(review): this closure is presumably re-invoked by a retry
    decorator whose opening is outside the visible source -- confirm.
    """
    if first_iter[0]:
        # First pass: nothing to refresh yet, just remember we have run.
        first_iter[0] = False
    else:
        # Later pass: pick up the current atomic key before trying again.
        refreshed = resource_objects.Resource.get_obj(self.context, self.id)
        self._atomic_key = refreshed.atomic_key
    self.state_set(action, self.IN_PROGRESS, lock=lock_acquire)
wait=tenacity.wait_random(min=2, max=5))
def resolve_hostname(hostname):
    """Resolve ``hostname`` through the system resolver.

    The lookup is a single ``socket.gethostbyname`` call and its result is
    returned unchanged. Any retrying when the host is not immediately
    resolvable presumably comes from a decorator applied to this function
    (only its tail is visible in this source) -- confirm.
    """
    address = socket.gethostbyname(hostname)
    return address
wait=tenacity.wait_random(min=1, max=10),
reraise=True)
def get_default_router(self):
router_name = self._settings.router_name
routers = self._client.list_routers(
tenant_id=self._project_id, name=router_name).get('routers')
if len(routers) == 0:
LOG.debug('Router {name} not found'.format(name=router_name))
if self._settings.create_router:
LOG.debug('Attempting to create Router {router}'.
format(router=router_name))
external_network = self._settings.external_network
kwargs = {'id': external_network} \
if uuidutils.is_uuid_like(external_network) \
else {'name': external_network}
networks = self._client.list_networks(**kwargs).get('networks')
def _retry_send(self, function: Callable, attempt=10, *args, **kwargs):
    """Call ``function`` under a tenacity retry policy and return its result.

    Args:
        function: the callable to invoke.
        attempt: value handed to ``tenacity.stop_after_attempt``.
        *args: extra positional arguments forwarded to ``function``.
        **kwargs: keyword arguments forwarded to ``function``.

    The wait between tries is ``wait_fixed(3)`` plus ``wait_random(0, 2)``.
    With ``reraise=True`` the retryer is configured to re-raise the
    underlying exception once the stop condition is reached.

    Note that ``attempt`` sits before ``*args`` in the signature, so the
    first extra positional argument binds to ``attempt`` rather than being
    forwarded to ``function``.
    """
    # Only attach the logging hook when a module-level logger is available.
    after_hook = tenacity.after_log(logger, logger.level) if logger else None
    jittered_wait = tenacity.wait_fixed(3) + tenacity.wait_random(0, 2)
    retryer = tenacity.Retrying(
        stop=tenacity.stop_after_attempt(attempt),
        wait=jittered_wait,
        after=after_hook,
        reraise=True,
    )
    return retryer(function, *args, **kwargs)
def __enter__(self):
self.lock = False
if not self.coordinator:
return self
LOG.debug("Trying to acquire lock for %s", self.locks_prefix)
names = itertools.cycle(self.lock_names)
retry_kwargs = {'wait': tenacity.wait_random(min=0, max=1),
'reraise': True}
if self.timeout:
retry_kwargs['stop'] = tenacity.stop_after_delay(self.timeout)
@tenacity.retry(**retry_kwargs)
def grab_lock_from_pool():
    """Try the next lock name from ``names`` and return the lock if acquired.

    Takes the next name from the ``names`` iterator, asks the coordinator
    for that lock and attempts a non-blocking acquire.

    Raises:
        coordination.LockAcquireFailed: when the acquire attempt returns a
            falsy value; the enclosing ``tenacity.retry`` decorator is
            configured through ``retry_kwargs``.
    """
    candidate = next(names)
    # NOTE(pas-ha) currently all tooz backends support locking API.
    # In case this changes, this should be wrapped to not respin
    # lock grabbing on NotImplemented exception.
    pool_lock = self.coordinator.get_lock(candidate.encode())
    if pool_lock.acquire(blocking=False):
        return pool_lock
    raise coordination.LockAcquireFailed(
        "Failed to acquire lock %s" % candidate)
wait=tenacity.wait_random(max=2),
reraise=True)
def create_replacement():
    """Return the result of ``resource_objects.Resource.replacement``.

    The call receives the current context, this resource's id, the
    ``update_data`` and ``rs`` values from the enclosing scope, and the
    resource's current ``_atomic_key``.
    """
    replacement = resource_objects.Resource.replacement(
        self.context, self.id, update_data, rs, self._atomic_key)
    return replacement