Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_local_garden() -> Garden:
"""Retrieved the local garden object
Returns:
Garden Object
"""
local_garden = Garden(
name=config.get("garden.name"),
connection_type="LOCAL",
systems=get_systems(filter_params={"local": True}),
status="RUNNING",
status_info="This is your local Garden record, not persisted in the Database. "
"Changes will not be reflected",
)
for system in local_garden.systems:
if system.namespace not in local_garden.namespaces:
local_garden.namespaces.append(system.namespace)
return local_garden
def remove_garden(garden_name: str) -> None:
"""Remove a garden
Args:
garden_name: The Garden name
Returns:
None
"""
garden = db.query_unique(Garden, name=garden_name)
db.delete(garden)
return garden
Type[brewtils.models.Instance],
Type[brewtils.models.Job],
Type[brewtils.models.Request],
Type[brewtils.models.RequestTemplate],
Type[brewtils.models.System],
Type[brewtils.models.Garden],
]
ModelItem = Union[
brewtils.models.Command,
brewtils.models.Instance,
brewtils.models.Job,
brewtils.models.Request,
brewtils.models.RequestTemplate,
brewtils.models.System,
brewtils.models.Garden,
]
_model_map = beer_garden.db.sql.models.schema_mapping
engine = None
Session = None
def from_brewtils(obj: ModelItem) -> SqlModel:
"""Convert an item from its Brewtils model to its one
Args:
obj: The Brewtils model item
Returns:
The Mongo model item
"""Validate before saving to the database"""
if self.trigger_type not in self.TRIGGER_MODEL_MAPPING:
raise ModelValidationError(
f"Cannot save job. No mongo model for trigger type {self.trigger_type}"
)
trigger_class = self.TRIGGER_MODEL_MAPPING.get(self.trigger_type)
if not isinstance(self.trigger, trigger_class):
raise ModelValidationError(
f"Cannot save job. Expected trigger type {self.trigger_type} but "
f"actual type was {type(self.trigger)}"
)
class Garden:
brewtils_model = brewtils.models.Garden
name = FieldBase(field_type="STRING",required=True, default="default", unique_with=['name'])
status = FieldBase(field_type="STRING", default="INITIALIZING")
status_info = FieldBase(field_type="StatusInfo", default=StatusInfo())
namespaces = FieldBase(field_type="STRING", is_list=True)
connection_type = FieldBase(field_type="STRING")
connection_params = FieldBase(field_type="DICT")
systems = FieldBase(field_type="STRING", is_list=True)
def get_garden(garden_name: str) -> Garden:
"""Retrieve an individual Garden
Args:
garden_name: The name of Garden
Returns:
The Garden
"""
if garden_name == config.get("garden.name"):
return get_local_garden()
return db.query_unique(Garden, name=garden_name)
logger = logging.getLogger(__name__)
# These are the operations that we will forward to child gardens
routable_operations = [
"INSTANCE_START",
"INSTANCE_STOP",
"REQUEST_CREATE",
"SYSTEM_DELETE",
]
# Processor that will be used for forwarding
forward_processor = None
# Dict of garden_name -> garden
gardens: Dict[str, Garden] = {}
route_functions = {
"REQUEST_CREATE": beer_garden.requests.process_request,
"REQUEST_START": beer_garden.requests.start_request,
"REQUEST_COMPLETE": beer_garden.requests.complete_request,
"REQUEST_READ": beer_garden.requests.get_request,
"REQUEST_READ_ALL": beer_garden.requests.get_requests,
"COMMAND_READ": beer_garden.commands.get_command,
"COMMAND_READ_ALL": beer_garden.commands.get_commands,
"INSTANCE_READ": beer_garden.instances.get_instance,
"INSTANCE_DELETE": beer_garden.instances.remove_instance,
"INSTANCE_UPDATE": beer_garden.plugin.update,
"INSTANCE_INITIALIZE": beer_garden.plugin.initialize,
"INSTANCE_START": beer_garden.plugin.start,
"INSTANCE_STOP": beer_garden.plugin.stop,
"JOB_CREATE": beer_garden.scheduler.create_job,