Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def include_file(path, basename):
parts = basename.split(os.path.sep)
dst = CORE.relative_src_path(*parts)
copy_file_if_changed(path, dst)
_, ext = os.path.splitext(path)
if ext in ['.h', '.hpp', '.tcc']:
# Header, add include statement
cg.add_global(cg.RawStatement(f'#include "{basename}"'))
def validate_uid(value):
value = cv.string_strict(value)
for x in value.split('-'):
if len(x) != 2:
raise cv.Invalid("Each part (separated by '-') of the UID must be two characters "
"long.")
try:
x = int(x, 16)
except ValueError:
raise cv.Invalid("Valid characters for parts of a UID are 0123456789ABCDEF.")
if x < 0 or x > 255:
raise cv.Invalid("Valid values for UID parts (separated by '-') are 00 to FF")
return value
}
return out
def validate_fingerprint(value):
value = cv.string(value)
if re.match(r'^[0-9a-f]{40}$', value) is None:
raise cv.Invalid("fingerprint must be valid SHA1 hash")
return value
CONFIG_SCHEMA = cv.All(cv.Schema({
cv.GenerateID(): cv.declare_id(MQTTClientComponent),
cv.Required(CONF_BROKER): cv.string_strict,
cv.Optional(CONF_PORT, default=1883): cv.port,
cv.Optional(CONF_USERNAME, default=''): cv.string,
cv.Optional(CONF_PASSWORD, default=''): cv.string,
cv.Optional(CONF_CLIENT_ID): cv.string,
cv.Optional(CONF_DISCOVERY, default=True): cv.Any(cv.boolean, cv.one_of("CLEAN", upper=True)),
cv.Optional(CONF_DISCOVERY_RETAIN, default=True): cv.boolean,
cv.Optional(CONF_DISCOVERY_PREFIX, default="homeassistant"): cv.publish_topic,
cv.Optional(CONF_BIRTH_MESSAGE): MQTT_MESSAGE_SCHEMA,
cv.Optional(CONF_WILL_MESSAGE): MQTT_MESSAGE_SCHEMA,
cv.Optional(CONF_SHUTDOWN_MESSAGE): MQTT_MESSAGE_SCHEMA,
cv.Optional(CONF_TOPIC_PREFIX, default=lambda: CORE.name): cv.publish_topic,
cv.Optional(CONF_LOG_TOPIC): cv.Any(None, MQTT_MESSAGE_BASE.extend({
cv.Optional(CONF_LEVEL): logger.is_log_level,
}), validate_message_just_topic),
cv.Optional(CONF_SSL_FINGERPRINTS): cv.All(cv.only_on_esp8266,
cv.ensure_list(validate_fingerprint)),
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.components import web_server_base
from esphome.components.web_server_base import CONF_WEB_SERVER_BASE_ID
from esphome.const import CONF_CSS_URL, CONF_ID, CONF_JS_URL, CONF_PORT
from esphome.core import coroutine_with_priority
AUTO_LOAD = ['json', 'web_server_base']
web_server_ns = cg.esphome_ns.namespace('web_server')
WebServer = web_server_ns.class_('WebServer', cg.Component, cg.Controller)
CONFIG_SCHEMA = cv.Schema({
cv.GenerateID(): cv.declare_id(WebServer),
cv.Optional(CONF_PORT, default=80): cv.port,
cv.Optional(CONF_CSS_URL, default="https://esphome.io/_static/webserver-v1.min.css"): cv.string,
cv.Optional(CONF_JS_URL, default="https://esphome.io/_static/webserver-v1.min.js"): cv.string,
cv.GenerateID(CONF_WEB_SERVER_BASE_ID): cv.use_id(web_server_base.WebServerBase),
}).extend(cv.COMPONENT_SCHEMA)
@coroutine_with_priority(40.0)
def to_code(config):
paren = yield cg.get_variable(config[CONF_WEB_SERVER_BASE_ID])
var = cg.new_Pvariable(config[CONF_ID], paren)
yield cg.register_component(var, config)
cg.add(paren.set_port(config[CONF_PORT]))
cg.add(var.set_css_url(config[CONF_CSS_URL]))
import esphome.codegen as cg
import esphome.config_validation as cv
from esphome.components import i2c, sensor
from esphome.const import CONF_ID, CONF_PRESSURE, \
CONF_TEMPERATURE, ICON_THERMOMETER, UNIT_CELSIUS, ICON_GAUGE, \
UNIT_HECTOPASCAL
DEPENDENCIES = ['i2c']
ms5611_ns = cg.esphome_ns.namespace('ms5611')
MS5611Component = ms5611_ns.class_('MS5611Component', cg.PollingComponent, i2c.I2CDevice)
CONFIG_SCHEMA = cv.Schema({
cv.GenerateID(): cv.declare_id(MS5611Component),
cv.Required(CONF_TEMPERATURE): sensor.sensor_schema(UNIT_CELSIUS, ICON_THERMOMETER, 1),
cv.Required(CONF_PRESSURE): sensor.sensor_schema(UNIT_HECTOPASCAL, ICON_GAUGE, 1),
}).extend(cv.polling_component_schema('60s')).extend(i2c.i2c_device_schema(0x77))
def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
yield cg.register_component(var, config)
yield i2c.register_i2c_device(var, config)
if CONF_TEMPERATURE in config:
sens = yield sensor.new_sensor(config[CONF_TEMPERATURE])
cg.add(var.set_temperature_sensor(sens))
if CONF_PRESSURE in config:
sens = yield sensor.new_sensor(config[CONF_PRESSURE])
vol.Required(CONF_ADDRESS): cv.hex_uint16_t,
vol.Required(CONF_COMMAND): cv.hex_uint16_t,
}),
vol.Optional(CONF_SAMSUNG): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
}),
vol.Optional(CONF_SONY): cv.Schema({
vol.Required(CONF_DATA): cv.hex_uint32_t,
vol.Optional(CONF_NBITS, default=12): cv.one_of(12, 15, 20, int=True),
}),
vol.Optional(CONF_PANASONIC): cv.Schema({
vol.Required(CONF_ADDRESS): cv.hex_uint16_t,
vol.Required(CONF_COMMAND): cv.hex_uint32_t,
}),
vol.Optional(CONF_RC5): cv.Schema({
vol.Required(CONF_ADDRESS): vol.All(cv.hex_int, vol.Range(min=0, max=0x1F)),
vol.Required(CONF_COMMAND): vol.All(cv.hex_int, vol.Range(min=0, max=0x3F)),
}),
vol.Optional(CONF_RAW): validate_raw,
vol.Optional(CONF_RC_SWITCH_RAW): RC_SWITCH_RAW_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_A): RC_SWITCH_TYPE_A_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_B): RC_SWITCH_TYPE_B_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_C): RC_SWITCH_TYPE_C_SCHEMA,
vol.Optional(CONF_RC_SWITCH_TYPE_D): RC_SWITCH_TYPE_D_SCHEMA,
cv.GenerateID(CONF_REMOTE_RECEIVER_ID): cv.use_variable_id(RemoteReceiverComponent),
cv.GenerateID(CONF_RECEIVER_ID): cv.declare_variable_id(RemoteReceiver),
}), cv.has_exactly_one_key(*REMOTE_KEYS))
def receiver_base(full_config):
name = full_config[CONF_NAME]
atm90e32_ns = cg.esphome_ns.namespace('atm90e32')
ATM90E32Component = atm90e32_ns.class_('ATM90E32Component', cg.PollingComponent, spi.SPIDevice)
ATM90E32_PHASE_SCHEMA = cv.Schema({
cv.Optional(CONF_VOLTAGE): sensor.sensor_schema(UNIT_VOLT, ICON_FLASH, 2),
cv.Optional(CONF_CURRENT): sensor.sensor_schema(UNIT_AMPERE, ICON_CURRENT_AC, 2),
cv.Optional(CONF_POWER): sensor.sensor_schema(UNIT_WATT, ICON_FLASH, 2),
cv.Optional(CONF_REACTIVE_POWER): sensor.sensor_schema(UNIT_VOLT_AMPS_REACTIVE,
ICON_LIGHTBULB, 2),
cv.Optional(CONF_POWER_FACTOR): sensor.sensor_schema(UNIT_EMPTY, ICON_FLASH, 2),
cv.Optional(CONF_GAIN_VOLTAGE, default=41820): cv.uint16_t,
cv.Optional(CONF_GAIN_CT, default=25498): cv.uint16_t,
})
CONFIG_SCHEMA = cv.Schema({
cv.GenerateID(): cv.declare_id(ATM90E32Component),
cv.Optional(CONF_PHASE_A): ATM90E32_PHASE_SCHEMA,
cv.Optional(CONF_PHASE_B): ATM90E32_PHASE_SCHEMA,
cv.Optional(CONF_PHASE_C): ATM90E32_PHASE_SCHEMA,
cv.Optional(CONF_FREQUENCY): sensor.sensor_schema(UNIT_HERTZ, ICON_CURRENT_AC, 1),
cv.Optional(CONF_CHIP_TEMPERATURE): sensor.sensor_schema(UNIT_CELSIUS, ICON_THERMOMETER, 1),
cv.Required(CONF_LINE_FREQUENCY): cv.enum(LINE_FREQS, upper=True),
cv.Optional(CONF_GAIN_PGA, default='2X'): cv.enum(PGA_GAINS, upper=True),
}).extend(cv.polling_component_schema('60s')).extend(spi.SPI_DEVICE_SCHEMA)
def to_code(config):
var = cg.new_Pvariable(config[CONF_ID])
yield cg.register_component(var, config)
yield spi.register_spi_device(var, config)
for i, phase in enumerate([CONF_PHASE_A, CONF_PHASE_B, CONF_PHASE_C]):
from esphome.cpp_types import App, Component, esphome_ns
UARTComponent = esphome_ns.class_('UARTComponent', Component)
UARTDevice = esphome_ns.class_('UARTDevice')
MULTI_CONF = True
def validate_rx_pin(value):
value = pins.input_pin(value)
if CORE.is_esp8266 and value >= 16:
raise vol.Invalid("Pins GPIO16 and GPIO17 cannot be used as RX pins on ESP8266.")
return value
CONFIG_SCHEMA = vol.All(cv.Schema({
cv.GenerateID(): cv.declare_variable_id(UARTComponent),
vol.Optional(CONF_TX_PIN): pins.output_pin,
vol.Optional(CONF_RX_PIN): validate_rx_pin,
vol.Required(CONF_BAUD_RATE): cv.positive_int,
}).extend(cv.COMPONENT_SCHEMA.schema), cv.has_at_least_one_key(CONF_TX_PIN, CONF_RX_PIN))
def to_code(config):
rhs = App.init_uart(config[CONF_BAUD_RATE])
var = Pvariable(config[CONF_ID], rhs)
if CONF_TX_PIN in config:
add(var.set_tx_pin(config[CONF_TX_PIN]))
if CONF_RX_PIN in config:
add(var.set_rx_pin(config[CONF_RX_PIN]))
setup_component(var, config)
from esphome.components import i2c, output
import esphome.config_validation as cv
from esphome.const import CONF_ADDRESS, CONF_FREQUENCY, CONF_ID
from esphome.cpp_generator import Pvariable, add
from esphome.cpp_helpers import setup_component
from esphome.cpp_types import App, Component
DEPENDENCIES = ['i2c']
MULTI_CONF = True
PCA9685OutputComponent = output.output_ns.class_('PCA9685OutputComponent',
Component, i2c.I2CDevice)
CONFIG_SCHEMA = cv.Schema({
cv.GenerateID(): cv.declare_variable_id(PCA9685OutputComponent),
vol.Required(CONF_FREQUENCY): vol.All(cv.frequency,
vol.Range(min=23.84, max=1525.88)),
vol.Optional(CONF_ADDRESS): cv.i2c_address,
}).extend(cv.COMPONENT_SCHEMA.schema)
def to_code(config):
rhs = App.make_pca9685_component(config.get(CONF_FREQUENCY))
pca9685 = Pvariable(config[CONF_ID], rhs)
if CONF_ADDRESS in config:
add(pca9685.set_address(config[CONF_ADDRESS]))
setup_component(pca9685, config)
BUILD_FLAGS = '-DUSE_PCA9685_OUTPUT'
assert isinstance(value, list)
last_negative = None
for i, val in enumerate(value):
this_negative = val < 0
if i != 0:
if this_negative == last_negative:
raise cv.Invalid("Values must alternate between being positive and negative, "
"please see index {} and {}".format(i, i + 1), [i])
last_negative = this_negative
return value
RawData, RawBinarySensor, RawTrigger, RawAction, RawDumper = declare_protocol('Raw')
CONF_CODE_STORAGE_ID = 'code_storage_id'
RAW_SCHEMA = cv.Schema({
cv.Required(CONF_CODE): cv.All([cv.Any(cv.int_, cv.time_period_microseconds)],
cv.Length(min=1), validate_raw_alternating),
cv.GenerateID(CONF_CODE_STORAGE_ID): cv.declare_id(cg.int32),
})
@register_binary_sensor('raw', RawBinarySensor, RAW_SCHEMA)
def raw_binary_sensor(var, config):
code_ = config[CONF_CODE]
arr = cg.progmem_array(config[CONF_CODE_STORAGE_ID], code_)
cg.add(var.set_data(arr))
cg.add(var.set_len(len(code_)))
@register_trigger('raw', RawTrigger, cg.std_vector.template(cg.int32))
def raw_trigger(var, config):
pass