Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _execNotebook(self, notebook_filename, nbpath):
    """Execute a notebook and fail the current test if a cell raises.

    Args:
        notebook_filename: Path of the notebook file to read.
        nbpath: Value passed to the execute preprocessor as the ``path``
            entry of the resources metadata.

    Raises:
        self.failureException: Through ``self.fail`` when the preprocessor
            raises ``CellExecutionError``. The traceback is printed to
            stdout between two separator lines first.
    """
    # Notebook files are JSON; read them as UTF-8 instead of relying on the
    # platform's default encoding.
    with open(notebook_filename, encoding='utf-8') as f:
        nb = nbformat.read(f, as_version=nbformat.current_nbformat)

    # Only the in-memory notebook is needed from here on, so the file is
    # already closed while the (possibly slow) execution runs.
    try:
        self.ep.preprocess(nb, {'metadata': {'path': nbpath}})
    except CellExecutionError:
        print('-' * 60)
        traceback.print_exc(file=sys.stdout)
        print('-' * 60)
        # Adjacent literals instead of a backslash continuation inside the
        # string, so the two sentences are separated by exactly one space.
        self.fail('Error executing the notebook %s. '
                  'See above for error.' % notebook_filename)
# NOTE(review): excerpt from the end of a cell-execution routine; the enclosing
# definition and the code that produces `outputs` are not visible here.
# with the expected list in the second place
if isinstance(outputs,tuple) and len(outputs) == 2:
# Keep only the second element of the 2-tuple as the outputs.
outputs = outputs[1]
cell.outputs = outputs
# Unless errors are allowed, any stream output named "stderr" is treated as a
# failure of the cell.
if not self.allow_errors:
for out in outputs:
if out.output_type == "stream" and out.name == "stderr":
# Message template embedding the cell source and the stderr text.
pattern = u"""\
An error occurred while executing the following cell:
------------------
{cell.source}
------------------
{out.text}
"""
msg = dedent(pattern).format(out=out, cell=cell)
raise CellExecutionError(msg)
return cell, resources
# NOTE(review): excerpt from a notebook-execution test; `settings`, `resources`,
# `notebook` and `analysis_name` are defined outside this view.
settings['notebook'] = resources.notebook(notebook)
settings['testing'] = True
settings['executed_notebook'] = None
# Only record an analysis name when one was supplied.
if analysis_name:
settings['analysisName'] = analysis_name
# load notebook
with open(settings['notebook']) as f:
nb = nbformat.read(f, as_version=4)
# execute notebook
ep = ExecutePreprocessor(timeout=600, kernel_name='python3')
# Record the outcome in `status`; it is asserted at the end of this excerpt.
try:
ep.preprocess(nb, {})
status = True
except CellExecutionError:
status = False
# Output path: the input path with every '.ipynb' replaced by '_executed.ipynb'.
settings['executed_notebook'] = resources.notebook(notebook).replace('.ipynb','_executed.ipynb')
with open(settings['executed_notebook'], mode='wt') as f:
nbformat.write(nb, f)
# check status
self.assertTrue(status, 'Notebook execution failed (%s)' % settings['executed_notebook'])
"""
# NOTE(review): `hash`, `fname_session` and `cell` come from the enclosing
# definition, which is not visible here; the name `hash` also shadows the builtin.
logging.debug('Cell {}: Dumping session to {}'.format(hash, fname_session))
# Source lines of a helper cell that imports dill and calls
# dill.dump_session with `fname_session` as the target file.
inject_code = ['import dill',
'dill.dump_session(filename="{}")'.format(fname_session),
]
inject_cell = nbf.v4.new_code_cell('\n'.join(inject_code))
reply, outputs = super().run_cell(inject_cell)
# Any output of type 'error' from the injected cell counts as a failed dump.
errors = list(filter(lambda out: out.output_type == 'error', outputs))
if len(errors):
logging.info('Cell {}: Warning: serialization failed, cache disabled'.format(hash))
logging.debug(
'Cell {}: Serialization error: {}'.format(hash, CellExecutionError.from_cell_and_msg(cell, errors[0])))
# disable attempts to retrieve cache for subsequent cells
self.disable_cache = True
# remove partial cache for current cell
# NOTE(review): os.remove raises if the file does not exist; confirm a failed
# dump always leaves a file behind.
os.remove(fname_session)
return False
return True
def translate(self):
    """Convert the document into a notebook and store it serialized.

    The document is walked with an ``NBTranslator``; the notebook it builds
    is passed through ``_finilize_markdown_cells``. When the
    ``nbexport_execute`` config value is truthy the notebook is also run
    with errors allowed, and a ``CellExecutionError`` is reported through
    ``self.app.warn`` instead of being raised. The serialized notebook is
    stored on ``self.output``.
    """
    translator = NBTranslator(self.document, self.app, self.docpath)
    self.document.walkabout(translator)
    notebook = _finilize_markdown_cells(translator.nb)

    execute_requested = self.app.config.nbexport_execute
    if execute_requested:
        executor = ExecutePreprocessor(allow_errors=True)
        try:
            executor.preprocess(notebook, {'metadata': {}})
        except CellExecutionError as exc:
            # Execution problems are downgraded to a warning; the notebook
            # is still written out below.
            self.app.warn(str(exc))

    self.output = nbformat.writes(notebook)
nb_man : NotebookExecutionManager
Engine wrapper of notebook being converted
resources : dictionary
Additional resources used in the conversion process. Allows
preprocessors to pass variables into the Jinja engine.
"""
# Execute each cell and update the output in real time.
nb = nb_man.nb
for index, cell in enumerate(nb.cells):
try:
nb_man.cell_start(cell, index)
# Cells with empty source are not executed; the `finally` clause below
# still reports them as complete.
if not cell.source:
continue
nb.cells[index], resources = self.preprocess_cell(cell, resources, index)
except CellExecutionError as ex:
# Report the failure to the manager and stop executing further cells.
nb_man.cell_exception(nb.cells[index], cell_index=index, exception=ex)
break
finally:
# Runs on success, on `continue` and on `break` alike.
nb_man.cell_complete(nb.cells[index], cell_index=index)
return nb, resources
nb_man, resources
)
# Build a code cell that imports dagstermill and calls its _teardown().
new_cell = nbformat.v4.new_code_cell(
source=('import dagstermill as __dm_dagstermill\n' '__dm_dagstermill._teardown()\n')
)
# Tag the cell as injected and give it an empty 'papermill' metadata entry.
new_cell.metadata['tags'] = ['injected-teardown']
new_cell.metadata['papermill'] = {}
# The cell is appended at the end, so its index equals the current cell count.
index = len(nb_man.nb.cells)
nb_man.nb.cells = nb_man.nb.cells + [new_cell]
# Calqued from PapermillExecutePreprocessor.papermill_process
try:
nb_man.cell_start(new_cell, index)
nb_man.nb.cells[index], _ = self.preprocess_cell(new_cell, None, index)
except CellExecutionError as ex: # pragma: nocover
nb_man.cell_exception(nb_man.nb.cells[index], cell_index=index, exception=ex)
finally:
# Completion is reported whether or not the teardown cell raised.
nb_man.cell_complete(nb_man.nb.cells[index], cell_index=index)
return nb_man.nb, resources
# NOTE(review): excerpt; `nb_path`, `pkg_dir`, `dataframes` and `write_notebook`
# are defined outside this view.
with open(nb_path) as f:
nb = nbformat.read(f, as_version=4)
# Split the file name into stem and extension for building output names below.
root, ext = splitext(basename(nb_path))
c = Config()
# Run the AddEpilog preprocessor over the notebook before executing it.
nb, resources = AddEpilog(config=c, pkg_dir=pkg_dir,
dataframes=dataframes
).preprocess(nb, {})
try:
ep = ExecutePreprocessor(config=c)
# The notebook's own directory is passed as the 'path' resource metadata.
nb, _ = ep.preprocess(nb, {'metadata': {'path': dirname(nb_path)}})
except CellExecutionError as e:
# On failure, save the notebook as root + '-errors' + ext beside the original
# and raise a new CellExecutionError pointing at that file.
# NOTE(review): `e` is unused and the second format field is filled with '',
# so the original error text is not carried over; the message also
# misspells "notebook".
err_nb_path = join(dirname(nb_path), root + '-errors' + ext)
with open(err_nb_path, 'wt') as f:
nbformat.write(nb, f)
raise CellExecutionError("Errors executing noteboook. See notebook at {} for details.\n{}"
.format(err_nb_path, ''))
# `write_notebook` is either exactly True (use the default '-executed' name
# beside the original) or another truthy value that is used as the output path.
if write_notebook:
if write_notebook is True:
exec_nb_path = join(dirname(nb_path), root + '-executed' + ext)
else:
exec_nb_path = write_notebook
with open(exec_nb_path, 'wt') as f:
nbformat.write(nb, f)
def preprocess(self, nb, resources, km=None):
    """Run the parent preprocessor, logging cell failures instead of raising.

    Args:
        nb: Notebook to execute.
        resources: Resources forwarded to the parent implementation.
        km: Optional kernel manager forwarded to the parent implementation.

    Returns:
        Whatever the parent ``preprocess`` returns, or ``(nb, resources)``
        as passed in when a ``CellExecutionError`` was raised.
    """
    parent = super(VoilaExecutePreprocessor, self)
    try:
        outcome = parent.preprocess(nb, resources=resources, km=km)
    except CellExecutionError as err:
        # A failing cell is logged; the notebook is still returned.
        self.log.error(err)
        outcome = (nb, resources)

    # Strip errors and traceback if not in debug mode
    strip_requested = should_strip_error(self.config)
    if strip_requested:
        self.strip_notebook_errors(nb)

    return outcome
