Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Lookup table keyed by sensor type. Each value is a three-item list:
# [description, unit, icon]. The unit slot is None for types without a unit.
SENSOR_TYPES = {
    'last_capture': ['Last', None, 'run-fast'],
    'total_cameras': ['Arlo Cameras', None, 'video'],
    'captured_today': ['Captured Today', None, 'file-video'],
    'battery_level': ['Battery Level', '%', 'battery-50'],
    'signal_strength': ['Signal Strength', None, 'signal'],
    'temperature': ['Temperature', TEMP_CELSIUS, 'thermometer'],
    'humidity': ['Humidity', '%', 'water-percent'],
    'air_quality': ['Air Quality', 'ppm', 'biohazard'],
}
# Extend the base platform schema with the monitored-conditions option.
# When the option is omitted, every key of SENSOR_TYPES is used as the
# default; each supplied entry must be one of those keys.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(
        CONF_MONITORED_CONDITIONS,
        default=list(SENSOR_TYPES),
    ): vol.All(cv.ensure_list, [vol.In(SENSOR_TYPES)]),
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up an Arlo IP sensor."""
arlo = hass.data.get(DATA_ARLO)
if not arlo:
return
sensors = []
for sensor_type in config.get(CONF_MONITORED_CONDITIONS):
if sensor_type == 'total_cameras':
sensors.append(ArloSensor(
SENSOR_TYPES[sensor_type][0], arlo, sensor_type))
else:
for camera in arlo.cameras:
SENSOR_PRESSURE: ['Pressure', 'Pa', 'mdi:arrow-down-bold'],
SENSOR_PM10: ['PM10', VOLUME_MICROGRAMS_PER_CUBIC_METER, 'mdi:thought-bubble'],
SENSOR_PM2_5: ['PM2.5', VOLUME_MICROGRAMS_PER_CUBIC_METER,
'mdi:thought-bubble-outline']
}
# Name used when the configuration does not provide one.
DEFAULT_NAME = 'Luftdaten'

# Configuration key holding the sensor identifier.
CONF_SENSORID = 'sensorid'

MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)

# Platform options: the sensor id and the monitored conditions are
# mandatory; the name falls back to DEFAULT_NAME.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_SENSORID): cv.positive_int,
    vol.Required(CONF_MONITORED_CONDITIONS): vol.All(
        cv.ensure_list, [vol.In(SENSOR_TYPES)]),
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Luftdaten sensor."""
sensor_id = config.get(CONF_SENSORID)
monitored_conditions = config.get(CONF_MONITORED_CONDITIONS)
try:
api = LuftdatenApi(sensor_id)
except Exception as e:
_LOGGER.error("Could not setup Lufdaten sensor ({})".format(e))
return False
else:
if api.data is None:
DHT_DOWNLOAD = 1000

# Lookup table keyed by monitored variable. Each value is a two-item list:
# [display name, unit]. The unit slot is None for the status sensor.
SENSOR_TYPES = {
    "current_status": ["Status", None],
    "download_speed": ["Down Speed", "kB/s"],
    "upload_speed": ["Up Speed", "kB/s"],
}
# Platform options: host, username and password are mandatory; port and
# name have defaults, and the monitored-variables list defaults to empty.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_HOST): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Required(CONF_USERNAME): cv.string,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_MONITORED_VARIABLES, default=[]):
        vol.All(cv.ensure_list, [vol.In(SENSOR_TYPES)]),
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Deluge sensors."""
name = config.get(CONF_NAME)
host = config.get(CONF_HOST)
username = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
port = config.get(CONF_PORT)
deluge_api = DelugeRPCClient(host, port, username, password)
try:
}
# Byte-size unit labels in ascending order, all the way up to yottabytes
# so the list will not need extending later.
BYTE_SIZES = [
    "B",
    "KB",
    "MB",
    "GB",
    "TB",
    "PB",
    "EB",
    "ZB",
    "YB",
]
# Platform options: only the API key is mandatory. The monitored
# conditions default to ["movies"] and the unit must be one of BYTE_SIZES.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_API_KEY): cv.string,
    vol.Optional(CONF_DAYS, default=DEFAULT_DAYS): cv.string,
    vol.Optional(CONF_HOST, default=DEFAULT_HOST): cv.string,
    vol.Optional(CONF_INCLUDED, default=[]): cv.ensure_list,
    vol.Optional(CONF_MONITORED_CONDITIONS, default=["movies"]):
        vol.All(cv.ensure_list, [vol.In(list(SENSOR_TYPES))]),
    vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
    vol.Optional(CONF_SSL, default=False): cv.boolean,
    vol.Optional(CONF_UNIT, default=DEFAULT_UNIT): vol.In(BYTE_SIZES),
    vol.Optional(CONF_URLBASE, default=DEFAULT_URLBASE): cv.string,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Register one Radarr sensor per monitored condition.

    Reads the monitored conditions from ``config``, builds a
    ``RadarrSensor`` for each one (in order) and hands the resulting list
    to ``add_entities``. ``discovery_info`` is accepted but not used.
    """
    monitored = config.get(CONF_MONITORED_CONDITIONS)
    entities = []
    for condition in monitored:
        entities.append(RadarrSensor(hass, config, condition))
    # NOTE(review): the second positional argument is presumably the
    # "update before add" flag of add_entities; confirm against the caller.
    add_entities(entities, True)
