Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# A str-keyed mapping validates unchanged when its value matches the pattern,
# and is rejected when it does not.
# NOTE(review): SE is defined outside this snippet -- presumably the suite's
# shorthand for an expected SchemaError; confirm.
assert Schema({str: Regex(r"^foo")}).validate({"key": "foovalue"}) == {"key": "foovalue"}
with SE:
    Schema({str: Regex(r"^foo")}).validate({"key": "barvalue"})

# Error if the value does not have a buffer interface
for non_text_value in (1, {}, [], None):
    with SE:
        Regex(r"bar").validate(non_text_value)

# Validate that the pattern has a buffer interface
for usable_pattern in (re.compile(r"foo"), unicode("foo")):
    assert Regex(usable_pattern).validate("foo") == "foo"
for unusable_pattern in (1, {}, [], None):
    # Building a Regex from these patterns is expected to raise TypeError.
    with raises(TypeError):
        Regex(unusable_pattern).validate("bar")
# Values that are not text must be rejected by a Regex validator.
# NOTE(review): SE is defined outside this snippet -- presumably the suite's
# shorthand for an expected SchemaError; confirm.
for non_text_value in (1, {}, [], None):
    with SE:
        Regex(r"bar").validate(non_text_value)

# Validate that the pattern has a buffer interface
for usable_pattern in (re.compile(r"foo"), unicode("foo")):
    assert Regex(usable_pattern).validate("foo") == "foo"
for unusable_pattern in (1, {}, [], None):
    # Building a Regex from these patterns is expected to raise TypeError.
    with raises(TypeError):
        Regex(unusable_pattern).validate("bar")
