Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@_utils.wraps(stop_func)
def wrapped_stop_func(retry_state):
warn_about_non_retry_state_deprecation(
'stop', stop_func, stacklevel=4)
return stop_func(
retry_state.attempt_number,
retry_state.seconds_since_start,
)
@_utils.wraps(fn)
def new_fn(self,
previous_attempt_number=_unset,
delay_since_first_attempt=_unset,
retry_state=None):
if retry_state is None:
from tenacity import RetryCallState
retry_state_passed_as_non_kwarg = (
previous_attempt_number is not _unset and
isinstance(previous_attempt_number, RetryCallState))
if retry_state_passed_as_non_kwarg:
retry_state = previous_attempt_number
else:
warn_about_dunder_non_retry_state_deprecation(fn, stacklevel=2)
retry_state = make_retry_state(
previous_attempt_number=previous_attempt_number,
delay_since_first_attempt=delay_since_first_attempt)
@_utils.wraps(retry_func)
def wrapped_retry_func(retry_state):
warn_about_non_retry_state_deprecation(
'retry', retry_func, stacklevel=4)
return retry_func(retry_state.outcome)
return wrapped_retry_func
@_utils.wraps(fn)
def wrapped_retry_error_callback(retry_state):
warn_about_non_retry_state_deprecation(
'retry_error_callback', fn, stacklevel=4)
return fn(retry_state.outcome)
return wrapped_retry_error_callback
@_utils.wraps(fn)
def wrapped_after_sleep_func(retry_state):
# func, trial_number, trial_time_taken
warn_about_non_retry_state_deprecation('after', fn, stacklevel=4)
return fn(
retry_state.fn,
retry_state.attempt_number,
retry_state.seconds_since_start)
return wrapped_after_sleep_func
@_utils.wraps(fn)
def wrapped_before_func(retry_state):
# func, trial_number, trial_time_taken
warn_about_non_retry_state_deprecation('before', fn, stacklevel=4)
return fn(
retry_state.fn,
retry_state.attempt_number,
)
@_utils.wraps(f)
def wrapped_f(*args, **kw):
return self.call(f, *args, **kw)
@_utils.wraps(wait_func)
def wrapped_wait_func(retry_state):
warn_about_non_retry_state_deprecation(
'wait', wait_func, stacklevel=4)
return wait_func(
retry_state.attempt_number,
retry_state.seconds_since_start,
)