Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
)
# Configurable Float trait, default 60. Per its help text: how many seconds a
# kernel may take to answer kernel_info_requests before it is presumed dead.
kernel_info_timeout = Float(60, config=True,
help="""Timeout for giving up on a kernel (in seconds).
On starting and restarting kernels, we check whether the
kernel is running and responsive by sending kernel_info_requests.
This sets the timeout in seconds for how long the kernel can take
before being presumed dead.
This affects the MappingKernelManager (which handles kernel restarts)
and the ZMQChannelsHandler (which handles the startup).
"""
)
_kernel_buffers = Any()

@default('_kernel_buffers')
def _default_kernel_buffers(self):
    """Build the initial value of the ``_kernel_buffers`` trait.

    Returns a ``defaultdict`` whose missing keys are filled with a fresh
    record holding an empty ``buffer`` list, an empty ``session_key``
    string and an empty ``channels`` dict.
    """
    def _new_buffer_record():
        # A new dict (and new inner containers) for every missing key.
        return dict(buffer=[], session_key='', channels={})

    return defaultdict(_new_buffer_record)
last_kernel_activity = Instance(
    datetime,
    help="The last activity on any kernel, including shutting down a kernel",
)

def __init__(self, **kwargs):
    """Initialise the manager and stamp ``last_kernel_activity``.

    All keyword arguments are forwarded unchanged to the parent
    initialiser; afterwards ``last_kernel_activity`` is set to the value
    returned by ``utcnow()``.
    """
    super(MappingKernelManager, self).__init__(**kwargs)
    now = utcnow()
    self.last_kernel_activity = now
# Configurable list of Unicode message-type names. Per its help text, an empty
# list means every message type is allowed.
allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
"""
)
except ImportError:
_use_experimental_60_completion = False
_EXPERIMENTAL_KEY_NAME = '_jupyter_types_experimental'
# Kernel class deriving from KernelBase. Only some trait declarations and two
# change handlers are visible in this excerpt.
class IPythonKernel(KernelBase):
# The shell object this kernel drives; may be None (allow_none=True).
shell = Instance('IPython.core.interactiveshell.InteractiveShellABC',
allow_none=True)
# Type trait initialised with ZMQInteractiveShell.
shell_class = Type(ZMQInteractiveShell)
# Config-tagged flag, True by default; see the help text.
use_experimental_completions = Bool(True,
help="Set this flag to False to deactivate the use of experimental IPython completion APIs.",
).tag(config=True)
user_module = Any()
# Copies the new user_module onto the shell when a shell exists.
# NOTE(review): presumably invoked by traitlets through the
# `_<name>_changed` naming convention -- confirm.
def _user_module_changed(self, name, old, new):
if self.shell is not None:
self.shell.user_module = new
# Dict-valued trait that may be None.
user_ns = Instance(dict, args=None, allow_none=True)
# Copies the new namespace onto the shell and then calls init_user_ns() on it;
# does nothing when there is no shell.
def _user_ns_changed(self, name, old, new):
if self.shell is not None:
self.shell.user_ns = new
self.shell.init_user_ns()
# A reference to the Python builtin 'raw_input' function.
# (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
_sys_raw_input = Any()
_sys_eval_input = Any()
def __init__(self, **kwargs):
"""
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.Context)

def _context_default(self):
    """Return the default ``context`` value: ``zmq.Context.instance()``."""
    context_cls = zmq.Context
    return context_cls.instance()
# The classes to use for the various channels
# (Type traits declared with ChannelABC, or HBChannelABC for the hb channel.)
shell_channel_class = Type(ChannelABC)
iopub_channel_class = Type(ChannelABC)
stdin_channel_class = Type(ChannelABC)
hb_channel_class = Type(HBChannelABC)
# Protected traits
# (Untyped Any traits, one per channel.)
_shell_channel = Any()
_iopub_channel = Any()
_stdin_channel = Any()
_hb_channel = Any()
# flag for whether execute requests should be allowed to call raw_input:
# (A plain class attribute rather than a trait.)
allow_stdin = True
#--------------------------------------------------------------------------
# Channel proxy methods
#--------------------------------------------------------------------------
def get_shell_msg(self, *args, **kwargs):
    """Fetch one message from the shell channel.

    Positional and keyword arguments are passed straight through to
    ``self.shell_channel.get_msg``, whose result is returned unchanged.
    """
    channel = self.shell_channel
    return channel.get_msg(*args, **kwargs)