# Names of the ERIN_* environment variables grouped as optional.
# NOTE(review): how this list is consumed is not visible here -- presumably
# each variable is read when present and skipped otherwise; confirm.
OPTIONAL_ENVS = [
    "ERIN_DEBUG",
    "ERIN_DB_DRIVER",
    "ERIN_DB_URI",
    "ERIN_DB_HOST",
    "ERIN_DB_PORT",
    "ERIN_DB_USERNAME",
    "ERIN_DB_PASSWORD",
    # The comma after the next entry is required: without it Python joins the
    # two adjacent literals into "ERIN_DB_DATABASEERIN_DESCRIPTION".
    "ERIN_DB_DATABASE",
    "ERIN_DESCRIPTION",
]
config_schema = Schema(
{
"bot": {
"token": Regex(r"[MN][A-Za-z\d]{23}\.[\w-]{6}\.[\w-]{27}"),
"debug": Or(True, False),
"project": str,
"plugins_folder": os.path.exists,
"log_type": Or("Normal", "Timed"),
"log_level": Or(
"SPAM",
"DEBUG",
"VERBOSE",
"INFO",
"NOTICE",
"WARNING",
"SUCCESS",
"ERROR",
"CRITICAL",
),
},
)
class RecipeConfigureFeature(RecipeFeature):
"""Recipe feature for configuring root trees built during a recipe root
feature"""
NAME = "configure"
FEATURED_ATTRIBUTES = {"configure"}
SCHEMA = schema.Schema({
"sdk": schema.Regex(RECIPE_IDENTIFIER_RE.pattern),
"configure": {
schema.Optional("env", default={}): {
schema.Regex(ENVVAR_FORMAT_RE.pattern,
error="Bad environment variable name"): str
},
"root": schema.Regex(RECIPE_IDENTIFIER_RE.pattern),
"steps": schema.And([str], len),
},
str: object, # do not consider other keys
})
def configure(self) -> None:
"""TODO"""
# using getattr to avoid static analyzers from complaining about
# missing attr (but brought by a recipe feature):
sdk = getattr(recipe.Recipe(self.recipe.config["sdk"]), "sdk")
# the recipe to configure (i.e. what is the recipe for which we need to
... "storedBy": "admin",
... "coordinate": {
... "latitude": 59.8,
... "longitude": 10.9
... },
... "dataValues": [
... { "dataElement": "qrur9Dvnyt5", "value": "22" },
... { "dataElement": "oZg33kd9taw", "value": "Male" },
... { "dataElement": "msodh3rEMJa", "value": "2013-05-18" }
... ]
... }
>>> Schema(get_event_schema()).is_valid(event)
True
"""
date_str = Regex(r"^\d{4}-\d{2}-\d{2}$")
dhis2_id_str = Regex(r"^[A-Za-z0-9]+$") # (ASCII \w without underscore)
return {
"program": dhis2_id_str,
"orgUnit": dhis2_id_str,
"eventDate": date_str,
SchemaOptional("completedDate"): date_str,
SchemaOptional("status"): Regex("^(ACTIVE|COMPLETED|VISITED|SCHEDULE|OVERDUE|SKIPPED)$"),
SchemaOptional("storedBy"): str,
SchemaOptional("coordinate"): {
"latitude": float,
"longitude": float,
},
SchemaOptional("geometry"): {
"type": str,
"coordinates": [float],
},
def validate_check_config(check_config):
class PrettyReprAnd(schema.And):
def __repr__(self):
return self._error
check_name = PrettyReprAnd(
str,
lambda val: len(val) > 0,
lambda val: not any(w in val for w in string.whitespace),
error='Check name must be a nonzero length string with no whitespace')
timeout_units = ['ns', 'us', 'µs', 'ms', 's', 'm', 'h']
timeout = schema.Regex(
'^\d+(\.\d+)?({})$'.format('|'.join(timeout_units)),
error='Timeout must be a string containing an integer or float followed by a unit: {}'.format(
', '.join(timeout_units)))
check_config_schema = schema.Schema({
schema.Optional('cluster_checks'): {
check_name: {
'description': str,
'cmd': [str],
'timeout': timeout,
},
},
schema.Optional('node_checks'): {
'checks': {
check_name: {
'description': str,
def __createSchemas(self):
varNameSchema = schema.Regex(r'^[A-Za-z_][A-Za-z0-9_]*$')
varFilterSchema = schema.Regex(r'^!?[][A-Za-z_*?][][A-Za-z0-9_*?]*$')
recipeFilterSchema = schema.Regex(r'^!?[][0-9A-Za-z_.+:*?-]+$')
toolNameSchema = schema.Regex(r'^[0-9A-Za-z_.+:-]+$')
useClauses = ['deps', 'environment', 'result', 'tools', 'sandbox']
useClauses.extend(self.__states.keys())
# construct recursive depends clause
dependsInnerClause = {
schema.Optional('name') : str,
schema.Optional('use') : useClauses,
schema.Optional('forward') : bool,
schema.Optional('environment') : schema.Schema({
varNameSchema : str
}),
schema.Optional('if') : str
}
dependsClause = schema.Schema([
schema.Or(
def _build_file_schema():
    """Assemble the schema object bound to ``_FILE_SCHEMA``.

    Layout, as spelled out by the literals below:

    * ``"audit-log"`` -- ``[str]`` passed through ``_as_l``.
    * ``"key-custodians"`` -- optional ``str`` keys, each holding a
      ``{"pwdkm": str}`` record.
    * any other key accepted by ``^(?!meta).*$`` -- a security domain with a
      required ``"meta"`` record (``"owners"`` and ``"public-key"``) plus
      optional ``secret-.*`` entries holding strings.

    NOTE(review): ``_as_d`` and ``_as_l`` are defined outside this block;
    presumably they wrap dict and list sub-schemas -- confirm.
    """
    audit_log = _as_l([str])

    custodians = _as_d({
        schema.Optional(str): _as_d({
            "pwdkm": str,
        }),
    })

    # allow string names for security domains,
    # but meta is reserved
    domain_name = schema.Optional(schema.Regex("^(?!meta).*$"))
    domain_meta = _as_d({
        "owners": _as_d({str: str}),
        "public-key": str,
    })
    domain = _as_d({
        "meta": domain_meta,
        schema.Optional(schema.Regex("secret-.*")): str,
    })

    return schema.Schema(_as_d({
        "audit-log": audit_log,
        "key-custodians": custodians,
        domain_name: domain,
    }))


_FILE_SCHEMA = _build_file_schema()
# Argon2id cost parameters, taken from the presets in nacl.pwhash.argon2id.
# NOTE(review): the operations limit uses the SENSITIVE preset while the
# memory limit uses the MODERATE preset -- presumably a deliberate trade-off;
# confirm before changing either one, since existing derived keys likely
# depend on both values.
OPSLIMIT = nacl.pwhash.argon2id.OPSLIMIT_SENSITIVE
MEMLIMIT = nacl.pwhash.argon2id.MEMLIMIT_MODERATE
# NOTE: this is a public class since it must be passed in
@attr.s(frozen=True)
class Creds(object):
    """Stores credentials used to open a KeyFile.

    Instances are frozen, so fields cannot be reassigned after construction.
    ``name`` and ``passphrase`` are validated as ``unicode`` instances; the
    two ``*_source`` fields are optional and default to ``None``.
    """

    # Required text fields; the validator rejects anything that is not an
    # instance of ``unicode``.
    name = attr.ib(
        validator=attr.validators.instance_of(unicode),
    )
    passphrase = attr.ib(
        validator=attr.validators.instance_of(unicode),
    )

    # NOTE(review): presumably these record where each value was obtained
    # from -- confirm against callers.
    name_source = attr.ib(default=None)
    passphrase_source = attr.ib(default=None)
def __createSchemas(self):
varNameSchema = schema.Regex(r'^[A-Za-z_][A-Za-z0-9_]*$')
varFilterSchema = schema.Regex(r'^!?[][A-Za-z_*?][][A-Za-z0-9_*?]*$')
recipeFilterSchema = schema.Regex(r'^!?[][0-9A-Za-z_.+:*?-]+$')
toolNameSchema = schema.Regex(r'^[0-9A-Za-z_.+:-]+$')
useClauses = ['deps', 'environment', 'result', 'tools', 'sandbox']
useClauses.extend(self.__states.keys())
# construct recursive depends clause
dependsInnerClause = {
schema.Optional('name') : str,
schema.Optional('use') : useClauses,
schema.Optional('forward') : bool,
schema.Optional('environment') : schema.Schema({
varNameSchema : str
}),
schema.Optional('if') : str
},
And(lambda x: x, error='Either "exclude" or "include" filter should be specified.'),
And(lambda x: not ('exclude' in x and 'include' in x), error='"exclude" and "include" filters should '
'be specified as different list items.'),
)]
},
'instance': {
'region': And(str, len),
Optional('availabilityZone', default=''): str,
Optional('subnetId', default=''): str,
'instanceType': str,
Optional('onDemandInstance', default=False): bool,
Optional('amiName', default='SpottyAMI'): str,
Optional('keyName', default=''): str,
Optional('rootVolumeSize', default=0): And(Or(int, str), Use(str),
Regex(r'^\d+$', error='Incorrect value for "rootVolumeSize".'),
Use(int),
And(lambda x: x > 0,
error='"rootVolumeSize" should be greater than 0 or should '
' not be specified.'),
),
Optional('maxPrice', default=0): And(Or(float, int, str), Use(str),
Regex(r'^\d+(\.\d{1,6})?$', error='Incorrect value for "maxPrice".'),
Use(float),
And(lambda x: x > 0, error='"maxPrice" should be greater than 0 or '
'should not be specified.'),
),
Optional('volumes', default=[]): And(
[{
Optional('name', default=''): And(str, Regex(r'^[\w-]{1,255}$')),
'directory': And(str, lambda x: x.startswith('/'), Use(lambda x: x.rstrip('/'))),
Optional('size', default=0): And(int, lambda x: x > 0),