# NOTE(review): excerpt; `source` and `script_path` are defined outside this view.
# Build the source of a setup cell that rebuilds the command-line arguments
# (everything after argv[0]) into a list and assigns it to sys.argv.
# NOTE(review): only '"' is escaped; an argument containing a backslash would be
# reinterpreted by the generated string literal - confirm whether that matters.
args = ['import sys', 'args = []']
for arg in sys.argv[1:]:
args.append('args.append("{}")'.format(arg.replace('"', '\\"')))
args.append('sys.argv = args')
# Two-cell notebook: the argv setup cell followed by the code in `source`.
nb = nbformat.v4.new_notebook()
nb.cells.append(nbformat.v4.new_code_cell(source='\n'.join(args)))
nb.cells.append(nbformat.v4.new_code_cell(source=source))
# Create an execution process that runs an OpenCOR kernel
# and use it to run the notebook
try:
ep = ExecutePreprocessor(timeout=-1, kernel_name='opencor')
ep.preprocess(nb, {'metadata': {'path': script_path + '/'}})
except CellExecutionError as err:
# NOTE(review): the error is discarded here and `err` is unused in this excerpt.
pass
# Tracebacks may contain ANSI escape codes which Windows
# by default doesn't recognise
if platform.system() == 'Windows':
subprocess.call('', shell=True)
# Write any output from the Python code
# (cell index 1 is the `source` cell appended above; only stream outputs are
# forwarded, each to the matching stdout/stderr of this process)
for output in nb.cells[1].outputs:
if output.output_type == 'stream':
if output.name == 'stdout':
sys.stdout.write(output.text)
elif output.name == 'stderr':
sys.stderr.write(output.text)