def get_iopub_msg(self, *args, **kwargs):
"""Get a message from the iopub channel"""
class Data(BaseObject):
    """Wrapper for Vega-Lite Data definition.

    Attributes
    ----------
    format: DataFormat
        An object that specifies the format for the data file or values.
    url: Unicode
        A URL from which to load the data set.
    values: List(Any)
        Pass array of objects instead of a url to a file.
    """

    format = T.Instance(
        DataFormat,
        allow_none=True,
        default_value=None,
        help="""An object that specifies the format for the data file or values.""",
    )
    url = T.Unicode(
        allow_none=True,
        default_value=None,
        help="""A URL from which to load the data set.""",
    )
    values = T.List(
        T.Any(),
        allow_none=True,
        default_value=None,
        help="""Pass array of objects instead of a url to a file.""",
    )

    def __init__(self, format=None, url=None, values=None, **kwargs):
        """Create a Data object.

        ``format``, ``url`` and ``values`` are merged into ``kwargs`` only
        when they are not None (overriding any same-named entry already in
        ``kwargs``); everything is then handed to the parent initialiser.
        """
        explicit = (('format', format), ('url', url), ('values', values))
        for key, val in explicit:
            if val is not None:
                kwargs[key] = val
        super(Data, self).__init__(**kwargs)
