Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
created_at = FieldBase(field_type="DATE", default=datetime.datetime.utcnow, required=True)
updated_at = FieldBase(field_type="DATE", default=None, required=True)
error_class = FieldBase(field_type="STRING")
has_parent = FieldBase(field_type="BOOLEAN")
requester = FieldBase(field_type="STRING")
class System:
brewtils_model = brewtils.models.System
name = FieldBase(field_type="STRING", required=True, unique_with=['name','version', 'namespace'])
description = FieldBase(field_type="STRING")
version = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
max_instances = FieldBase(field_type="INT", default=1)
instances = FieldBase(field_type=brewtils.models.Instance, is_ref=True, is_list=True, reverse_delete_rule="PULL")
commands = FieldBase(field_type=brewtils.models.Command, is_ref=True, is_list=True, reverse_delete_rule="PULL")
icon_name = FieldBase(field_type="STRING")
display_name = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="DICT")
local = FieldBase(field_type="BOOLEAN", default=True)
class Event:
brewtils_model = brewtils.models.Event
name = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
garden = FieldBase(field_type="STRING")
payload = FieldBase(field_type="DICT")
error = FieldBase(field_type="BOOLEAN")
metadata = FieldBase(field_type="DICT")
timestamp = FieldBase(field_type="DATE")
if not self.nullable and self.optional and self.default is None:
raise ModelValidationError(
f"Can not save Parameter {self}: For this Parameter nulls are not "
f"allowed, but the parameter is optional with no default defined."
)
if len(self.parameters) != len(
set(parameter.key for parameter in self.parameters)
):
raise ModelValidationError(
f"Can not save Parameter {self}: Contains Parameters with duplicate keys"
)
class Command(MongoModel, Document):
brewtils_model = brewtils.models.Command
name = StringField(required=True, unique_with="system")
description = StringField()
parameters = ListField(EmbeddedDocumentField("Parameter"))
command_type = StringField(choices=BrewtilsCommand.COMMAND_TYPES, default="ACTION")
output_type = StringField(choices=BrewtilsCommand.OUTPUT_TYPES, default="STRING")
output_types = ListField()
output_labels = ListField()
schema = DictField()
form = DictField()
template = StringField()
hidden = BooleanField()
icon_name = StringField()
# reverse_delete_rule=CASCADE is set later, can't set until System is defined
system = ReferenceField("System")
type_info = FieldBase(field_type="JSON", required=False)
parameters = FieldBase(
field_type=brewtils.models.Parameter, default=None, is_list=True
)
class Command:
brewtils_model = brewtils.models.Command
name = FieldBase(field_type="STRING", required=True)
description = FieldBase(field_type="STRING")
parameters = FieldBase(
field_type=brewtils.models.Parameter, default=None, is_list=True
)
command_type = FieldBase(
field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES, default="ACTION"
)
output_type = FieldBase(
field_type="STRING", choices=BrewtilsCommand.OUTPUT_TYPES, default="STRING"
)
schema = FieldBase(field_type="JSON")
form = FieldBase(field_type="JSON")
template = FieldBase(field_type="STRING")
icon_name = FieldBase(field_type="STRING")
def clean(self):
"""Validate before saving to the database"""
if not self.name:
raise ModelValidationError(
f"Can not save Command"
f"{' for system ' + self.system.name if self.system else ''}"
def clean(self):
"""Validate before saving to the database"""
if not self.name:
raise ModelValidationError(
f"Can not save Command"
f"{' for system ' + self.system.name if self.system else ''}"
f": Missing name"
)
if self.command_type not in BrewtilsCommand.COMMAND_TYPES:
raise ModelValidationError(
f"Can not save Command {self}: Invalid command type '{self.command_type}'"
)
if self.output_type not in BrewtilsCommand.OUTPUT_TYPES:
raise ModelValidationError(
f"Can not save Command {self}: Invalid output type '{self.output_type}'"
)
if len(self.parameters) != len(
set(parameter.key for parameter in self.parameters)
):
raise ModelValidationError(
f"Can not save Command {self}: Contains Parameters with duplicate keys"
)
#from beer_garden.db.sql.util import check_indexes, ensure_roles, ensure_users
logger = logging.getLogger(__name__)
ModelType = Union[
Type[brewtils.models.Command],
Type[brewtils.models.Instance],
Type[brewtils.models.Job],
Type[brewtils.models.Request],
Type[brewtils.models.RequestTemplate],
Type[brewtils.models.System],
Type[brewtils.models.Garden],
]
ModelItem = Union[
brewtils.models.Command,
brewtils.models.Instance,
brewtils.models.Job,
brewtils.models.Request,
brewtils.models.RequestTemplate,
brewtils.models.System,
brewtils.models.Garden,
]
_model_map = beer_garden.db.sql.models.schema_mapping
engine = None
Session = None
def from_brewtils(obj: ModelItem) -> SqlModel:
"""Convert an item from its Brewtils model to its one
"output_labels": {"field": ListField, "kwargs": {}},
}
for field_name, field_info in TEMPLATE_FIELDS.items():
locals()[field_name] = field_info["field"](**field_info["kwargs"])
parent = ReferenceField(
"Request", dbref=True, required=False, reverse_delete_rule=CASCADE
)
children = DummyField(required=False)
output = StringField()
output_type = StringField(choices=BrewtilsCommand.OUTPUT_TYPES)
output_types = ListField()
output_labels = ListField()
status = StringField(choices=BrewtilsRequest.STATUS_LIST, default="CREATED")
command_type = StringField(choices=BrewtilsCommand.COMMAND_TYPES)
created_at = DateTimeField(default=datetime.datetime.utcnow, required=True)
updated_at = DateTimeField(default=None, required=True)
error_class = StringField(required=False)
has_parent = BooleanField(required=False)
requester = StringField(required=False)
meta = {
"auto_create_index": False, # We need to manage this ourselves
"index_background": True,
"indexes": [
# These are used for sorting all requests
{"name": "command_index", "fields": ["command"]},
{"name": "command_type_index", "fields": ["command_type"]},
{"name": "system_index", "fields": ["system"]},
{"name": "instance_name_index", "fields": ["instance_name"]},
{"name": "status_index", "fields": ["status"]},
command_type = FieldBase(field_type="STRING")
parameters = FieldBase(field_type="DICT")
comment = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="DICT")
output_type = FieldBase(field_type="STRING")
class Request(RequestTemplate):
brewtils_model = brewtils.models.Request
parent = FieldBase(field_type=brewtils.models.Request, is_ref=True, reverse_delete_rule="CASCADE")
children = FieldBase(field_type="PLACE_HOLDER")
output = FieldBase(field_type="STRING")
output_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.OUTPUT_TYPES)
status = FieldBase(field_type="STRING", choices=BrewtilsRequest.STATUS_LIST, default="CREATED")
command_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES)
created_at = FieldBase(field_type="DATE", default=datetime.datetime.utcnow, required=True)
updated_at = FieldBase(field_type="DATE", default=None, required=True)
error_class = FieldBase(field_type="STRING")
has_parent = FieldBase(field_type="BOOLEAN")
requester = FieldBase(field_type="STRING")
class System:
brewtils_model = brewtils.models.System
name = FieldBase(field_type="STRING", required=True, unique_with=['name','version', 'namespace'])
description = FieldBase(field_type="STRING")
version = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
max_instances = FieldBase(field_type="INT", default=1)
instances = FieldBase(field_type=brewtils.models.Instance, is_ref=True, is_list=True, reverse_delete_rule="PULL")
commands = FieldBase(field_type=brewtils.models.Command, is_ref=True, is_list=True, reverse_delete_rule="PULL")
choices = FieldBase(field_type=brewtils.models.Choices, default=None)
nullable = FieldBase(field_type="BOOLEAN", required=False, default=False)
maximum = FieldBase(field_type="INT", required=False)
minimum = FieldBase(field_type="INT", required=False)
regex = FieldBase(field_type="STRING", required=False)
form_input_type = FieldBase(
field_type="STRING", required=False, choices=BrewtilsParameter.FORM_INPUT_TYPES
)
type_info = FieldBase(field_type="JSON", required=False)
parameters = FieldBase(
field_type=brewtils.models.Parameter, default=None, is_list=True
)
class Command:
brewtils_model = brewtils.models.Command
name = FieldBase(field_type="STRING", required=True)
description = FieldBase(field_type="STRING")
parameters = FieldBase(
field_type=brewtils.models.Parameter, default=None, is_list=True
)
command_type = FieldBase(
field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES, default="ACTION"
)
output_type = FieldBase(
field_type="STRING", choices=BrewtilsCommand.OUTPUT_TYPES, default="STRING"
)
schema = FieldBase(field_type="JSON")
form = FieldBase(field_type="JSON")
template = FieldBase(field_type="STRING")
icon_name = FieldBase(field_type="STRING")
)
class System(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.System.schema
class Instance(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Instance.schema
class Command(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Command.schema
class Parameter(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Parameter.schema
class Request(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Request.schema
class Choices(Base):
id = Column(sqlalchemy.Integer, primary_key=True)
__tablename__ = brewtils.models.Choices.schema
namespace = FieldBase(field_type="STRING", required=True)
command = FieldBase(field_type="STRING", required=True)
command_type = FieldBase(field_type="STRING")
parameters = FieldBase(field_type="DICT")
comment = FieldBase(field_type="STRING")
metadata = FieldBase(field_type="DICT")
output_type = FieldBase(field_type="STRING")
class Request(RequestTemplate):
brewtils_model = brewtils.models.Request
parent = FieldBase(field_type=brewtils.models.Request, is_ref=True, reverse_delete_rule="CASCADE")
children = FieldBase(field_type="PLACE_HOLDER")
output = FieldBase(field_type="STRING")
output_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.OUTPUT_TYPES)
status = FieldBase(field_type="STRING", choices=BrewtilsRequest.STATUS_LIST, default="CREATED")
command_type = FieldBase(field_type="STRING", choices=BrewtilsCommand.COMMAND_TYPES)
created_at = FieldBase(field_type="DATE", default=datetime.datetime.utcnow, required=True)
updated_at = FieldBase(field_type="DATE", default=None, required=True)
error_class = FieldBase(field_type="STRING")
has_parent = FieldBase(field_type="BOOLEAN")
requester = FieldBase(field_type="STRING")
class System:
brewtils_model = brewtils.models.System
name = FieldBase(field_type="STRING", required=True, unique_with=['name','version', 'namespace'])
description = FieldBase(field_type="STRING")
version = FieldBase(field_type="STRING", required=True)
namespace = FieldBase(field_type="STRING", required=True)
max_instances = FieldBase(field_type="INT", default=1)