Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def async_step_init(
self, user_input: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
"""Handle the first step of setup flow.
Return self.async_show_form(step_id='init') if user_input is None.
Return self.async_create_entry(data={'result': result}) if finish.
"""
import pyotp
errors: Dict[str, str] = {}
if user_input:
verified = await self.hass.async_add_executor_job( # type: ignore
pyotp.TOTP(self._ota_secret).verify, user_input["code"]
)
if verified:
result = await self._auth_module.async_setup_user(
self._user_id, {"secret": self._ota_secret}
"""
The current property state read from the live environment.
"""
def __init__(self, id_: Optional[str] = None, outs: Optional[Any] = None) -> None:
    """
    Store the given identifier and output properties on the instance.

    :param id_: Optional string kept as ``self.id``.
    :param outs: Optional value kept as ``self.outs``.
    """
    self.id, self.outs = id_, outs
class UpdateResult:
    """
    Outcome of a call to `ResourceProvider.update`.
    """

    # Any properties that were computed during updating.
    outs: Optional[Any]

    def __init__(self, outs: Optional[Any] = None) -> None:
        """Keep the properties computed during the update (``None`` when omitted)."""
        self.outs = outs
class ResourceProvider:
"""
ResourceProvider is a Dynamic Resource Provider which allows defining new kinds of resources
whose CRUD operations are implemented inside your Python program.
"""
def check(self, _olds: Any, news: Any) -> CheckResult:
"""
from urllib.parse import urljoin
from importlib import import_module
from pathlib import Path
from inspect import (
isasyncgenfunction,
isgeneratorfunction,
isfunction,
iscoroutinefunction,
ismethod,
isgenerator,
)
# typing
from typing import Tuple, Dict, Any
_Config = Dict[str, Any]


def config_from_setting(module) -> Tuple[_Config, _Config, _Config]:
    """Build three configs from the attributes of a settings module.

    Every attribute whose name does not start with a double underscore is
    collected. The ``REQUEST_CONFIG`` and ``MIDDLEWARE_CONFIG`` entries are
    then taken out of that collection (an empty dict is used for either one
    that is absent).

    Returns a ``(config, request_config, middleware_config)`` tuple, where
    ``config`` holds whatever attributes remain.
    """
    config = {
        name: getattr(module, name)
        for name in dir(module)
        if not name.startswith("__")
    }
    request_config = config.pop("REQUEST_CONFIG", {})
    middleware_config = config.pop("MIDDLEWARE_CONFIG", {})
    return config, request_config, middleware_config
def merge_config(*configs: _Config) -> _Config:
# Merge different configs in order.
# Module-level registry instance. The names below re-export one attribute of
# it per HTTP method name, so they can be referenced directly at module scope.
# NOTE(review): presumably these are used as route decorators — confirm
# against the RoutesRegistry definition, which is not visible here.
router = RoutesRegistry()
head = router.head
get = router.get
post = router.post
put = router.put
patch = router.patch
delete = router.delete
trace = router.trace
options = router.options
connect = router.connect
# Pick the annotation used for templates: fall back to `Any` when
# `Environment` is the Ellipsis placeholder, otherwise use an optional
# `Environment`.
# NOTE(review): presumably `Environment` is set to `...` when the optional
# templating dependency is unavailable — confirm where it is assigned.
if Environment is ...:
    TemplatesType = Any
else:
    TemplatesType = Optional[Environment]
class CannotDetermineDefaultViewNameError(RuntimeError):
    """Error carrying a fixed message about an undeterminable default view name."""

    def __init__(self):
        # The message is fixed; callers construct the error without arguments.
        message = (
            'Cannot determine the default view name to be used for the calling function. '
            'Modify your Controller `view()` function call to specify the name of the view to be used.'
        )
        super().__init__(message)
class ControllerMeta(type):
def __init__(cls, name, bases, attr_dict):
super().__init__(name, bases, attr_dict)
)
dependencies = attr.ib(
type=Dict[str, Dict[Tuple[str, str, str], Set[Tuple[str, str, str]]]], kw_only=True, default=attr.Factory(dict),
)
dependents = attr.ib(
type=Dict[
str,
Dict[Tuple[str, str, str], Set[Tuple[Tuple[str, str, str], Optional[str], Optional[str], Optional[str],]],],
],
kw_only=True,
default=attr.Factory(dict),
)
sources = attr.ib(type=Dict[str, Source], kw_only=True, default=attr.Factory(dict))
iteration = attr.ib(type=int, default=0, kw_only=True)
cli_parameters = attr.ib(type=Dict[str, Any], kw_only=True, default=attr.Factory(dict))
stack_info = attr.ib(type=List[Dict[str, Any]], kw_only=True, default=attr.Factory(list))
accepted_final_states_count = attr.ib(type=int, kw_only=True, default=0)
discarded_final_states_count = attr.ib(type=int, kw_only=True, default=0)
_accepted_states = attr.ib(type=List[Tuple[Tuple[float, int], State]], kw_only=True, default=attr.Factory(list),)
_accepted_states_counter = attr.ib(type=int, kw_only=True, default=0)
def __attrs_post_init__(self) -> None:
    """Ensure exactly one of ``decision_type`` / ``recommendation_type`` is set.

    Raises:
        ValueError: when both attributes are set, or when neither is.
    """
    has_decision_type = self.decision_type is not None
    has_recommendation_type = self.recommendation_type is not None

    if has_decision_type and has_recommendation_type:
        raise ValueError("Cannot instantiate context for adviser and dependency monkey at the same time")

    if not (has_decision_type or has_recommendation_type):
        raise ValueError("Cannot instantiate context not specific to adviser nor dependency monkey")
def iter_accepted_final_states(self) -> Generator[State, None, None]:
"""Get accepted final states by resolution pipeline, states are not sorted."""
self.target = target
self.msg = msg
def __str__(self) -> str:
    # Rendering is delegated to `error_msg`, which receives the exception's
    # positional args unpacked.
    return error_msg(*self.args)
class InvalidTarget(BuildError):
    """Build error whose text names the offending target, followed by the message parts."""

    def __str__(self) -> str:
        # Fixed prefix with the repr of the target, then every message part
        # converted to a string and concatenated without a separator.
        prefix = f'muck error: invalid target: {self.target!r}; '
        return prefix + ''.join(map(str, self.msg))
# BuildError subclass that adds no behavior of its own.
# NOTE(review): presumably raised when a target cannot be located — confirm at the raise sites.
class TargetNotFound(BuildError): pass
OptDpdt = Optional[Any] # TODO: 'Dpdt'


class Dpdt(NamedTuple):
    '''
    Tracks one dependent target.
    A `Dpdt` is created for each recursive update; through `parent` they
    chain into a linked list of targets. The chain is used to report
    circular dependency errors and to render dependency tree info.
    TODO: make kind an enum.
    '''

    kind: str  # 'source', 'inferred', or 'observed'.
    target: str
    parent: OptDpdt  # The enclosing `Dpdt`, if any.

    def __str__(self) -> str:
        return 'Dpdt: {} ({})'.format(self.target, self.kind)
async def verify(self, value: typing.Iterable) -> typing.List[typing.Any]:
    """Verify each element of ``value`` and return the results as a list.

    ``value`` is first passed through ``self.check_null``. Every element of
    what that returns is handed to ``self.field.verify``; when the outcome is
    a coroutine it is awaited before being collected. Results keep the input
    order.
    """
    verified: typing.List[typing.Any] = []
    for item in self.check_null(value):
        outcome = self.field.verify(item)
        # Field verifiers may be sync or async; only coroutines are awaited.
        verified.append(await outcome if asyncio.iscoroutine(outcome) else outcome)
    return verified
NotImplemented when given -1. Furthermore `val` isn't an
iterable containing invertible items. Also, no `default` argument
was specified.
"""
# Check if object defines an inverse via __pow__.
raiser = getattr(val, '__pow__', None)
result = NotImplemented if raiser is None else raiser(-1)
if result is not NotImplemented:
return result
# Maybe it's an iterable of invertible items?
# Note: we avoid str because 'a'[0] == 'a', which creates an infinite loop.
if (isinstance(val, Iterable) and not isinstance(val,
(str, ops.Operation))):
unique_indicator: List[Any] = []
results = tuple(inverse(e, unique_indicator) for e in val)
if all(e is not unique_indicator for e in results):
return results[::-1]
# Can't invert.
if default is not RaiseTypeErrorIfNotProvided:
return default
raise TypeError(
"object of type '{}' isn't invertible. "
"It has no __pow__ method (or the method returned NotImplemented) "
"and it isn't an iterable of invertible objects.".format(type(val)))
def map_repo_generic(repo2coords: Callable[[Dict[str, Any]], Dict[str, float]], namex: str = 'X', namey: str = 'Y', unitx: str = '', unity: str = '') -> Any:
snapshots: List[Dict[str, Any]] = [
#get_db().get_repositories_from_past(60 * 60 * 24 * 30)
]
points = []
for repo in get_db().get_active_repositories():
point = {
'text': repometadata[repo['name']]['desc'],
'coords': list(map(repo2coords, [repo] + [snapshot[repo['name']] for snapshot in snapshots if repo['name'] in snapshot]))
}
if 'color' in repometadata[repo['name']]:
point['color'] = repometadata[repo['name']]['color']
points.append(point)
width = 1140
async def generate_docker_args(cls,
docker: Docker,
device_alloc) \
-> Mapping[str, Any]:
cores = [*map(int, device_alloc['cpu'].keys())]
sorted_core_ids = [*map(str, sorted(cores))]
return {
'HostConfig': {
'CpuPeriod': 100_000, # docker default
'CpuQuota': int(100_000 * len(cores)),
'Cpus': ','.join(sorted_core_ids),
'CpusetCpus': ','.join(sorted_core_ids),
# 'CpusetMems': f'{resource_spec.numa_node}',