Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
pass
try:
from _pytest.compat import CaptureIO
from _pytest.capture import EncodedFile
if isinstance(self.stream, (CaptureIO, EncodedFile)):
# avoid closing streams for pytest
return
except ImportError:
pass
try:
from ipykernel.iostream import OutStream
if isinstance(self.stream, OutStream):
# avoid closing streams for Jupyter Notebook
return
except ImportError:
pass
try:
self.stream.close()
except AttributeError:
self._logger.logger.warn(
"the stream has no close method implementation: type={}".format(type(self.stream))
)
finally:
self._stream = None
def _default_stdout(self):
return OutStream(self.session, self.iopub_thread, u'stdout')
def redirect_output(session, pub_socket):
"""Prevent any of the widgets from permanently hijacking stdout or
stderr"""
sys.stdout = OutStream(session, pub_socket, u'stdout')
sys.stderr = OutStream(session, pub_socket, u'stderr')
try:
yield
finally:
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
import ipykernel.iostream
from ipykernel.iostream import *
class OutStream(ipykernel.iostream.OutStream):
def __init__(self, *args, **kwargs):
super(OutStream, self).__init__(*args, **kwargs)
self.execution_count = None
def get_execution_count(self):
raise NotImplementedError("Should be added by the kernelapp");
# Update _flush to send the execution_count back with output
def _flush(self):
"""This is where the actual send happens.
_flush should generally be called in the IO thread,
unless the thread has been destroyed (e.g. forked subprocess).
"""
self._flush_pending = False
def _pick_tqdm_interval(file):
# Heuristics to pick a update interval for progress bar that's nice-looking for users.
isatty = file.isatty()
# Jupyter notebook should be recognized as tty.
# Wait for https://github.com/ipython/ipykernel/issues/268
try:
from ipykernel import iostream
if isinstance(file, iostream.OutStream):
isatty = True
except ImportError:
pass
if isatty:
return 0.5
else:
# When run under mpirun/slurm, isatty is always False.
# Here we apply some hacky heuristics for slurm.
if 'SLURM_JOB_ID' in os.environ:
if int(os.environ.get('SLURM_JOB_NUM_NODES', 1)) > 1:
# multi-machine job, probably not interactive
return 60
else:
# possibly interactive, so let's be conservative
return 15
def redirect_output(session, pub_socket):
"""Prevent any of the widgets from permanently hijacking stdout or
stderr"""
sys.stdout = OutStream(session, pub_socket, u'stdout')
sys.stderr = OutStream(session, pub_socket, u'stderr')
try:
yield
finally:
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
def _pick_tqdm_interval(file):
# Heuristics to pick a update interval for progress bar that's nice-looking for users.
isatty = file.isatty()
# Jupyter notebook should be recognized as tty.
# Wait for https://github.com/ipython/ipykernel/issues/268
try:
from ipykernel import iostream
if isinstance(file, iostream.OutStream):
isatty = True
except ImportError:
pass
if isatty:
return 0.5
else:
# When run under mpirun/slurm, isatty is always False.
# Here we apply some hacky heuristics for slurm.
if 'SLURM_JOB_ID' in os.environ:
if int(os.environ.get('SLURM_JOB_NUM_NODES', 1)) > 1:
# multi-machine job, probably not interactive
return 60
else:
# possibly interactive, so let's be conservative
return 15
def _pick_tqdm_interval(file):
# Heuristics to pick a update interval for progress bar that's nice-looking for users.
isatty = file.isatty()
# Jupyter notebook should be recognized as tty.
# Wait for https://github.com/ipython/ipykernel/issues/268
try:
from ipykernel import iostream
if isinstance(file, iostream.OutStream):
isatty = True
except ImportError:
pass
if isatty:
return 0.5
else:
# When run under mpirun/slurm, isatty is always False.
# Here we apply some hacky heuristics for slurm.
if 'SLURM_JOB_ID' in os.environ:
if int(os.environ.get('SLURM_JOB_NUM_NODES', 1)) > 1:
# multi-machine job, probably not interactive
return 60
else:
# possibly interactive, so let's be conservative
return 15
def _default_stderr(self):
return OutStream(self.session, self.iopub_thread, u'stderr')