class RadarrSensor(Entity):
"""Implementation of the Radarr sensor."""
def __init__(self, hass, conf, sensor_type):
"""Create Radarr entity."""
# Unit labels.
GIGABYTES = 'GB'  # type: str
BYTES = 'bytes'  # type: str
PERCENT = '%'  # type: str

MIN_TIME_BETWEEN_UPDATES = timedelta(hours=1)
REQUEST_TIMEOUT = 5  # seconds

# Configuration key for the service identifier.
CONF_SERVICE_ID = "service"

# Lookup table keyed by sensor type; each value is [name, unit, icon].
SENSOR_TYPES = {
    'usage': ['Usage', PERCENT, 'mdi:percent'],
}
# Platform options: monitored variables, credentials and the service id
# are mandatory; the name falls back to DEFAULT_NAME.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_MONITORED_VARIABLES): vol.All(
        cv.ensure_list, [vol.In(SENSOR_TYPES)]),
    vol.Required(CONF_USERNAME): cv.string,
    vol.Required(CONF_PASSWORD): cv.string,
    vol.Required(CONF_SERVICE_ID): cv.string,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})
@asyncio.coroutine
def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
"""Set up the sensor platform."""
username = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
serviceid = config.get(CONF_SERVICE_ID)
auth = aiohttp.BasicAuth( username, password=password)
# Configuration keys specific to this platform.
CONF_INDEX = 'index'
CONF_SHORT_TOPIC = 'stopic'  # short_topic

DEFAULT_QOS = 1

# Availability labels.
TASMOTA_ONLINE = "Online"
TASMOTA_OFFLINE = "Offline"
# Platform options: short topic and name are mandatory. QoS is coerced to
# int and must be 0, 1 or 2. The MQTT availability schema's keys are merged
# in by the trailing extend().
PLATFORM_SCHEMA = cv.PLATFORM_SCHEMA.extend(
    {
        vol.Required(CONF_SHORT_TOPIC): cv.string,
        vol.Required(CONF_NAME): cv.string,
        vol.Optional(CONF_INDEX, default=DEFAULT_INDEX): cv.string,
        vol.Optional(CONF_ICON): cv.icon,
        vol.Optional(CONF_PAYLOAD_ON, default=DEFAULT_PAYLOAD_ON): cv.string,
        vol.Optional(CONF_PAYLOAD_OFF, default=DEFAULT_PAYLOAD_OFF): cv.string,
        vol.Optional(CONF_QOS, default=DEFAULT_QOS): vol.All(
            vol.Coerce(int), vol.In([0, 1, 2])
        ),
        vol.Optional(CONF_RETAIN, default=mqtt.DEFAULT_RETAIN): cv.boolean,
    }
).extend(mqtt.MQTT_AVAILABILITY_SCHEMA.schema)
async def async_setup_platform(hass: HomeAssistantType, config: ConfigType,
                               async_add_entities, discovery_info=None):
    """Set up the platform when no discovery payload is supplied.

    Entity creation is delegated to ``_async_setup_entity``. A call that
    carries ``discovery_info`` does nothing here.
    """
    if discovery_info is not None:
        return
    # discovery_info is None at this point; it is forwarded as the
    # helper's fourth argument (named discovery_hash in its signature).
    await _async_setup_entity(
        hass, config, async_add_entities, discovery_info)
async def _async_setup_entity(hass, config, async_add_entities,
discovery_hash=None):
"""Set up the MQTT switch."""
from . import DOMAIN as SKYBELL_DOMAIN, SkybellDevice
_LOGGER = logging.getLogger(__name__)

SCAN_INTERVAL = timedelta(seconds=90)

# Identifiers of the two image kinds handled by this platform.
IMAGE_AVATAR = "avatar"
IMAGE_ACTIVITY = "activity"

