Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
user_api = [user_api]
else:
raise ValueError(
"user_api must be either in {} or None. Got "
"{} instead.".format(_ALL_USER_APIS, user_api))
if limits is not None:
limits = {api: limits for api in user_api}
prefixes = []
else:
if isinstance(limits, list):
# This should be a list of dicts of modules, for compatibility
# with the result from threadpool_info.
limits = {module["prefix"]: module["num_threads"]
for module in limits}
elif isinstance(limits, _ThreadpoolInfo):
# To set the limits from the modules of a _ThreadpoolInfo
# object.
limits = {module.prefix: module.num_threads
for module in limits}
if not isinstance(limits, dict):
raise TypeError("limits must either be an int, a list or a "
"dict. Got {} instead".format(type(limits)))
# With a dictionary, can set both specific limit for given modules
# and global limit for user_api. Fetch each separately.
prefixes = [prefix for prefix in limits if prefix in _ALL_PREFIXES]
user_api = [api for api in limits if api in _ALL_USER_APIS]
return limits, user_api, prefixes
def get_modules(self, key, values):
    """Select the modules whose attribute `key` is one of `values`.

    `key` is the name of a module attribute, read with `getattr`.
    `values` is either a list of accepted values or a single accepted
    value. As a special case, `values=None` together with
    `key="user_api"` accepts every entry of `_ALL_USER_APIS`.

    Return a new `_ThreadpoolInfo` built from the selected modules.
    """
    if key == "user_api" and values is None:
        # No explicit filter on user_api: accept all the known ones.
        accepted = list(_ALL_USER_APIS)
    elif isinstance(values, list):
        accepted = values
    else:
        # Anything that is not a list is treated as one accepted value.
        accepted = [values]

    selected = []
    for candidate in self.modules:
        if getattr(candidate, key) in accepted:
            selected.append(candidate)
    return _ThreadpoolInfo(modules=selected)
def _set_threadpool_limits(self):
    """Apply the limits stored in `self._limits` to the matching modules.

    Return None when `self._limits` is None. Otherwise return the
    `_ThreadpoolInfo` built from `self._prefixes` and `self._user_api`,
    after `set_num_threads` has been called on each of its modules for
    which the selected limit is not None.
    """
    limits = self._limits
    if limits is None:
        return None

    matched = _ThreadpoolInfo(prefixes=self._prefixes,
                              user_api=self._user_api)
    for lib in matched:
        # `limits` is a dict {key: num_threads} whose keys are either a
        # prefix or a user_api. When a module matches both, the limit
        # registered for its prefix is the one chosen.
        key = lib.prefix if lib.prefix in limits else lib.user_api
        thread_cap = limits[key]
        if thread_cap is not None:
            lib.set_num_threads(thread_cap)
    return matched
def get_original_num_threads(self):
"""Original num_threads from before calling threadpool_limits
Return a dict `{user_api: num_threads}`.
"""
if self._original_info is not None:
original_limits = self._original_info
else:
original_limits = _ThreadpoolInfo(user_api=self._user_api)
num_threads = {}
warning_apis = []
for user_api in self._user_api:
limits = [module.num_threads for module in
original_limits.get_modules("user_api", user_api)]
limits = set(limits)
n_limits = len(limits)
if n_limits == 1:
limit = limits.pop()
elif n_limits == 0:
limit = None
else:
limit = min(limits)