Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
path = self.get_query_argument(name="path")
log.info("Path to the directory: {0}".format(path))
try:
output = ls(path)
log.debug("Contents of the directory: {0}".format(output))
self.write(output)
except StochSSAPIError as err:
self.set_status(err.status_code)
self.set_header('Content-Type', 'application/json')
error = {"Reason":err.reason,"Message":err.message}
log.error("Exception information: {0}".format(error))
self.write(error)
self.finish()
class ModelToNotebookHandler(APIHandler):
'''
##############################################################################
Handler for handling conversions from model (.mdl) file to Jupyter Notebook
(.ipynb) file.
##############################################################################
'''
@web.authenticated
async def get(self):
'''
Runs the convert_to_notebook function from the convert_to_notebook script
to build a Jupyter Notebook version of the model and write it to a file.
Attributes
----------
'''
path = self.get_query_argument(name="path")
@web.authenticated
def post(self):
    """Create a TensorBoard instance and describe it in the JSON response.

    The JSON request body must be an object containing "logdir"; an
    optional "reload_interval" is forwarded to the manager (None when
    absent). The instance is created through the "tensorboard_manager"
    entry of the handler settings. The response is a JSON object with the
    instance's name, its logdir passed through `_trim_notebook_dir`, and
    the reload time of its thread.

    Raises:
        web.HTTPError: 400 when the body is not a JSON object or lacks
            "logdir". Previously these requests failed with an unhandled
            AttributeError/KeyError and surfaced as a server error.
    """
    data = self.get_json_body()
    # NOTE(review): get_json_body is expected to return None for an empty
    # body (per notebook's APIHandler); a JSON array or scalar is not a
    # mapping either. Treat all of these as client errors.
    if not isinstance(data, dict) or "logdir" not in data:
        raise web.HTTPError(
            400, "Request body must be a JSON object with a 'logdir' field")

    manager = self.settings["tensorboard_manager"]
    entry = manager.new_instance(
        data["logdir"], reload_interval=data.get("reload_interval"))

    self.finish(json.dumps({
        'name': entry.name,
        'logdir': _trim_notebook_dir(entry.logdir),
        'reload_time': entry.thread.reload_time}))
class TbInstanceHandler(APIHandler):
    """Handler for one TensorBoard instance addressed by its name.

    GET and DELETE are declared as supported methods; only the GET
    handler is defined in this block.
    """

    SUPPORTED_METHODS = ('GET', 'DELETE')

    @web.authenticated
    def get(self, name):
        """Respond with a JSON description of the instance called `name`.

        The instance is looked up in the "tensorboard_manager" entry of
        the handler settings. The response carries its name, its logdir
        passed through `_trim_notebook_dir`, and its thread's reload time.

        Raises:
            web.HTTPError: 404 when the manager has no instance `name`.
        """
        instances = self.settings["tensorboard_manager"]

        # Reject unknown names first so the success path reads straight through.
        if name not in instances:
            raise web.HTTPError(
                404, "TensorBoard instance not found: %r" % name)

        instance = instances[name]
        description = {
            'name': instance.name,
            'logdir': _trim_notebook_dir(instance.logdir),
            'reload_time': instance.thread.reload_time,
        }
        self.finish(json.dumps(description))