In addition to these, the following method(s) may need to be implemented:
- :meth:`.start` start the proxy, if it should be launched by the Hub
instead of externally managed.
If the proxy is externally managed, it should set :attr:`should_start` to False.
- :meth:`.stop` stop the proxy. Only used if :meth:`.start` is also used.
And the following method(s) are optional, but can be provided:
- :meth:`.get_route` gets a single route.
There is a default implementation that extracts data from :meth:`.get_all_routes`,
but implementations may choose to provide a more efficient implementation
of fetching a single route.
"""
# Untyped trait holding a zero-argument callable; the `db` property calls it
# on every access and returns the result.
db_factory = Any()
@property
def db(self):
    """Return whatever ``self.db_factory()`` produces.

    The factory is invoked on every access; nothing is cached here.
    """
    factory = self.db_factory
    return factory()
# Untyped (Any) traits.
# NOTE(review): presumably the application and hub objects -- confirm.
app = Any()
hub = Any()
# Unicode traits declared without an explicit default.
public_url = Unicode()
ssl_key = Unicode()
ssl_cert = Unicode()
# Bool trait declared without an explicit default.
host_routing = Bool()
should_start = Bool(
True,
config=True,
help="""Should the Hub start the proxy
Parameters
----------
{multiple_selection_params}
rows: int
The number of rows to display in the widget.
"""
# View and model names, both tagged sync=True.
_view_name = Unicode('SelectMultipleView').tag(sync=True)
_model_name = Unicode('SelectMultipleModel').tag(sync=True)
# Number of rows to display (default 5), tagged sync=True.
rows = Int(5, help="The number of rows to display.").tag(sync=True)
class _SelectionNonempty(_Selection):
    """Selection that is guaranteed to have a value selected."""

    # don't allow None to be an option.
    value = Any(help="Selected value")
    label = Unicode(help="Selected label")
    index = Int(help="Selected index").tag(sync=True)

    def __init__(self, *args, **kwargs):
        """Reject construction without options, then defer to the parent.

        Raises ``TraitError`` when the ``options`` keyword is missing or has
        length zero.
        """
        supplied_options = kwargs.get('options', ())
        if len(supplied_options) == 0:
            raise TraitError('options must be nonempty')
        super(_SelectionNonempty, self).__init__(*args, **kwargs)

    @validate('options')
    def _validate_options(self, proposal):
        """Validate a proposed ``options`` value.

        Non-mapping iterables are converted to a tuple, ``_options_full`` is
        rebuilt with ``_make_options``, and ``TraitError`` is raised when the
        result is empty. Returns the (possibly converted) proposed value.
        """
        raw = proposal.value
        is_plain_iterable = isinstance(raw, Iterable) and not isinstance(raw, Mapping)
        if is_plain_iterable:
            proposal.value = tuple(raw)
        self._options_full = _make_options(proposal.value)
        if len(self._options_full) == 0:
            raise TraitError("Option list must be nonempty")
        return proposal.value
# Configurable URL of the MediaWiki index.php used to log in. The default is
# read from the MW_INDEX_URL environment variable, falling back to the
# meta.wikimedia.org URL below.
mw_index_url = Unicode(
os.environ.get('MW_INDEX_URL', 'https://meta.wikimedia.org/w/index.php'),
config=True,
help='Full path to index.php of the MW instance to use to log in'
)
# Configurable thread count (default 12) passed to ThreadPoolExecutor by
# _executor_default.
executor_threads = Integer(12,
help="""Number of executor threads.
MediaWiki OAuth requests happen in this thread,
so it is mostly waiting for network replies.
""",
config=True,
)
# Untyped trait holding the executor object.
executor = Any()
def normalize_username(self, username):
    """Return ``username`` exactly as given.

    This override exists so that usernames are not lowercased.
    """
    return username
def _executor_default(self):
return ThreadPoolExecutor(self.executor_threads)
async def authenticate(self, handler, data=None):
consumer_token = ConsumerToken(
self.client_id,
self.client_secret,
)
class BaseLauncher(LoggingConfigurable):
"""An abstraction for starting, stopping and signaling a process."""
# In all of the launchers, the work_dir is where child processes will be
# run. This will usually be the profile_dir, but may not be. Any work_dir
# passed into the __init__ method will override the config value.
# This should not be used to set the work_dir for the actual engine
# and controller. Instead, use their own config files or the
# controller_args, engine_args attributes of the launchers to add
# the work_dir option.
work_dir = Unicode(u'.')
# tornado IOLoop instance; may be None (allow_none=True).
loop = Instance('tornado.ioloop.IOLoop', allow_none=True)
# Untyped (Any) traits.
# NOTE(review): presumably data recorded at start/stop -- confirm.
start_data = Any()
stop_data = Any()
# Returns the current tornado IOLoop.
# NOTE(review): presumably the traitlets default provider for `loop` via the
# `_<name>_default` naming convention -- confirm.
def _loop_default(self):
return ioloop.IOLoop.current()
# Forwards work_dir, config and any extra kwargs to the parent initialiser,
# then sets the initial state to 'before' and starts with no stop callbacks.
def __init__(self, work_dir=u'.', config=None, **kwargs):
super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
self.state = 'before' # can be before, running, after
self.stop_callbacks = []
@property
def args(self):
"""A list of cmd and args that will be used to start the process.
This is what is passed to :func:`spawnProcess` and the first element
will be the process name.
"""
# attribute to override with a GUI
eventloop = Any(None)

@observe('eventloop')
def _update_eventloop(self, change):
    """Schedule ``enter_eventloop`` on the IOLoop when an eventloop is set.

    The current IOLoop is looked up first; if the new value is None
    nothing is scheduled.
    """
    current_loop = ioloop.IOLoop.current()
    if change.new is None:
        return
    current_loop.add_callback(self.enter_eventloop)
# Instance traits that may be None (allow_none=True).
session = Instance(Session, allow_none=True)
profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True)
# List trait with no element type given.
shell_streams = List()
control_stream = Instance(ZMQStream, allow_none=True)
# Untyped (Any) traits for the iopub socket/thread and the stdin socket.
iopub_socket = Any()
iopub_thread = Any()
stdin_socket = Any()
log = Instance(logging.Logger, allow_none=True)
# identities:
# Integer identity, -1 by default.
int_id = Integer(-1)
ident = Unicode()

@default('ident')
def _default_ident(self):
    """Return a new random UUID (``uuid.uuid4``) converted with ``unicode_type``."""
    fresh_uuid = uuid.uuid4()
    return unicode_type(fresh_uuid)
# This should be overridden by wrapper kernels that implement any real
# language.
# NOTE(review): this is a class-level dict, so it is shared by every
# instance/subclass that does not rebind it -- avoid mutating it in place.
language_info = {}
def _get_additional_traits(cls):
    """Return the trait to use for additional (undeclared) attributes.

    ``cls._additional_traits`` is consulted. When it is a non-empty
    sequence its first element is used; when it cannot be subscripted, or
    is an empty sequence, the value itself is used.

    Returns
    -------
    The selected value if it is a ``T.TraitType`` instance, a new
    ``T.Any()`` if it is otherwise truthy, and ``None`` if it is falsy
    (which includes an empty sequence).
    """
    spec = cls._additional_traits
    try:
        default = spec[0]
    except (TypeError, IndexError):
        # TypeError: the value is not subscriptable.
        # IndexError: an empty sequence -- previously this escaped as an
        # uncaught exception; it is now treated like any other falsy value.
        default = spec
    if isinstance(default, T.TraitType):
        return default
    if default:
        return T.Any()
    return None