Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def prepare(self):
"""Called before each verb handler"""
# Used for calculating request handling duration
self.request.created_time = datetime.datetime.utcnow()
content_type = self.request.headers.get("content-type", "")
if self.request.method.upper() in ["POST", "PATCH"] and content_type:
content_type = content_type.split(";")
self.request.mime_type = content_type[0]
if self.request.mime_type not in [
"application/json",
"application/x-www-form-urlencoded",
]:
raise ModelValidationError("Unsupported or missing content-type header")
# Attempt to parse out the charset and decode the body, default to utf-8
charset = "utf-8"
if len(content_type) > 1:
search_result = self.charset_re.search(content_type[1])
if search_result:
charset = search_result.group(1)
self.request.charset = charset
self.request.decoded_body = self.request.body.decode(charset)
if operation.operation_type in ("SYSTEM_DELETE", "SYSTEM_RELOAD", "SYSTEM_UPDATE"):
target_system = db.query_unique(System, id=operation.args[0])
elif "INSTANCE" in operation.operation_type:
target_instance = db.query_unique(Instance, id=operation.args[0])
target_system = db.query_unique(System, instances__contains=target_instance)
elif operation.operation_type == "REQUEST_CREATE":
target_system = System(
namespace=operation.model.namespace,
name=operation.model.system,
version=operation.model.system_version,
)
elif operation.operation_type.startswith("REQUEST"):
request = db.query_unique(Request, id=operation.args[0])
target_system = System(
namespace=request.namespace,
name=request.system,
version=request.system_version,
)
elif operation.operation_type == "QUEUE_DELETE":
# Need to deconstruct the queue name
parts = operation.args[0].split(".")
version = parts[2].replace("-", ".")
target_system = System(namespace=parts[0], name=parts[1], version=version)
return _garden_name_lookup(target_system)
elif operation.operation_type.startswith("REQUEST"):
request = db.query_unique(Request, id=operation.args[0])
target_system = System(
namespace=request.namespace,
name=request.system,
version=request.system_version,
)
elif operation.operation_type == "QUEUE_DELETE":
# Need to deconstruct the queue name
parts = operation.args[0].split(".")
version = parts[2].replace("-", ".")
target_system = System(namespace=parts[0], name=parts[1], version=version)
return _garden_name_lookup(target_system)
def stop(self):
# Mark the thread as stopping
StoppableThread.stop(self)
# Close the socket - this will break self.trans.accept() call with an exception
self.trans.close()
# Wait some amount of time for all the futures to complete
futures_status = wait(
self.futures, timeout=self.WORKER_TIMEOUT, return_when=ALL_COMPLETED
)
# If there are still workers remaining after the timeout then we remove
# references to them. We need to do this because workers are daemons but
# concurrent.futures.thread adds a hook to join all workers with no timeout when
# shutting down. So any hung worker prevents the application from shutting down.
if futures_status.not_done:
self.logger.warning(
"There were still unfinished worker "
@parameter(key="list_of_s", type="String", description="Testing List of Strings", multi=True,
display_name="A List of Strings", optional=True, default=['a', 'b', 'c'],
choices=None, model=None,
nullable=False)
def echo_list_of_strings_with_default(self, list_of_s=None):
list_of_s = list_of_s or ['a', 'b', 'c']
for s in list_of_s:
self.logger.info("%s" % s)
return list_of_s
@parameter(key="model", optional=False, description="A Model with a cool definition.",
model=MyModelWithDefaults)
def echo_model_with_nested_defaults_no_main(self, model):
for k, v in model.items():
self.logger.info('{"%s" : %s}' % (k, v))
return model
@parameter(key="message", optional=False, nullable=False, default="cannot be null",
type="String")
def echo_required_message_nullable_false_with_default(self, message="cannot be null"):
if message is None:
raise ValueError("Message cannot be None.")
self.logger.info(message)
return message
@parameter(key="message", optional=True, nullable=True, default=None, type="String")
def echo_optional_message_nullable_true_null_default(self, message=None):
if not message:
self.logger.info("No message provided, and that's okay.")
else:
self.logger.info(message)
return message
@parameter(key="message", type="String", description="I depend on 'dict_key'", nullable=True,
choices={'type': 'static', 'value': STATIC_CHOICES_DICTIONARY,
'key_reference': '${dict_key}'})
def say_specific_dictionary_with_key_reference(self, message, **_):
return message
@parameter(key="echo_max_value", type="Integer",
description="Testing integer maximum constraint", multi=False,
display_name="An Integer", optional=False, nullable=False, maximum=20)
def echo_with_max_value(self, echo_max_value):
self.logger.info(echo_max_value)
return echo_max_value