path = self.get_query_argument(name="path")
self.set_header('Content-Type', 'application/json')
log.debug("Copying file: {0}".format(path))
try:
resp = duplicate(path)
log.debug("Response message: {0}".format(resp))
self.write(resp)
except StochSSAPIError as err:
self.set_status(err.status_code)
error = {"Reason":err.reason,"Message":err.message}
log.error("Exception information: {0}".format(error))
self.write(error)
self.finish()
class DuplicateDirectoryHandler(APIHandler):
'''
##############################################################################
Handler for creating copies of a directory and its contents.
##############################################################################
'''
@web.authenticated
async def get(self):
'''
Creates a copy of a directory and its contents with a unique name and
stores it in the same parent directory as the original.
Attributes
----------
'''
path = self.get_query_argument(name="path")
self.set_header('Content-Type', 'application/json')
workflow_path = data['wkflPath']
log.debug("Actions for the workflow: {0}".format(opt_type))
log.debug("Type of workflow: {0}".format(wkfl_type))
log.debug("Path to the model: {0}".format(model_path))
log.debug("Path to the workflow: {0}".format(workflow_path))
exec_cmd = ["/stochss/stochss/handlers/util/run_workflow.py", "{}".format(model_path), "{0}".format(workflow_path), "{0}".format(wkfl_type) ] # Script commands
opt_type = list(map(lambda el: "-" + el, list(opt_type))) # format the opt_type for argparse
exec_cmd.extend(opt_type) # Add opt_type to exec_cmd
log.debug("Exec command sent to the subprocess: {0}".format(exec_cmd))
log.debug('Sending the workflow run cmd')
pipe = subprocess.Popen(exec_cmd)
log.debug('The workflow has started')
self.finish()
class SaveWorkflowAPIHandler(APIHandler):
'''
########################################################################
Handler for saving workflows.
########################################################################
'''
@web.authenticated
async def get(self):
'''
Start saving the workflow. Creates the workflow directory and workflow_info file if
saving a new workflow. Copies model into the workflow directory.
Attributes
----------
'''
data = json.loads(self.get_query_argument(name="data"))
log.debug("Handler query string: {0}".format(data))
full_path = os.path.join("/home/jovyan", path)
log.debug("Path to the model on disk: {0}".format(full_path))
try:
with open(full_path, 'r') as f:
resp = f.read()
self.write(resp)
except FileNotFoundError as err:
self.set_status(404)
self.set_header('Content-Type', 'application/json')
error = {"Reason":"File Not Found","Message":"Could not find file: " + str(err)}
log.error("Exception information: {0}".format(error))
self.write(error)
self.finish()
class DownloadZipFileAPIHandler(APIHandler):
'''
##############################################################################
Handler for downloading zip files to the user's download directory.
##############################################################################
'''
@web.authenticated
async def get(self, action, path):
'''
Read and download a zip file or generate a zip file from a directory or
file and then download. Generated zip files are stored in the target's
parent directory with a unique name.
Attributes
----------
path : str
Path from the user directory to the target file or directory.
import json
from notebook.utils import url_path_join as ujoin
from notebook.base.handlers import APIHandler
class HerokuHandler(APIHandler):
    """
    Common parent of the Heroku request handlers.

    Exposes the "heroku" entry of the handler settings and the
    "current_path" field of the JSON request body as read-only properties.
    """

    @property
    def heroku(self):
        """Object stored under the "heroku" key of the handler settings."""
        handler_settings = self.settings
        return handler_settings["heroku"]

    @property
    def current_path(self):
        """Value of "current_path" in the request's JSON body."""
        payload = self.get_json_body()
        return payload["current_path"]
class HerokuStatus(HerokuHandler):
async def post(self):
result = await self.heroku.status(self.current_path)
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import json
from tornado import gen, web
from ...base.handlers import APIHandler
from jupyter_client.jsonutil import date_default
from notebook.utils import maybe_future, url_path_join
from jupyter_client.kernelspec import NoSuchKernel
class SessionRootHandler(APIHandler):
@web.authenticated
@gen.coroutine
def get(self):
    """Respond with the session manager's list of sessions as JSON.

    The result of `list_sessions()` is wrapped in `maybe_future` and
    yielded, then serialized with `date_default` as the JSON fallback
    encoder and sent as the response body.
    """
    running = yield maybe_future(self.session_manager.list_sessions())
    body = json.dumps(running, default=date_default)
    self.finish(body)
@web.authenticated
@gen.coroutine
def post(self):
# Creates a new session
#(unless a session already exists for the named session)
sm = self.session_manager
log.debug("Path of directories: {0}".format(directories))
full_path = os.path.join("/home/jovyan", directories)
log.debug("Full path of directories: {0}".format(full_path))
try:
os.makedirs(full_path)
self.write("{0} was successfully created!".format(directories))
except FileExistsError as err:
self.set_status(406)
self.set_header('Content-Type', 'application/json')
error = {"Reason":"Directory Already Exists","Message":"Could not create your directory: "+str(err)}
log.error("Exception information: {0}".format(error))
self.write(error)
self.finish()
class UploadFileAPIHandler(APIHandler):
'''
##############################################################################
Handler for uploading files.
##############################################################################
'''
@web.authenticated
async def post(self):
'''
Uploads the target file to the target directory. If the intended file
type is a StochSS Model or SBML Model, the original file is uploaded
with a converted model. If the file can't be uploaded to the intended
type no conversion is made and errors are sent to the user.
Attributes
----------
km = self.kernel_manager
km._check_kernel_id(kernel_id)
model = km.kernel_model(kernel_id)
self.finish(json.dumps(model))
@json_errors
@web.authenticated
@gen.coroutine
def delete(self, kernel_id):
    """Shut down the kernel `kernel_id` and reply 204 with no body.

    The kernel manager's `shutdown_kernel` result is wrapped in
    `gen.maybe_future` and yielded before the response is finished.
    """
    manager = self.kernel_manager
    pending_shutdown = gen.maybe_future(manager.shutdown_kernel(kernel_id))
    yield pending_shutdown

    # 204 No Content: nothing is written before finishing.
    self.set_status(204)
    self.finish()
class KernelActionHandler(APIHandler):
@json_errors
@web.authenticated
@gen.coroutine
def post(self, kernel_id, action):
km = self.kernel_manager
if action == 'interrupt':
km.interrupt_kernel(kernel_id)
self.set_status(204)
if action == 'restart':
try:
yield gen.maybe_future(km.restart_kernel(kernel_id))
except Exception as e:
self.log.error("Exception restarting kernel", exc_info=True)
self.set_status(500)
'''
log.debug("Converting non-spatial model to spatial model: {0}".format(path))
self.set_header('Content-Type', 'application/json')
try:
resp = convert_model(path, to_spatial=True)
log.debug("Response: {0}".format(resp))
self.write(resp)
except StochSSAPIError as err:
self.set_status(err.status_code)
error = {"Reason":err.reason,"Message":err.message}
log.error("Exception information: {0}".format(error))
self.write(error)
self.finish()
class ConvertToModelAPIHandler(APIHandler):
'''
##############################################################################
Handler for converting a spatial model to a model.
##############################################################################
'''
@web.authenticated
async def get(self, path):
'''
Creates a model file with a unique name from a spatial model file and
stores it in the same directory as the original.
Attributes
----------
path : str
Path from the user directory to the spatial model.