# Configuration keys for the per-image names.
CONF_ACTIVITY_NAME = "activity_name"
CONF_AVATAR_NAME = "avatar_name"
# Platform options: the monitored conditions default to the avatar image
# only, and each entry must be IMAGE_AVATAR or IMAGE_ACTIVITY. Both name
# options are optional strings with no default.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_MONITORED_CONDITIONS, default=[IMAGE_AVATAR]):
        vol.All(cv.ensure_list, [vol.In([IMAGE_AVATAR, IMAGE_ACTIVITY])]),
    vol.Optional(CONF_ACTIVITY_NAME): cv.string,
    vol.Optional(CONF_AVATAR_NAME): cv.string,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the platform for a Skybell device."""
cond = config[CONF_MONITORED_CONDITIONS]
names = {}
names[IMAGE_ACTIVITY] = config.get(CONF_ACTIVITY_NAME)
names[IMAGE_AVATAR] = config.get(CONF_AVATAR_NAME)
skybell = hass.data.get(SKYBELL_DOMAIN)
sensors = []
NEATO_MAP_DATA,
NEATO_PERSISTENT_MAPS,
NEATO_ROBOTS,
VALID_VENDORS,
)
_LOGGER = logging.getLogger(__name__)

# Component configuration under the NEATO_DOMAIN key: username and
# password are mandatory; the vendor defaults to "neato" and must be one
# of VALID_VENDORS. Extra top-level keys are allowed.
CONFIG_SCHEMA = vol.Schema(
    {
        NEATO_DOMAIN: vol.Schema({
            vol.Required(CONF_USERNAME): cv.string,
            vol.Required(CONF_PASSWORD): cv.string,
            vol.Optional(CONF_VENDOR, default="neato"):
                vol.In(VALID_VENDORS),
        }),
    },
    extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass, config):
"""Set up the Neato component."""
if NEATO_DOMAIN not in config:
# There is an entry and nothing in configuration.yaml
return True
entries = hass.config_entries.async_entries(NEATO_DOMAIN)
hass.data[NEATO_CONFIG] = config[NEATO_DOMAIN]
SCAN_INTERVAL = timedelta(minutes=5)

# Platform options: origin, destination and region are mandatory. The
# filters and units are optional without defaults; the remaining options
# fall back to their DEFAULT_* constants.
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Required(CONF_ORIGIN): cv.string,
    vol.Required(CONF_DESTINATION): cv.string,
    vol.Required(CONF_REGION): vol.In(REGIONS),
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(CONF_INCL_FILTER): cv.string,
    vol.Optional(CONF_EXCL_FILTER): cv.string,
    vol.Optional(CONF_REALTIME, default=DEFAULT_REALTIME): cv.boolean,
    vol.Optional(CONF_VEHICLE_TYPE, default=DEFAULT_VEHICLE_TYPE):
        vol.In(VEHICLE_TYPES),
    vol.Optional(CONF_UNITS): vol.In(UNITS),
    vol.Optional(CONF_AVOID_TOLL_ROADS, default=DEFAULT_AVOID_TOLL_ROADS):
        cv.boolean,
    vol.Optional(CONF_AVOID_SUBSCRIPTION_ROADS,
                 default=DEFAULT_AVOID_SUBSCRIPTION_ROADS): cv.boolean,
    vol.Optional(CONF_AVOID_FERRIES, default=DEFAULT_AVOID_FERRIES):
        cv.boolean,
})
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the Waze travel time sensor platform."""
destination = config.get(CONF_DESTINATION)
name = config.get(CONF_NAME)
origin = config.get(CONF_ORIGIN)
"CRITICAL": 50,
"FATAL": 50,
"ERROR": 40,
"WARNING": 30,
"WARN": 30,
"INFO": 20,
"DEBUG": 10,
"NOTSET": 0,
}
# Configuration keys.
LOGGER_DEFAULT = "default"
LOGGER_LOGS = "logs"

# Service-call attribute carrying the level name.
ATTR_LEVEL = "level"

# A level value is upper-cased and must then be a key of LOGSEVERITY.
_VALID_LOG_LEVEL = vol.All(vol.Upper, vol.In(LOGSEVERITY))

# Service payload schemas: one for the default level, one mapping string
# keys to levels.
SERVICE_SET_DEFAULT_LEVEL_SCHEMA = vol.Schema({ATTR_LEVEL: _VALID_LOG_LEVEL})
SERVICE_SET_LEVEL_SCHEMA = vol.Schema({cv.string: _VALID_LOG_LEVEL})

# Component configuration under the DOMAIN key: an optional default level
# and an optional mapping of string keys to levels. Extra top-level keys
# are allowed.
CONFIG_SCHEMA = vol.Schema(
    {
        DOMAIN: vol.Schema({
            vol.Optional(LOGGER_DEFAULT): _VALID_LOG_LEVEL,
            vol.Optional(LOGGER_LOGS):
                vol.Schema({cv.string: _VALID_LOG_LEVEL}),
        }),
    },
    extra=vol.ALLOW_EXTRA,
)