From 677ccc3d7a34e462ddaa85a2897e5418defec179 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Tue, 12 Mar 2019 15:54:01 -0700 Subject: [PATCH 01/10] Refactor buffering, culling and activity monitoring into classes Use composition in preparation for AsyncMappingKernelManager. --- notebook/services/kernels/kernelmanager.py | 332 ++++++++++++++------- 1 file changed, 221 insertions(+), 111 deletions(-) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index a602b9ed6..422979982 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -19,8 +19,8 @@ from tornado.ioloop import IOLoop, PeriodicCallback from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer, - Float, Instance, default, validate -) + Float, Instance, default, validate) +from traitlets.config.configurable import LoggingConfigurable from notebook.utils import maybe_future, to_os_path, exists from notebook._tz import utcnow, isoformat @@ -42,10 +42,6 @@ class MappingKernelManager(MultiKernelManager): _kernel_connections = Dict() - _culler_callback = None - - _initialized_culler = False - @default('root_dir') def _default_root_dir(self): try: @@ -108,18 +104,22 @@ class MappingKernelManager(MultiKernelManager): """ ) - _kernel_buffers = Any() - @default('_kernel_buffers') - def _default_kernel_buffers(self): - return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) - last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + # members used to hold composed instances + buffering_manager = None + kernel_culler = None + activity_monitor = None + def __init__(self, **kwargs): super(MappingKernelManager, self).__init__(**kwargs) self.last_kernel_activity = utcnow() + self.buffering_manager = BufferingManager(parent=self) + self.kernel_culler = KernelCuller(parent=self) + self.activity_monitor = ActivityMonitor(parent=self) + allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. @@ -187,10 +187,6 @@ class MappingKernelManager(MultiKernelManager): self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - # Initialize culling if not already - if not self._initialized_culler: - self.initialize_culler() - # py2-compat raise gen.Return(kernel_id) @@ -208,30 +204,7 @@ class MappingKernelManager(MultiKernelManager): channels: dict({'channel': ZMQStream}) The zmq channels whose messages should be buffered. """ - - if not self.buffer_offline_messages: - for channel, stream in channels.items(): - stream.close() - return - - self.log.info("Starting buffering for %s", session_key) - self._check_kernel_id(kernel_id) - # clear previous buffering state - self.stop_buffering(kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - # record the session key because only one session can buffer - buffer_info['session_key'] = session_key - # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple - buffer_info['buffer'] = [] - buffer_info['channels'] = channels - - # forward any future messages to the internal buffer - def buffer_msg(channel, msg_parts): - self.log.debug("Buffering msg on %s:%s", kernel_id, channel) - buffer_info['buffer'].append((channel, msg_parts)) - - for channel, stream in channels.items(): - stream.on_recv(partial(buffer_msg, channel)) + self.buffering_manager.start_buffering(kernel_id, session_key, channels) def get_buffer(self, kernel_id, session_key): """Get the buffer for a given kernel @@ -245,18 +218,7 @@ class MappingKernelManager(MultiKernelManager): If the session_key matches the current buffered session_key, the buffer will be returned. """ - self.log.debug("Getting buffer for %s", kernel_id) - if kernel_id not in self._kernel_buffers: - return - - buffer_info = self._kernel_buffers[kernel_id] - if buffer_info['session_key'] == session_key: - # remove buffer - self._kernel_buffers.pop(kernel_id) - # only return buffer_info if it's a match - return buffer_info - else: - self.stop_buffering(kernel_id) + return self.buffering_manager.get_buffer(kernel_id, session_key) def stop_buffering(self, kernel_id): """Stop buffering kernel messages @@ -266,22 +228,7 @@ class MappingKernelManager(MultiKernelManager): kernel_id : str The id of the kernel to stop buffering. """ - self.log.debug("Clearing buffer for %s", kernel_id) - self._check_kernel_id(kernel_id) - - if kernel_id not in self._kernel_buffers: - return - buffer_info = self._kernel_buffers.pop(kernel_id) - # close buffering streams - for stream in buffer_info['channels'].values(): - if not stream.closed(): - stream.on_recv(None) - stream.close() - - msg_buffer = buffer_info['buffer'] - if msg_buffer: - self.log.info("Discarding %s buffered messages for %s", - len(msg_buffer), buffer_info['session_key']) + self.buffering_manager.stop_buffering(kernel_id) def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" @@ -372,9 +319,9 @@ class MappingKernelManager(MultiKernelManager): return model def list_kernels(self): - """Returns a list of kernel_id's of kernels running.""" + """Returns a list of kernel models relative to the running kernels.""" kernels = [] - kernel_ids = super(MappingKernelManager, self).list_kernel_ids() + kernel_ids = self.list_kernel_ids() for kernel_id in kernel_ids: model = self.kernel_model(kernel_id) kernels.append(model) @@ -394,7 +341,38 @@ class MappingKernelManager(MultiKernelManager): - update last_activity on every message - record execution_state from status messages """ - kernel = self._kernels[kernel_id] + self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + + def initialize_culler(self): + """Initial culler if not already. + """ + if self.kernel_culler is None: + self.kernel_culler = KernelCuller(parent=self) + + def cull_kernels(self): + # Defer to KernelCuller + self.kernel_culler.cull_kernels() + + def cull_kernel_if_idle(self, kernel_id): + # Defer to KernelCuller + self.kernel_culler.cull_kernel_if_idle(kernel_id) + + +class ActivityMonitor(LoggingConfigurable): + """Establishes activity recorder for each active kernel""" + + def __init__(self, **kwargs): + super(ActivityMonitor, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "ActivityMonitor requires an instance of MappingKernelManager!") + + def start_watching_activity(self, kernel_id, kernel): + """Start watching IOPub messages on a kernel for activity. + + - update last_activity on every message + - record execution_state from status messages + """ # add busy/activity markers: kernel.execution_state = 'starting' kernel.last_activity = utcnow() @@ -406,7 +384,7 @@ class MappingKernelManager(MultiKernelManager): def record_activity(msg_list): """Record an IOPub message arriving from a kernel""" - self.last_kernel_activity = kernel.last_activity = utcnow() + self.parent.last_kernel_activity = kernel.last_activity = utcnow() idents, fed_msg_list = session.feed_identities(msg_list) msg = session.deserialize(fed_msg_list) @@ -420,55 +398,187 @@ class MappingKernelManager(MultiKernelManager): kernel._activity_stream.on_recv(record_activity) - def initialize_culler(self): - """Start idle culler if 'cull_idle_timeout' is greater than zero. - Regardless of that value, set flag that we've been here. +class BufferingManager(LoggingConfigurable): + """Manages buffering across the active kernels""" + _kernel_buffers = Any() + + @default('_kernel_buffers') + def _default_kernel_buffers(self): + return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) + + def __init__(self, **kwargs): + super(BufferingManager, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "BufferingManager requires an instance of MappingKernelManager!") + + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self.parent: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) + + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ + + if not self.parent.buffer_offline_messages: + for channel, stream in channels.items(): + stream.close() + return + + self._check_kernel_id(kernel_id) + self.log.info("Starting buffering for %s", session_key) + # clear previous buffering state + self.parent.stop_buffering(kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + # record the session key because only one session can buffer + buffer_info['session_key'] = session_key + # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple + buffer_info['buffer'] = [] + buffer_info['channels'] = channels + + # forward any future messages to the internal buffer + def buffer_msg(channel, msg_parts): + self.log.debug("Buffering msg on %s:%s", kernel_id, channel) + buffer_info['buffer'].append((channel, msg_parts)) + + for channel, stream in channels.items(): + stream.on_recv(partial(buffer_msg, channel)) + + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. """ - if not self._initialized_culler and self.cull_idle_timeout > 0: - if self._culler_callback is None: - loop = IOLoop.current() - if self.cull_interval <= 0: #handle case where user set invalid value - self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", - self.cull_interval, self.cull_interval_default) - self.cull_interval = self.cull_interval_default - self._culler_callback = PeriodicCallback( - self.cull_kernels, 1000*self.cull_interval) - self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...", - self.cull_idle_timeout, self.cull_interval) - if self.cull_busy: - self.log.info("Culling kernels even if busy") - if self.cull_connected: - self.log.info("Culling kernels even with connected clients") - self._culler_callback.start() - - self._initialized_culler = True + if kernel_id not in self._kernel_buffers: + return + + self.log.debug("Getting buffer for %s", kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + if buffer_info['session_key'] == session_key: + # remove buffer + self._kernel_buffers.pop(kernel_id) + # only return buffer_info if it's a match + return buffer_info + else: + self.parent.stop_buffering(kernel_id) + + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + """ + self._check_kernel_id(kernel_id) + + if kernel_id not in self._kernel_buffers: + return + self.log.debug("Clearing buffer for %s", kernel_id) + buffer_info = self._kernel_buffers.pop(kernel_id) + # close buffering streams + for stream in buffer_info['channels'].values(): + if not stream.closed(): + stream.on_recv(None) + stream.close() + + msg_buffer = buffer_info['buffer'] + if msg_buffer: + self.log.info("Discarding %s buffered messages for %s", + len(msg_buffer), buffer_info['session_key']) + + +class KernelCuller(LoggingConfigurable): + """Handles culling responsibilities for active kernels""" + + def __init__(self, **kwargs): + super(KernelCuller, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "KernelCuller requires an instance of MappingKernelManager!") + self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging + + # Start idle culler if 'cull_idle_timeout' is greater than zero. + # Regardless of that value, set flag that we've been here. + if self.parent.cull_idle_timeout > 0: + if self.parent.cull_interval <= 0: # handle case where user set invalid value + self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", + self.parent.cull_interval, self.parent.cull_interval_default) + self.parent.cull_interval = self.parent.cull_interval_default + self._culler_callback = PeriodicCallback( + self.parent.cull_kernels, 1000*self.parent.cull_interval) + self._culler_callback.start() + self._log_info() + + def _log_info(self): + """Builds a single informational message relative to the culling configuration (logged at startup).""" + log_msg = list() + log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals". + format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout, + interval=self.parent.cull_interval)) + if self.parent.cull_busy or self.parent.cull_connected: + log_msg.append(" - including") + if self.parent.cull_busy: + log_msg.append(" busy") + if self.parent.cull_connected: + log_msg.append(" and") + if self.parent.cull_connected: + log_msg.append(" connected") + log_msg.append(" kernels") + log_msg.append(".") + self.log.info(''.join(log_msg)) def cull_kernels(self): - self.log.debug("Polling every %s seconds for kernels idle > %s seconds...", - self.cull_interval, self.cull_idle_timeout) - """Create a separate list of kernels to avoid conflicting updates while iterating""" - for kernel_id in list(self._kernels): + self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", + self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) + # Get a separate list of kernels to avoid conflicting updates while iterating + for kernel_id in self.parent.list_kernel_ids(): try: - self.cull_kernel_if_idle(kernel_id) + self.parent.cull_kernel_if_idle(kernel_id) except Exception as e: - self.log.exception("The following exception was encountered while checking the idle duration of kernel %s: %s", - kernel_id, e) + self.log.exception("The following exception was encountered while checking the idle " + "duration of kernel %s: %s", kernel_id, e) def cull_kernel_if_idle(self, kernel_id): - kernel = self._kernels[kernel_id] - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", kernel_id, kernel.kernel_name, kernel.last_activity) - if kernel.last_activity is not None: - dt_now = utcnow() - dt_idle = dt_now - kernel.last_activity + + # Get the kernel model and use that to determine cullability... + model = self.parent.kernel_model(kernel_id) + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, model['name'], model['last_activity']) + if model['last_activity'] is not None: + # Convert dates to compatible formats. Since both are UTC, strip the TZ info from + # the current time and convert the last_activity string to a datetime. + dt_now = utcnow().replace(tzinfo=None) + dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") + dt_idle = dt_now - dt_last_activity # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout) - is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy') - connections = self._kernel_connections.get(kernel_id, 0) - is_idle_connected = self.cull_connected or not connections + is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) + is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') + connections = model.get('connections', 0) + is_idle_connected = self.parent.cull_connected or connections == 0 # Cull the kernel if all three criteria are met - if (is_idle_time and is_idle_execute and is_idle_connected): + if is_idle_time and is_idle_execute and is_idle_connected: idle_duration = int(dt_idle.total_seconds()) self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration) - self.shutdown_kernel(kernel_id) + model['execution_state'], model['name'], kernel_id, connections, idle_duration) + self.parent.shutdown_kernel(kernel_id) From 3226b0734520e9b504fa1e582bf5e6f5eb8d14f8 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 13 Mar 2019 16:25:39 -0700 Subject: [PATCH 02/10] Add support for AsyncMappingKernelManager Supports running against incompatible jupyter_client so long as the desired kernel_manager_class is not `AsyncMappingKernelManager`. --- notebook/notebookapp.py | 14 +- notebook/services/kernels/kernelmanager.py | 428 +++++++++++++----- .../kernels/tests/test_kernels_api.py | 28 ++ notebook/services/sessions/sessionmanager.py | 2 +- 4 files changed, 349 insertions(+), 123 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index a657ce82d..e6a2c3833 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -75,7 +75,7 @@ from notebook import ( from .base.handlers import Template404, RedirectWithParams from .log import log_request -from .services.kernels.kernelmanager import MappingKernelManager +from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager, MappingKernelManagerBase from .services.config import ConfigManager from .services.contents.manager import ContentsManager from .services.contents.filemanager import FileContentsManager @@ -1177,6 +1177,7 @@ class NotebookApp(JupyterApp): kernel_manager_class = Type( default_value=MappingKernelManager, + klass=MappingKernelManagerBase, config=True, help=_('The kernel manager class to use.') ) @@ -1380,6 +1381,13 @@ class NotebookApp(JupyterApp): connection_dir=self.runtime_dir, kernel_spec_manager=self.kernel_spec_manager, ) + # Ensure the appropriate jupyter_client is in place. + # TODO: remove once dependencies are updated. + if isinstance(self.kernel_manager, AsyncMappingKernelManager): + if not hasattr(self.kernel_manager, 'list_kernel_ids'): + raise RuntimeError("Using `AsyncMappingKernelManager` without an appropriate " + "jupyter_client installed! Upgrade jupyter_client and try again.") + self.contents_manager = self.contents_manager_class( parent=self, log=self.log, @@ -1897,8 +1905,8 @@ class NotebookApp(JupyterApp): assembled_url = urljoin('file:', pathname2url(open_file)) else: assembled_url = url_path_join(self.connection_url, uri) - - b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new) + + b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new) threading.Thread(target=b).start() def start(self): diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 422979982..93faefe29 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -28,13 +28,27 @@ from ipython_genutils.py3compat import getcwd from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL +# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate juptyer_client. +# This will be confirmed at runtime in notebookapp. +# TODO: remove once dependencies are updated. +try: + from jupyter_client.multikernelmanager import AsyncMultiKernelManager +except ImportError: + class AsyncMultiKernelManager(object): + pass -class MappingKernelManager(MultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling""" - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.IOLoopKernelManager" +class MappingKernelManagerBase(LoggingConfigurable): + """ + This class exists so that class-based traits relative to MappingKernelManager and AsyncMappingKernelManager + can be satisfied since AsyncMappingKernelManager doesn't derive from the former. It is only necessary until + we converge to using only async, but that requires subclasses to be updated. + + Since we have this class, we'll reduce duplication of code between the disjoint classes by using this + common superclass for configuration properties and static methods and local member variables. + + TODO - move contents back to appropriate mapping kernel manager once we converge to async only. + """ kernel_argv = List(Unicode()) @@ -66,7 +80,7 @@ class MappingKernelManager(MultiKernelManager): for users with poor network connections.""" ) - cull_interval_default = 300 # 5 minutes + cull_interval_default = 300 # 5 minutes cull_interval = Integer(cull_interval_default, config=True, help="""The interval (in seconds) on which to check for idle kernels exceeding the cull timeout value.""" ) @@ -107,34 +121,25 @@ class MappingKernelManager(MultiKernelManager): last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + allowed_message_types = List(trait=Unicode(), config=True, + help="""White list of allowed kernel message types. + When the list is empty, all message types are allowed. + """ + ) + # members used to hold composed instances buffering_manager = None kernel_culler = None activity_monitor = None def __init__(self, **kwargs): - super(MappingKernelManager, self).__init__(**kwargs) + super(MappingKernelManagerBase, self).__init__(**kwargs) self.last_kernel_activity = utcnow() self.buffering_manager = BufferingManager(parent=self) self.kernel_culler = KernelCuller(parent=self) self.activity_monitor = ActivityMonitor(parent=self) - allowed_message_types = List(trait=Unicode(), config=True, - help="""White list of allowed kernel message types. - When the list is empty, all message types are allowed. - """ - ) - - #------------------------------------------------------------------------- - # Methods for managing kernels and sessions - #------------------------------------------------------------------------- - - def _handle_kernel_died(self, kernel_id): - """notice that a kernel died""" - self.log.warning("Kernel %s died, removing from map.", kernel_id) - self.remove_kernel(kernel_id) - def cwd_for_path(self, path): """Turn API path into absolute OS path.""" os_path = to_os_path(path, self.root_dir) @@ -144,6 +149,100 @@ class MappingKernelManager(MultiKernelManager): os_path = os.path.dirname(os_path) return os_path + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ + self.buffering_manager.start_buffering(kernel_id, session_key, channels) + + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + """ + return self.buffering_manager.get_buffer(kernel_id, session_key) + + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + """ + self.buffering_manager.stop_buffering(kernel_id) + + def notify_connect(self, kernel_id): + """Notice a new connection to a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] += 1 + + def notify_disconnect(self, kernel_id): + """Notice a disconnection from a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] -= 1 + + # monitoring activity: + + def start_watching_activity(self, kernel_id): + """Start watching IOPub messages on a kernel for activity. Remove if no overrides + + - update last_activity on every message + - record execution_state from status messages + """ + self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + + def initialize_culler(self): + """Initial culler if not already. Remove if no overrides + """ + if self.kernel_culler is None: + self.kernel_culler = KernelCuller(parent=self) + + def cull_kernels(self): + # Defer to KernelCuller. Remove if no overrides + self.kernel_culler.cull_kernels() + + def cull_kernel_if_idle(self, kernel_id): + # Defer to KernelCuller. Remove if no overrides + self.kernel_culler.cull_kernel_if_idle(kernel_id) + + +class MappingKernelManager(MultiKernelManager, MappingKernelManagerBase): + """A KernelManager that handles notebook mapping and HTTP error handling""" + + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.IOLoopKernelManager" + + def __init__(self, **kwargs): + super(MappingKernelManager, self).__init__(**kwargs) + + # ------------------------------------------------------------------------- + # Methods for managing kernels and sessions + # ------------------------------------------------------------------------- + + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warning("Kernel %s died, removing from map.", kernel_id) + self.remove_kernel(kernel_id) + @gen.coroutine def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. @@ -190,46 +289,6 @@ class MappingKernelManager(MultiKernelManager): # py2-compat raise gen.Return(kernel_id) - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to start buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - self.buffering_manager.start_buffering(kernel_id, session_key, channels) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - return self.buffering_manager.get_buffer(kernel_id, session_key) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - """ - self.buffering_manager.stop_buffering(kernel_id) - def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) @@ -291,16 +350,6 @@ class MappingKernelManager(MultiKernelManager): # wait for restart to complete yield future - def notify_connect(self, kernel_id): - """Notice a new connection to a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] += 1 - - def notify_disconnect(self, kernel_id): - """Notice a disconnection from a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] -= 1 - def kernel_model(self, kernel_id): """Return a JSON-safe dict representing a kernel @@ -333,29 +382,164 @@ class MappingKernelManager(MultiKernelManager): if kernel_id not in self: raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - # monitoring activity: - def start_watching_activity(self, kernel_id): - """Start watching IOPub messages on a kernel for activity. +class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBase): + """A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout""" - - update last_activity on every message - - record execution_state from status messages + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.AsyncIOLoopKernelManager" + + def __init__(self, **kwargs): + super(AsyncMappingKernelManager, self).__init__(**kwargs) + + # ------------------------------------------------------------------------- + # Methods for managing kernels and sessions + # ------------------------------------------------------------------------- + + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warning("Kernel %s died, removing from map.", kernel_id) + self.remove_kernel(kernel_id) + + @gen.coroutine + def start_kernel(self, kernel_id=None, path=None, **kwargs): + """Start a kernel for a session and return its kernel_id. + + Parameters + ---------- + kernel_id : uuid + The uuid to associate the new kernel with. If this + is not None, this kernel will be persistent whenever it is + requested. + path : API path + The API path (unicode, '/' delimited) for the cwd. + Will be transformed to an OS path relative to root_dir. + kernel_name : str + The name identifying which kernel spec to launch. This is ignored if + an existing kernel is returned, but it may be checked in the future. """ - self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + if kernel_id is None: + if path is not None: + kwargs['cwd'] = self.cwd_for_path(path) + kernel_id = yield super(AsyncMappingKernelManager, self).start_kernel(**kwargs) - def initialize_culler(self): - """Initial culler if not already. + self._kernel_connections[kernel_id] = 0 + self.start_watching_activity(kernel_id) + self.log.info("Kernel started (async): %s" % kernel_id) + self.log.debug("Kernel args: %r" % kwargs) + # register callback for failed auto-restart + self.add_restart_callback(kernel_id, + lambda: self._handle_kernel_died(kernel_id), + 'dead', + ) + + # Increase the metric of number of kernels running + # for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).inc() + + else: + self._check_kernel_id(kernel_id) + self.log.info("Using existing kernel: %s" % kernel_id) + + # py2-compat + raise gen.Return(kernel_id) + + @gen.coroutine + def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + if kernel._activity_stream: + kernel._activity_stream.close() + kernel._activity_stream = None + self.stop_buffering(kernel_id) + self._kernel_connections.pop(kernel_id, None) + self.last_kernel_activity = utcnow() + + # Decrease the metric of number of kernels + # running for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).dec() + + yield super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) + + @gen.coroutine + def restart_kernel(self, kernel_id, now=False): + """Restart a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + yield super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) + kernel = self.get_kernel(kernel_id) + # return a Future that will resolve when the kernel has successfully restarted + channel = kernel.connect_shell() + future = Future() + + def finish(): + """Common cleanup when restart finishes/fails for any reason.""" + if not channel.closed(): + channel.close() + loop.remove_timeout(timeout) + kernel.remove_restart_callback(on_restart_failed, 'dead') + + def on_reply(msg): + self.log.debug("Kernel info reply received: %s", kernel_id) + finish() + if not future.done(): + future.set_result(msg) + + def on_timeout(): + self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) + finish() + if not future.done(): + future.set_exception(gen.TimeoutError("Timeout waiting for restart")) + + def on_restart_failed(): + self.log.warning("Restarting kernel failed: %s", kernel_id) + finish() + if not future.done(): + future.set_exception(RuntimeError("Restart failed")) + + kernel.add_restart_callback(on_restart_failed, 'dead') + kernel.session.send(channel, "kernel_info_request") + channel.on_recv(on_reply) + loop = IOLoop.current() + timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) + raise gen.Return(future) + + def kernel_model(self, kernel_id): + """Return a JSON-safe dict representing a kernel + + For use in representing kernels in the JSON APIs. """ - if self.kernel_culler is None: - self.kernel_culler = KernelCuller(parent=self) + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] - def cull_kernels(self): - # Defer to KernelCuller - self.kernel_culler.cull_kernels() + model = { + "id":kernel_id, + "name": kernel.kernel_name, + "last_activity": isoformat(kernel.last_activity), + "execution_state": kernel.execution_state, + "connections": self._kernel_connections[kernel_id], + } + return model - def cull_kernel_if_idle(self, kernel_id): - # Defer to KernelCuller - self.kernel_culler.cull_kernel_if_idle(kernel_id) + def list_kernels(self): + """Returns a list of kernel models relative to the running kernels.""" + kernels = [] + kernel_ids = self.list_kernel_ids() + for kernel_id in kernel_ids: + model = self.kernel_model(kernel_id) + kernels.append(model) + return kernels + + # override _check_kernel_id to raise 404 instead of KeyError + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) class ActivityMonitor(LoggingConfigurable): @@ -363,9 +547,9 @@ class ActivityMonitor(LoggingConfigurable): def __init__(self, **kwargs): super(ActivityMonitor, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "ActivityMonitor requires an instance of MappingKernelManager!") + "ActivityMonitor requires an instance of MappingKernelManagerBase!") def start_watching_activity(self, kernel_id, kernel): """Start watching IOPub messages on a kernel for activity. @@ -409,9 +593,9 @@ class BufferingManager(LoggingConfigurable): def __init__(self, **kwargs): super(BufferingManager, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "BufferingManager requires an instance of MappingKernelManager!") + "BufferingManager requires an instance of MappingKernelManagerBase!") def _check_kernel_id(self, kernel_id): """Check a that a kernel_id exists and raise 404 if not.""" @@ -513,9 +697,9 @@ class KernelCuller(LoggingConfigurable): def __init__(self, **kwargs): super(KernelCuller, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManager): + if not isinstance(self.parent, MappingKernelManagerBase): raise RuntimeError( - "KernelCuller requires an instance of MappingKernelManager!") + "KernelCuller requires an instance of MappingKernelManagerBase!") self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging # Start idle culler if 'cull_idle_timeout' is greater than zero. @@ -548,37 +732,43 @@ class KernelCuller(LoggingConfigurable): log_msg.append(".") self.log.info(''.join(log_msg)) + @gen.coroutine def cull_kernels(self): self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) # Get a separate list of kernels to avoid conflicting updates while iterating for kernel_id in self.parent.list_kernel_ids(): - try: - self.parent.cull_kernel_if_idle(kernel_id) - except Exception as e: - self.log.exception("The following exception was encountered while checking the idle " - "duration of kernel %s: %s", kernel_id, e) + yield self.parent.cull_kernel_if_idle(kernel_id) + @gen.coroutine def cull_kernel_if_idle(self, kernel_id): # Get the kernel model and use that to determine cullability... - model = self.parent.kernel_model(kernel_id) - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", - kernel_id, model['name'], model['last_activity']) - if model['last_activity'] is not None: - # Convert dates to compatible formats. Since both are UTC, strip the TZ info from - # the current time and convert the last_activity string to a datetime. - dt_now = utcnow().replace(tzinfo=None) - dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") - dt_idle = dt_now - dt_last_activity - # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) - is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') - connections = model.get('connections', 0) - is_idle_connected = self.parent.cull_connected or connections == 0 - # Cull the kernel if all three criteria are met - if is_idle_time and is_idle_execute and is_idle_connected: - idle_duration = int(dt_idle.total_seconds()) - self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - model['execution_state'], model['name'], kernel_id, connections, idle_duration) - self.parent.shutdown_kernel(kernel_id) + try: + model = self.parent.kernel_model(kernel_id) + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, model['name'], model['last_activity']) + + if model['last_activity'] is not None: + # Convert dates to compatible formats. Since both are UTC, strip the TZ info from + # the current time and convert the last_activity string to a datetime. + dt_now = utcnow().replace(tzinfo=None) + dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") + dt_idle = dt_now - dt_last_activity + # Compute idle properties + is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) + is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') + connections = model.get('connections', 0) + is_idle_connected = self.parent.cull_connected or connections == 0 + # Cull the kernel if all three criteria are met + if is_idle_time and is_idle_execute and is_idle_connected: + idle_duration = int(dt_idle.total_seconds()) + self.log.warning( + "Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", + model['execution_state'], model['name'], kernel_id, connections, idle_duration) + yield maybe_future(self.parent.shutdown_kernel(kernel_id)) + except KeyError: + pass # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. + except Exception as e: # other errors are not as expected, so we'll make some noise, but continue. + self.log.exception("The following exception was encountered while checking the idle " + "duration of kernel %s: %s", kernel_id, e) diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 83bfb0c3e..0d4bc94ac 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -8,12 +8,19 @@ from traitlets.config import Config from tornado.httpclient import HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect +from unittest import SkipTest from jupyter_client.kernelspec import NATIVE_KERNEL_NAME from notebook.utils import url_path_join from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error +try: + from jupyter_client import AsyncMultiKernelManager + async_testing_enabled = True +except ImportError: + async_testing_enabled = False + class KernelAPI(object): """Wrapper for kernel REST API requests""" @@ -188,6 +195,27 @@ class KernelAPITest(NotebookTestBase): self.assertEqual(model['connections'], 0) +class AsyncKernelAPITest(KernelAPITest): + """Test the kernels web service API using the AsyncMappingKernelManager""" + + @classmethod + def get_argv(cls): + argv = super(AsyncKernelAPITest, cls).get_argv() + + # before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. + if async_testing_enabled: + argv.extend(['--NotebookApp.kernel_manager_class=' + 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv + + def setUp(self): + if not async_testing_enabled: + raise SkipTest("AsyncKernelAPITest.{test_method} skipped due to down-level jupyter_client!". + format(test_method=self._testMethodName)) + super(AsyncKernelAPITest, self).setUp() + + class KernelFilterTest(NotebookTestBase): # A special install of NotebookTestBase where only `kernel_info_request` diff --git a/notebook/services/sessions/sessionmanager.py b/notebook/services/sessions/sessionmanager.py index 63e184482..5ec35edd0 100644 --- a/notebook/services/sessions/sessionmanager.py +++ b/notebook/services/sessions/sessionmanager.py @@ -22,7 +22,7 @@ from notebook.utils import maybe_future class SessionManager(LoggingConfigurable): - kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManager') + kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManagerBase') contents_manager = Instance('notebook.services.contents.manager.ContentsManager') # Session database initialized below From 7abd4bee27f9b124af219ff5a98821d6b594005a Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Tue, 10 Mar 2020 10:20:23 -0700 Subject: [PATCH 03/10] Indicate that async kernel management is configured --- notebook/notebookapp.py | 18 ++++++++++++++---- notebook/services/kernels/kernelmanager.py | 3 +-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index e6a2c3833..39552dc85 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -108,6 +108,13 @@ from notebook._sysinfo import get_sys_info from ._tz import utcnow, utcfromtimestamp from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url +# Check if we can user async kernel management +try: + from jupyter_client import AsyncMultiKernelManager + async_kernel_mgmt_available = True +except ImportError: + async_kernel_mgmt_available = False + #----------------------------------------------------------------------------- # Module globals #----------------------------------------------------------------------------- @@ -1375,6 +1382,7 @@ class NotebookApp(JupyterApp): self.kernel_spec_manager = self.kernel_spec_manager_class( parent=self, ) + self.kernel_manager = self.kernel_manager_class( parent=self, log=self.log, @@ -1382,11 +1390,13 @@ class NotebookApp(JupyterApp): kernel_spec_manager=self.kernel_spec_manager, ) # Ensure the appropriate jupyter_client is in place. - # TODO: remove once dependencies are updated. if isinstance(self.kernel_manager, AsyncMappingKernelManager): - if not hasattr(self.kernel_manager, 'list_kernel_ids'): - raise RuntimeError("Using `AsyncMappingKernelManager` without an appropriate " - "jupyter_client installed! Upgrade jupyter_client and try again.") + if not async_kernel_mgmt_available: + raise ValueError("You're using `AsyncMappingKernelManager` without an appropriate " + "jupyter_client installed! Upgrade jupyter_client or change kernel managers.") + else: + self.log.info("Asynchronous kernel management has been configured via '{}'.". + format(self.kernel_manager.__class__.__name__)) self.contents_manager = self.contents_manager_class( parent=self, diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 93faefe29..6c0bfb31c 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -35,6 +35,7 @@ try: from jupyter_client.multikernelmanager import AsyncMultiKernelManager except ImportError: class AsyncMultiKernelManager(object): + """Empty class to satisfy unused reference by AsyncMappingKernelManager.""" pass @@ -46,8 +47,6 @@ class MappingKernelManagerBase(LoggingConfigurable): Since we have this class, we'll reduce duplication of code between the disjoint classes by using this common superclass for configuration properties and static methods and local member variables. - - TODO - move contents back to appropriate mapping kernel manager once we converge to async only. """ kernel_argv = List(Unicode()) From 8a4beb0d153d31dab487d6ec655c7d2c53b6a5b2 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Tue, 10 Mar 2020 16:40:39 -0700 Subject: [PATCH 04/10] Convert to async/await and apply touch ups --- notebook/notebookapp.py | 13 +++++--- notebook/services/kernels/handlers.py | 2 +- notebook/services/kernels/kernelmanager.py | 20 +++++------ .../sessions/tests/test_sessions_api.py | 33 ++++++++++++++++++- 4 files changed, 50 insertions(+), 18 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 39552dc85..0c4244926 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -7,6 +7,7 @@ from __future__ import absolute_import, print_function import notebook +import asyncio import binascii import datetime import errno @@ -583,7 +584,7 @@ class NotebookApp(JupyterApp): flags = flags classes = [ - KernelManager, Session, MappingKernelManager, KernelSpecManager, + KernelManager, Session, MappingKernelManager, AsyncMappingKernelManager, KernelSpecManager, ContentsManager, FileContentsManager, NotebookNotary, GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient, ] @@ -1392,10 +1393,10 @@ class NotebookApp(JupyterApp): # Ensure the appropriate jupyter_client is in place. if isinstance(self.kernel_manager, AsyncMappingKernelManager): if not async_kernel_mgmt_available: - raise ValueError("You're using `AsyncMappingKernelManager` without an appropriate " + raise ValueError("You are using `AsyncMappingKernelManager` without an appropriate " "jupyter_client installed! Upgrade jupyter_client or change kernel managers.") else: - self.log.info("Asynchronous kernel management has been configured via '{}'.". + self.log.info("Asynchronous kernel management has been configured to use '{}'.". format(self.kernel_manager.__class__.__name__)) self.contents_manager = self.contents_manager_class( @@ -1800,7 +1801,11 @@ class NotebookApp(JupyterApp): n_kernels = len(self.kernel_manager.list_kernel_ids()) kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels) self.log.info(kernel_msg % n_kernels) - self.kernel_manager.shutdown_all() + # If we're using async kernel management, we need to invoke the async method via the event loop. + if isinstance(self.kernel_manager, AsyncMappingKernelManager): + asyncio.get_event_loop().run_until_complete(self.kernel_manager.shutdown_all()) + else: + self.kernel_manager.shutdown_all() def notebook_info(self, kernel_count=True): "Return the current working directory and the server url information" diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 8484dccdf..bca99ce1b 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -75,7 +75,7 @@ class KernelActionHandler(APIHandler): def post(self, kernel_id, action): km = self.kernel_manager if action == 'interrupt': - km.interrupt_kernel(kernel_id) + yield maybe_future(km.interrupt_kernel(kernel_id)) self.set_status(204) if action == 'restart': diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 6c0bfb31c..ff6fe721f 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -401,8 +401,7 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas self.log.warning("Kernel %s died, removing from map.", kernel_id) self.remove_kernel(kernel_id) - @gen.coroutine - def start_kernel(self, kernel_id=None, path=None, **kwargs): + async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. Parameters @@ -421,7 +420,7 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = yield super(AsyncMappingKernelManager, self).start_kernel(**kwargs) + kernel_id = await super(AsyncMappingKernelManager, self).start_kernel(**kwargs) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) @@ -443,11 +442,9 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - # py2-compat - raise gen.Return(kernel_id) + return kernel_id - @gen.coroutine - def shutdown_kernel(self, kernel_id, now=False, restart=False): + async def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) kernel = self._kernels[kernel_id] @@ -464,13 +461,12 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas type=self._kernels[kernel_id].kernel_name ).dec() - yield super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) + await super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) - @gen.coroutine - def restart_kernel(self, kernel_id, now=False): + async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - yield super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) + await super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -506,7 +502,7 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas channel.on_recv(on_reply) loop = IOLoop.current() timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) - raise gen.Return(future) + return future def kernel_model(self, kernel_id): """Return a JSON-safe dict representing a kernel diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 9c551fc79..9323b5966 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -9,13 +9,22 @@ import requests import shutil import time -pjoin = os.path.join +from unittest import SkipTest from notebook.utils import url_path_join from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error from nbformat.v4 import new_notebook from nbformat import write +try: + from jupyter_client import AsyncMultiKernelManager + async_testing_enabled = True +except ImportError: + async_testing_enabled = False + +pjoin = os.path.join + + class SessionAPI(object): """Wrapper for notebook API calls.""" def __init__(self, request): @@ -77,6 +86,7 @@ class SessionAPI(object): def delete(self, id): return self._req('DELETE', id) + class SessionAPITest(NotebookTestBase): """Test the sessions web service API""" def setUp(self): @@ -254,3 +264,24 @@ class SessionAPITest(NotebookTestBase): kernel.pop('last_activity') [ k.pop('last_activity') for k in kernel_list ] self.assertEqual(kernel_list, [kernel]) + + +class AsyncSessionAPITest(SessionAPITest): + """Test the sessions web service API using the AsyncMappingKernelManager""" + + @classmethod + def get_argv(cls): + argv = super(AsyncSessionAPITest, cls).get_argv() + + # before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. + if async_testing_enabled: + argv.extend(['--NotebookApp.kernel_manager_class=' + 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv + + def setUp(self): + if not async_testing_enabled: + raise SkipTest("AsyncSessionAPITest.{test_method} skipped due to down-level jupyter_client!". + format(test_method=self._testMethodName)) + super(AsyncSessionAPITest, self).setUp() From 570986282e9aaf5d248296d13dcfc6e49cc4c32f Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Thu, 19 Mar 2020 17:31:13 -0700 Subject: [PATCH 05/10] Move duplicated methods to mixin, fix mixin order --- notebook/services/kernels/kernelmanager.py | 100 +++++++-------------- 1 file changed, 34 insertions(+), 66 deletions(-) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index ff6fe721f..d5cc00f2a 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -139,6 +139,38 @@ class MappingKernelManagerBase(LoggingConfigurable): self.kernel_culler = KernelCuller(parent=self) self.activity_monitor = ActivityMonitor(parent=self) + def kernel_model(self, kernel_id): + """Return a JSON-safe dict representing a kernel + + For use in representing kernels in the JSON APIs. + """ + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + + model = { + "id":kernel_id, + "name": kernel.kernel_name, + "last_activity": isoformat(kernel.last_activity), + "execution_state": kernel.execution_state, + "connections": self._kernel_connections[kernel_id], + } + return model + + def list_kernels(self): + """Returns a list of kernel models relative to the running kernels.""" + kernels = [] + kernel_ids = self.list_kernel_ids() + for kernel_id in kernel_ids: + model = self.kernel_model(kernel_id) + kernels.append(model) + return kernels + + # override _check_kernel_id to raise 404 instead of KeyError + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) + def cwd_for_path(self, path): """Turn API path into absolute OS path.""" os_path = to_os_path(path, self.root_dir) @@ -223,7 +255,7 @@ class MappingKernelManagerBase(LoggingConfigurable): self.kernel_culler.cull_kernel_if_idle(kernel_id) -class MappingKernelManager(MultiKernelManager, MappingKernelManagerBase): +class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): """A KernelManager that handles notebook mapping and HTTP error handling""" @default('kernel_manager_class') @@ -349,40 +381,8 @@ class MappingKernelManager(MultiKernelManager, MappingKernelManagerBase): # wait for restart to complete yield future - def kernel_model(self, kernel_id): - """Return a JSON-safe dict representing a kernel - - For use in representing kernels in the JSON APIs. - """ - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - - model = { - "id":kernel_id, - "name": kernel.kernel_name, - "last_activity": isoformat(kernel.last_activity), - "execution_state": kernel.execution_state, - "connections": self._kernel_connections[kernel_id], - } - return model - - def list_kernels(self): - """Returns a list of kernel models relative to the running kernels.""" - kernels = [] - kernel_ids = self.list_kernel_ids() - for kernel_id in kernel_ids: - model = self.kernel_model(kernel_id) - kernels.append(model) - return kernels - - # override _check_kernel_id to raise 404 instead of KeyError - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - -class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBase): +class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManager): """A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout""" @default('kernel_manager_class') @@ -504,38 +504,6 @@ class AsyncMappingKernelManager(AsyncMultiKernelManager, MappingKernelManagerBas timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future - def kernel_model(self, kernel_id): - """Return a JSON-safe dict representing a kernel - - For use in representing kernels in the JSON APIs. - """ - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - - model = { - "id":kernel_id, - "name": kernel.kernel_name, - "last_activity": isoformat(kernel.last_activity), - "execution_state": kernel.execution_state, - "connections": self._kernel_connections[kernel_id], - } - return model - - def list_kernels(self): - """Returns a list of kernel models relative to the running kernels.""" - kernels = [] - kernel_ids = self.list_kernel_ids() - for kernel_id in kernel_ids: - model = self.kernel_model(kernel_id) - kernels.append(model) - return kernels - - # override _check_kernel_id to raise 404 instead of KeyError - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - class ActivityMonitor(LoggingConfigurable): """Establishes activity recorder for each active kernel""" From 788bb0ebe04ac3dd84254f7352c93da2572be7fe Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 20 Mar 2020 08:09:39 -0700 Subject: [PATCH 06/10] Update all coroutine/yield methods to async/await, share another method --- notebook/services/kernels/kernelmanager.py | 40 ++++++++-------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index d5cc00f2a..346e27712 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -139,6 +139,11 @@ class MappingKernelManagerBase(LoggingConfigurable): self.kernel_culler = KernelCuller(parent=self) self.activity_monitor = ActivityMonitor(parent=self) + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warning("Kernel %s died, removing from map.", kernel_id) + self.remove_kernel(kernel_id) + def kernel_model(self, kernel_id): """Return a JSON-safe dict representing a kernel @@ -269,13 +274,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): # Methods for managing kernels and sessions # ------------------------------------------------------------------------- - def _handle_kernel_died(self, kernel_id): - """notice that a kernel died""" - self.log.warning("Kernel %s died, removing from map.", kernel_id) - self.remove_kernel(kernel_id) - - @gen.coroutine - def start_kernel(self, kernel_id=None, path=None, **kwargs): + async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. Parameters @@ -294,7 +293,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = yield maybe_future( + kernel_id = await maybe_future( super(MappingKernelManager, self).start_kernel(**kwargs) ) self._kernel_connections[kernel_id] = 0 @@ -317,8 +316,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - # py2-compat - raise gen.Return(kernel_id) + return kernel_id def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" @@ -338,11 +336,10 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) - @gen.coroutine - def restart_kernel(self, kernel_id): + async def restart_kernel(self, kernel_id): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - yield maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id)) + await maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -379,7 +376,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): loop = IOLoop.current() timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) # wait for restart to complete - yield future + await future class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManager): @@ -396,11 +393,6 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage # Methods for managing kernels and sessions # ------------------------------------------------------------------------- - def _handle_kernel_died(self, kernel_id): - """notice that a kernel died""" - self.log.warning("Kernel %s died, removing from map.", kernel_id) - self.remove_kernel(kernel_id) - async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. @@ -695,16 +687,14 @@ class KernelCuller(LoggingConfigurable): log_msg.append(".") self.log.info(''.join(log_msg)) - @gen.coroutine - def cull_kernels(self): + async def cull_kernels(self): self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) # Get a separate list of kernels to avoid conflicting updates while iterating for kernel_id in self.parent.list_kernel_ids(): - yield self.parent.cull_kernel_if_idle(kernel_id) + await self.parent.cull_kernel_if_idle(kernel_id) - @gen.coroutine - def cull_kernel_if_idle(self, kernel_id): + async def cull_kernel_if_idle(self, kernel_id): # Get the kernel model and use that to determine cullability... try: @@ -729,7 +719,7 @@ class KernelCuller(LoggingConfigurable): self.log.warning( "Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", model['execution_state'], model['name'], kernel_id, connections, idle_duration) - yield maybe_future(self.parent.shutdown_kernel(kernel_id)) + await maybe_future(self.parent.shutdown_kernel(kernel_id)) except KeyError: pass # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. except Exception as e: # other errors are not as expected, so we'll make some noise, but continue. From 72d44c58cf548f299786c4365de39bbc8bce5dd9 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 25 Mar 2020 10:01:58 -0700 Subject: [PATCH 07/10] Switch class hierarchy to not use mixin base This commit uses the approach used in jupyter_server #191 first proposed by David Brochart. This reduces code duplication and alleviates redundancy relative to configurable options. Also, the startup message now includes the version information. Co-authored-by: David Brochart --- notebook/notebookapp.py | 11 +- notebook/services/kernels/kernelmanager.py | 683 ++++++------------ .../kernels/tests/test_kernels_api.py | 1 + notebook/services/sessions/sessionmanager.py | 2 +- .../sessions/tests/test_sessions_api.py | 3 +- 5 files changed, 242 insertions(+), 458 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 0c4244926..9b7319e71 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -76,7 +76,7 @@ from notebook import ( from .base.handlers import Template404, RedirectWithParams from .log import log_request -from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager, MappingKernelManagerBase +from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager from .services.config import ConfigManager from .services.contents.manager import ContentsManager from .services.contents.filemanager import FileContentsManager @@ -109,7 +109,7 @@ from notebook._sysinfo import get_sys_info from ._tz import utcnow, utcfromtimestamp from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url -# Check if we can user async kernel management +# Check if we can use async kernel management try: from jupyter_client import AsyncMultiKernelManager async_kernel_mgmt_available = True @@ -584,7 +584,7 @@ class NotebookApp(JupyterApp): flags = flags classes = [ - KernelManager, Session, MappingKernelManager, AsyncMappingKernelManager, KernelSpecManager, + KernelManager, Session, MappingKernelManager, KernelSpecManager, ContentsManager, FileContentsManager, NotebookNotary, GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient, ] @@ -1185,7 +1185,7 @@ class NotebookApp(JupyterApp): kernel_manager_class = Type( default_value=MappingKernelManager, - klass=MappingKernelManagerBase, + klass=MappingKernelManager, config=True, help=_('The kernel manager class to use.') ) @@ -1816,7 +1816,8 @@ class NotebookApp(JupyterApp): info += kernel_msg % n_kernels info += "\n" # Format the info so that the URL fits on a single line in 80 char display - info += _("The Jupyter Notebook is running at:\n%s") % self.display_url + info += _("Jupyter Notebook {version} is running at:\n{url}". + format(version=NotebookApp.version, url=self.display_url)) if self.gateway_config.gateway_enabled: info += _("\nKernels will be managed by the Gateway server running at:\n%s") % self.gateway_config.url return info diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 346e27712..6cb5d20f9 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -1,5 +1,4 @@ """A MultiKernelManager for use in the notebook webserver - - raises HTTPErrors - creates REST API models """ @@ -12,15 +11,15 @@ from datetime import datetime, timedelta from functools import partial import os -from tornado import gen, web +from tornado import web from tornado.concurrent import Future from tornado.ioloop import IOLoop, PeriodicCallback from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer, - Float, Instance, default, validate) -from traitlets.config.configurable import LoggingConfigurable + Float, Instance, default, validate +) from notebook.utils import maybe_future, to_os_path, exists from notebook._tz import utcnow, isoformat @@ -28,9 +27,9 @@ from ipython_genutils.py3compat import getcwd from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL -# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate juptyer_client. -# This will be confirmed at runtime in notebookapp. -# TODO: remove once dependencies are updated. +# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client. +# This will be confirmed at runtime in notebookapp. The following block can be removed once the jupyter_client's +# floor has been updated. try: from jupyter_client.multikernelmanager import AsyncMultiKernelManager except ImportError: @@ -39,15 +38,12 @@ except ImportError: pass -class MappingKernelManagerBase(LoggingConfigurable): - """ - This class exists so that class-based traits relative to MappingKernelManager and AsyncMappingKernelManager - can be satisfied since AsyncMappingKernelManager doesn't derive from the former. It is only necessary until - we converge to using only async, but that requires subclasses to be updated. +class MappingKernelManager(MultiKernelManager): + """A KernelManager that handles notebook mapping and HTTP error handling""" - Since we have this class, we'll reduce duplication of code between the disjoint classes by using this - common superclass for configuration properties and static methods and local member variables. - """ + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.IOLoopKernelManager" kernel_argv = List(Unicode()) @@ -55,6 +51,10 @@ class MappingKernelManagerBase(LoggingConfigurable): _kernel_connections = Dict() + _culler_callback = None + + _initialized_culler = False + @default('root_dir') def _default_root_dir(self): try: @@ -96,10 +96,8 @@ class MappingKernelManagerBase(LoggingConfigurable): buffer_offline_messages = Bool(True, config=True, help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory. - When True (default), messages are buffered and replayed on reconnect, avoiding lost messages due to interrupted connectivity. - Disable if long-running kernels will produce too much output while no frontends are connected. """ @@ -107,7 +105,6 @@ class MappingKernelManagerBase(LoggingConfigurable): kernel_info_timeout = Float(60, config=True, help="""Timeout for giving up on a kernel (in seconds). - On starting and restarting kernels, we check whether the kernel is running and responsive by sending kernel_info_requests. This sets the timeout in seconds for how long the kernel can take @@ -117,65 +114,38 @@ class MappingKernelManagerBase(LoggingConfigurable): """ ) + _kernel_buffers = Any() + @default('_kernel_buffers') + def _default_kernel_buffers(self): + return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) + last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.last_kernel_activity = utcnow() + allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. """ ) - # members used to hold composed instances - buffering_manager = None - kernel_culler = None - activity_monitor = None + #------------------------------------------------------------------------- + # Methods for managing kernels and sessions + #------------------------------------------------------------------------- def __init__(self, **kwargs): - super(MappingKernelManagerBase, self).__init__(**kwargs) + self.super = MultiKernelManager + self.super.__init__(self, **kwargs) self.last_kernel_activity = utcnow() - self.buffering_manager = BufferingManager(parent=self) - self.kernel_culler = KernelCuller(parent=self) - self.activity_monitor = ActivityMonitor(parent=self) - def _handle_kernel_died(self, kernel_id): """notice that a kernel died""" self.log.warning("Kernel %s died, removing from map.", kernel_id) self.remove_kernel(kernel_id) - def kernel_model(self, kernel_id): - """Return a JSON-safe dict representing a kernel - - For use in representing kernels in the JSON APIs. - """ - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - - model = { - "id":kernel_id, - "name": kernel.kernel_name, - "last_activity": isoformat(kernel.last_activity), - "execution_state": kernel.execution_state, - "connections": self._kernel_connections[kernel_id], - } - return model - - def list_kernels(self): - """Returns a list of kernel models relative to the running kernels.""" - kernels = [] - kernel_ids = self.list_kernel_ids() - for kernel_id in kernel_ids: - model = self.kernel_model(kernel_id) - kernels.append(model) - return kernels - - # override _check_kernel_id to raise 404 instead of KeyError - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - def cwd_for_path(self, path): """Turn API path into absolute OS path.""" os_path = to_os_path(path, self.root_dir) @@ -185,98 +155,8 @@ class MappingKernelManagerBase(LoggingConfigurable): os_path = os.path.dirname(os_path) return os_path - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - self.buffering_manager.start_buffering(kernel_id, session_key, channels) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - return self.buffering_manager.get_buffer(kernel_id, session_key) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - """ - self.buffering_manager.stop_buffering(kernel_id) - - def notify_connect(self, kernel_id): - """Notice a new connection to a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] += 1 - - def notify_disconnect(self, kernel_id): - """Notice a disconnection from a kernel""" - if kernel_id in self._kernel_connections: - self._kernel_connections[kernel_id] -= 1 - - # monitoring activity: - - def start_watching_activity(self, kernel_id): - """Start watching IOPub messages on a kernel for activity. Remove if no overrides - - - update last_activity on every message - - record execution_state from status messages - """ - self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) - - def initialize_culler(self): - """Initial culler if not already. Remove if no overrides - """ - if self.kernel_culler is None: - self.kernel_culler = KernelCuller(parent=self) - - def cull_kernels(self): - # Defer to KernelCuller. Remove if no overrides - self.kernel_culler.cull_kernels() - - def cull_kernel_if_idle(self, kernel_id): - # Defer to KernelCuller. Remove if no overrides - self.kernel_culler.cull_kernel_if_idle(kernel_id) - - -class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling""" - - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.IOLoopKernelManager" - - def __init__(self, **kwargs): - super(MappingKernelManager, self).__init__(**kwargs) - - # ------------------------------------------------------------------------- - # Methods for managing kernels and sessions - # ------------------------------------------------------------------------- - async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. - Parameters ---------- kernel_id : uuid @@ -293,9 +173,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await maybe_future( - super(MappingKernelManager, self).start_kernel(**kwargs) - ) + kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs)) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) self.log.info("Kernel started: %s" % kernel_id) @@ -316,127 +194,99 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager): self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - return kernel_id + # Initialize culling if not already + if not self._initialized_culler: + self.initialize_culler() - def shutdown_kernel(self, kernel_id, now=False): - """Shutdown a kernel by kernel_id""" - self._check_kernel_id(kernel_id) - kernel = self._kernels[kernel_id] - if kernel._activity_stream: - kernel._activity_stream.close() - kernel._activity_stream = None - self.stop_buffering(kernel_id) - self._kernel_connections.pop(kernel_id, None) + return kernel_id - # Decrease the metric of number of kernels - # running for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels( - type=self._kernels[kernel_id].kernel_name - ).dec() + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + Parameters + ---------- + kernel_id : str + The id of the kernel to start buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. + """ - return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) + if not self.buffer_offline_messages: + for channel, stream in channels.items(): + stream.close() + return - async def restart_kernel(self, kernel_id): - """Restart a kernel by kernel_id""" + self.log.info("Starting buffering for %s", session_key) self._check_kernel_id(kernel_id) - await maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id)) - kernel = self.get_kernel(kernel_id) - # return a Future that will resolve when the kernel has successfully restarted - channel = kernel.connect_shell() - future = Future() - - def finish(): - """Common cleanup when restart finishes/fails for any reason.""" - if not channel.closed(): - channel.close() - loop.remove_timeout(timeout) - kernel.remove_restart_callback(on_restart_failed, 'dead') - - def on_reply(msg): - self.log.debug("Kernel info reply received: %s", kernel_id) - finish() - if not future.done(): - future.set_result(msg) - - def on_timeout(): - self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(gen.TimeoutError("Timeout waiting for restart")) - - def on_restart_failed(): - self.log.warning("Restarting kernel failed: %s", kernel_id) - finish() - if not future.done(): - future.set_exception(RuntimeError("Restart failed")) - - kernel.add_restart_callback(on_restart_failed, 'dead') - kernel.session.send(channel, "kernel_info_request") - channel.on_recv(on_reply) - loop = IOLoop.current() - timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) - # wait for restart to complete - await future - - -class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManager): - """A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout""" - - @default('kernel_manager_class') - def _default_kernel_manager_class(self): - return "jupyter_client.ioloop.AsyncIOLoopKernelManager" - - def __init__(self, **kwargs): - super(AsyncMappingKernelManager, self).__init__(**kwargs) + # clear previous buffering state + self.stop_buffering(kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + # record the session key because only one session can buffer + buffer_info['session_key'] = session_key + # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple + buffer_info['buffer'] = [] + buffer_info['channels'] = channels - # ------------------------------------------------------------------------- - # Methods for managing kernels and sessions - # ------------------------------------------------------------------------- + # forward any future messages to the internal buffer + def buffer_msg(channel, msg_parts): + self.log.debug("Buffering msg on %s:%s", kernel_id, channel) + buffer_info['buffer'].append((channel, msg_parts)) - async def start_kernel(self, kernel_id=None, path=None, **kwargs): - """Start a kernel for a session and return its kernel_id. + for channel, stream in channels.items(): + stream.on_recv(partial(buffer_msg, channel)) + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel Parameters ---------- - kernel_id : uuid - The uuid to associate the new kernel with. If this - is not None, this kernel will be persistent whenever it is - requested. - path : API path - The API path (unicode, '/' delimited) for the cwd. - Will be transformed to an OS path relative to root_dir. - kernel_name : str - The name identifying which kernel spec to launch. This is ignored if - an existing kernel is returned, but it may be checked in the future. + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. """ - if kernel_id is None: - if path is not None: - kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await super(AsyncMappingKernelManager, self).start_kernel(**kwargs) + self.log.debug("Getting buffer for %s", kernel_id) + if kernel_id not in self._kernel_buffers: + return - self._kernel_connections[kernel_id] = 0 - self.start_watching_activity(kernel_id) - self.log.info("Kernel started (async): %s" % kernel_id) - self.log.debug("Kernel args: %r" % kwargs) - # register callback for failed auto-restart - self.add_restart_callback(kernel_id, - lambda: self._handle_kernel_died(kernel_id), - 'dead', - ) + buffer_info = self._kernel_buffers[kernel_id] + if buffer_info['session_key'] == session_key: + # remove buffer + self._kernel_buffers.pop(kernel_id) + # only return buffer_info if it's a match + return buffer_info + else: + self.stop_buffering(kernel_id) - # Increase the metric of number of kernels running - # for the relevant kernel type by 1 - KERNEL_CURRENTLY_RUNNING_TOTAL.labels( - type=self._kernels[kernel_id].kernel_name - ).inc() + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + """ + self.log.debug("Clearing buffer for %s", kernel_id) + self._check_kernel_id(kernel_id) - else: - self._check_kernel_id(kernel_id) - self.log.info("Using existing kernel: %s" % kernel_id) + if kernel_id not in self._kernel_buffers: + return + buffer_info = self._kernel_buffers.pop(kernel_id) + # close buffering streams + for stream in buffer_info['channels'].values(): + if not stream.closed(): + stream.on_recv(None) + stream.close() - return kernel_id + msg_buffer = buffer_info['buffer'] + if msg_buffer: + self.log.info("Discarding %s buffered messages for %s", + len(msg_buffer), buffer_info['session_key']) - async def shutdown_kernel(self, kernel_id, now=False, restart=False): + def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) kernel = self._kernels[kernel_id] @@ -445,7 +295,6 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage kernel._activity_stream = None self.stop_buffering(kernel_id) self._kernel_connections.pop(kernel_id, None) - self.last_kernel_activity = utcnow() # Decrease the metric of number of kernels # running for the relevant kernel type by 1 @@ -453,12 +302,12 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage type=self._kernels[kernel_id].kernel_name ).dec() - await super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart) + return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - await super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now) + await maybe_future(self.super.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -481,7 +330,7 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) finish() if not future.done(): - future.set_exception(gen.TimeoutError("Timeout waiting for restart")) + future.set_exception(TimeoutError("Timeout waiting for restart")) def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) @@ -496,22 +345,55 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) return future + def notify_connect(self, kernel_id): + """Notice a new connection to a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] += 1 -class ActivityMonitor(LoggingConfigurable): - """Establishes activity recorder for each active kernel""" + def notify_disconnect(self, kernel_id): + """Notice a disconnection from a kernel""" + if kernel_id in self._kernel_connections: + self._kernel_connections[kernel_id] -= 1 - def __init__(self, **kwargs): - super(ActivityMonitor, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "ActivityMonitor requires an instance of MappingKernelManagerBase!") + def kernel_model(self, kernel_id): + """Return a JSON-safe dict representing a kernel + For use in representing kernels in the JSON APIs. + """ + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] - def start_watching_activity(self, kernel_id, kernel): - """Start watching IOPub messages on a kernel for activity. + model = { + "id": kernel_id, + "name": kernel.kernel_name, + "last_activity": isoformat(kernel.last_activity), + "execution_state": kernel.execution_state, + "connections": self._kernel_connections[kernel_id], + } + return model + + def list_kernels(self): + """Returns a list of kernel_id's of kernels running.""" + kernels = [] + kernel_ids = self.super.list_kernel_ids(self) + for kernel_id in kernel_ids: + model = self.kernel_model(kernel_id) + kernels.append(model) + return kernels + + # override _check_kernel_id to raise 404 instead of KeyError + def _check_kernel_id(self, kernel_id): + """Check a that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) + # monitoring activity: + + def start_watching_activity(self, kernel_id): + """Start watching IOPub messages on a kernel for activity. - update last_activity on every message - record execution_state from status messages """ + kernel = self._kernels[kernel_id] # add busy/activity markers: kernel.execution_state = 'starting' kernel.last_activity = utcnow() @@ -523,7 +405,7 @@ class ActivityMonitor(LoggingConfigurable): def record_activity(msg_list): """Record an IOPub message arriving from a kernel""" - self.parent.last_kernel_activity = kernel.last_activity = utcnow() + self.last_kernel_activity = kernel.last_activity = utcnow() idents, fed_msg_list = session.feed_identities(msg_list) msg = session.deserialize(fed_msg_list) @@ -537,191 +419,90 @@ class ActivityMonitor(LoggingConfigurable): kernel._activity_stream.on_recv(record_activity) - -class BufferingManager(LoggingConfigurable): - """Manages buffering across the active kernels""" - _kernel_buffers = Any() - - @default('_kernel_buffers') - def _default_kernel_buffers(self): - return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) - - def __init__(self, **kwargs): - super(BufferingManager, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "BufferingManager requires an instance of MappingKernelManagerBase!") - - def _check_kernel_id(self, kernel_id): - """Check a that a kernel_id exists and raise 404 if not.""" - if kernel_id not in self.parent: - raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) - - def start_buffering(self, kernel_id, session_key, channels): - """Start buffering messages for a kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - channels: dict({'channel': ZMQStream}) - The zmq channels whose messages should be buffered. - """ - - if not self.parent.buffer_offline_messages: - for channel, stream in channels.items(): - stream.close() - return - - self._check_kernel_id(kernel_id) - self.log.info("Starting buffering for %s", session_key) - # clear previous buffering state - self.parent.stop_buffering(kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - # record the session key because only one session can buffer - buffer_info['session_key'] = session_key - # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple - buffer_info['buffer'] = [] - buffer_info['channels'] = channels - - # forward any future messages to the internal buffer - def buffer_msg(channel, msg_parts): - self.log.debug("Buffering msg on %s:%s", kernel_id, channel) - buffer_info['buffer'].append((channel, msg_parts)) - - for channel, stream in channels.items(): - stream.on_recv(partial(buffer_msg, channel)) - - def get_buffer(self, kernel_id, session_key): - """Get the buffer for a given kernel - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. - session_key: str, optional - The session_key, if any, that should get the buffer. - If the session_key matches the current buffered session_key, - the buffer will be returned. - """ - if kernel_id not in self._kernel_buffers: - return - - self.log.debug("Getting buffer for %s", kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - if buffer_info['session_key'] == session_key: - # remove buffer - self._kernel_buffers.pop(kernel_id) - # only return buffer_info if it's a match - return buffer_info - else: - self.parent.stop_buffering(kernel_id) - - def stop_buffering(self, kernel_id): - """Stop buffering kernel messages - - Parameters - ---------- - kernel_id : str - The id of the kernel to stop buffering. + def initialize_culler(self): + """Start idle culler if 'cull_idle_timeout' is greater than zero. + Regardless of that value, set flag that we've been here. """ - self._check_kernel_id(kernel_id) - - if kernel_id not in self._kernel_buffers: - return - self.log.debug("Clearing buffer for %s", kernel_id) - buffer_info = self._kernel_buffers.pop(kernel_id) - # close buffering streams - for stream in buffer_info['channels'].values(): - if not stream.closed(): - stream.on_recv(None) - stream.close() - - msg_buffer = buffer_info['buffer'] - if msg_buffer: - self.log.info("Discarding %s buffered messages for %s", - len(msg_buffer), buffer_info['session_key']) + if not self._initialized_culler and self.cull_idle_timeout > 0: + if self._culler_callback is None: + loop = IOLoop.current() + if self.cull_interval <= 0: # handle case where user set invalid value + self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", + self.cull_interval, self.cull_interval_default) + self.cull_interval = self.cull_interval_default + self._culler_callback = PeriodicCallback( + self.cull_kernels, 1000*self.cull_interval) + self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...", + self.cull_idle_timeout, self.cull_interval) + if self.cull_busy: + self.log.info("Culling kernels even if busy") + if self.cull_connected: + self.log.info("Culling kernels even with connected clients") + self._culler_callback.start() + + self._initialized_culler = True + async def cull_kernels(self): + self.log.debug("Polling every %s seconds for kernels idle > %s seconds...", + self.cull_interval, self.cull_idle_timeout) + """Create a separate list of kernels to avoid conflicting updates while iterating""" + for kernel_id in list(self._kernels): + try: + await self.cull_kernel_if_idle(kernel_id) + except Exception as e: + self.log.exception("The following exception was encountered while checking the " + "idle duration of kernel {}: {}".format(kernel_id, e)) -class KernelCuller(LoggingConfigurable): - """Handles culling responsibilities for active kernels""" + async def cull_kernel_if_idle(self, kernel_id): + try: + kernel = self._kernels[kernel_id] + except KeyError: + return # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. + + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, kernel.kernel_name, kernel.last_activity) + if kernel.last_activity is not None: + dt_now = utcnow() + dt_idle = dt_now - kernel.last_activity + # Compute idle properties + is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout) + is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy') + connections = self._kernel_connections.get(kernel_id, 0) + is_idle_connected = self.cull_connected or not connections + # Cull the kernel if all three criteria are met + if (is_idle_time and is_idle_execute and is_idle_connected): + idle_duration = int(dt_idle.total_seconds()) + self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", + kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration) + await maybe_future(self.shutdown_kernel(kernel_id)) + + +# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, overriding +# only what is different. +class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.AsyncIOLoopKernelManager" def __init__(self, **kwargs): - super(KernelCuller, self).__init__(**kwargs) - if not isinstance(self.parent, MappingKernelManagerBase): - raise RuntimeError( - "KernelCuller requires an instance of MappingKernelManagerBase!") - self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging - - # Start idle culler if 'cull_idle_timeout' is greater than zero. - # Regardless of that value, set flag that we've been here. - if self.parent.cull_idle_timeout > 0: - if self.parent.cull_interval <= 0: # handle case where user set invalid value - self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", - self.parent.cull_interval, self.parent.cull_interval_default) - self.parent.cull_interval = self.parent.cull_interval_default - self._culler_callback = PeriodicCallback( - self.parent.cull_kernels, 1000*self.parent.cull_interval) - self._culler_callback.start() - self._log_info() - - def _log_info(self): - """Builds a single informational message relative to the culling configuration (logged at startup).""" - log_msg = list() - log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals". - format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout, - interval=self.parent.cull_interval)) - if self.parent.cull_busy or self.parent.cull_connected: - log_msg.append(" - including") - if self.parent.cull_busy: - log_msg.append(" busy") - if self.parent.cull_connected: - log_msg.append(" and") - if self.parent.cull_connected: - log_msg.append(" connected") - log_msg.append(" kernels") - log_msg.append(".") - self.log.info(''.join(log_msg)) + self.super = AsyncMultiKernelManager + self.super.__init__(self, **kwargs) + self.last_kernel_activity = utcnow() - async def cull_kernels(self): - self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", - self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) - # Get a separate list of kernels to avoid conflicting updates while iterating - for kernel_id in self.parent.list_kernel_ids(): - await self.parent.cull_kernel_if_idle(kernel_id) + async def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + if kernel._activity_stream: + kernel._activity_stream.close() + kernel._activity_stream = None + self.stop_buffering(kernel_id) + self._kernel_connections.pop(kernel_id, None) - async def cull_kernel_if_idle(self, kernel_id): + # Decrease the metric of number of kernels + # running for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).dec() - # Get the kernel model and use that to determine cullability... - try: - model = self.parent.kernel_model(kernel_id) - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", - kernel_id, model['name'], model['last_activity']) - - if model['last_activity'] is not None: - # Convert dates to compatible formats. Since both are UTC, strip the TZ info from - # the current time and convert the last_activity string to a datetime. - dt_now = utcnow().replace(tzinfo=None) - dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") - dt_idle = dt_now - dt_last_activity - # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) - is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') - connections = model.get('connections', 0) - is_idle_connected = self.parent.cull_connected or connections == 0 - # Cull the kernel if all three criteria are met - if is_idle_time and is_idle_execute and is_idle_connected: - idle_duration = int(dt_idle.total_seconds()) - self.log.warning( - "Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - model['execution_state'], model['name'], kernel_id, connections, idle_duration) - await maybe_future(self.parent.shutdown_kernel(kernel_id)) - except KeyError: - pass # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. - except Exception as e: # other errors are not as expected, so we'll make some noise, but continue. - self.log.exception("The following exception was encountered while checking the idle " - "duration of kernel %s: %s", kernel_id, e) + return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 0d4bc94ac..ff8111c1b 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -227,6 +227,7 @@ class KernelFilterTest(NotebookTestBase): } } }) + # Sanity check verifying that the configurable was properly set. def test_config(self): self.assertEqual(self.notebook.kernel_manager.allowed_message_types, ['kernel_info_request']) diff --git a/notebook/services/sessions/sessionmanager.py b/notebook/services/sessions/sessionmanager.py index 5ec35edd0..63e184482 100644 --- a/notebook/services/sessions/sessionmanager.py +++ b/notebook/services/sessions/sessionmanager.py @@ -22,7 +22,7 @@ from notebook.utils import maybe_future class SessionManager(LoggingConfigurable): - kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManagerBase') + kernel_manager = Instance('notebook.services.kernels.kernelmanager.MappingKernelManager') contents_manager = Instance('notebook.services.contents.manager.ContentsManager') # Session database initialized below diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 9323b5966..3a02f2535 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -273,11 +273,12 @@ class AsyncSessionAPITest(SessionAPITest): def get_argv(cls): argv = super(AsyncSessionAPITest, cls).get_argv() - # before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # Before we extend the argv with the class, ensure that appropriate jupyter_client is available. # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. if async_testing_enabled: argv.extend(['--NotebookApp.kernel_manager_class=' 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv def setUp(self): From 99b0afd16ec724e46f9983bcabc3df986f4de261 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 27 Mar 2020 10:53:32 -0700 Subject: [PATCH 08/10] Apply changes per review Add comments and rename self.super to self.pinned_superclass to clarify intent. Add run_sync() util method to clean up shutdown_all() invocation. --- notebook/notebookapp.py | 8 ++--- notebook/services/kernels/kernelmanager.py | 26 +++++++------- notebook/utils.py | 40 ++++++++++++++++++++++ 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 9b7319e71..a8535c35d 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -107,7 +107,7 @@ from jupyter_core.paths import jupyter_runtime_dir, jupyter_path from notebook._sysinfo import get_sys_info from ._tz import utcnow, utcfromtimestamp -from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url +from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url, run_sync # Check if we can use async kernel management try: @@ -1801,11 +1801,7 @@ class NotebookApp(JupyterApp): n_kernels = len(self.kernel_manager.list_kernel_ids()) kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels) self.log.info(kernel_msg % n_kernels) - # If we're using async kernel management, we need to invoke the async method via the event loop. - if isinstance(self.kernel_manager, AsyncMappingKernelManager): - asyncio.get_event_loop().run_until_complete(self.kernel_manager.shutdown_all()) - else: - self.kernel_manager.shutdown_all() + run_sync(self.kernel_manager.shutdown_all()) def notebook_info(self, kernel_count=True): "Return the current working directory and the server url information" diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 6cb5d20f9..c9c6d18a4 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -122,10 +122,6 @@ class MappingKernelManager(MultiKernelManager): last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.last_kernel_activity = utcnow() - allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. @@ -137,8 +133,11 @@ class MappingKernelManager(MultiKernelManager): #------------------------------------------------------------------------- def __init__(self, **kwargs): - self.super = MultiKernelManager - self.super.__init__(self, **kwargs) + # Pin the superclass to better control the MRO. This is needed by + # AsyncMappingKernelManager so that it can give priority to methods + # on AsyncMultiKernelManager over this superclass. + self.pinned_superclass = MultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) self.last_kernel_activity = utcnow() def _handle_kernel_died(self, kernel_id): @@ -173,7 +172,7 @@ class MappingKernelManager(MultiKernelManager): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs)) + kernel_id = await maybe_future(self.pinned_superclass.start_kernel(self, **kwargs)) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) self.log.info("Kernel started: %s" % kernel_id) @@ -302,12 +301,12 @@ class MappingKernelManager(MultiKernelManager): type=self._kernels[kernel_id].kernel_name ).dec() - return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) + return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - await maybe_future(self.super.restart_kernel(self, kernel_id, now=now)) + await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -374,7 +373,7 @@ class MappingKernelManager(MultiKernelManager): def list_kernels(self): """Returns a list of kernel_id's of kernels running.""" kernels = [] - kernel_ids = self.super.list_kernel_ids(self) + kernel_ids = self.pinned_superclass.list_kernel_ids(self) for kernel_id in kernel_ids: model = self.kernel_model(kernel_id) kernels.append(model) @@ -485,8 +484,9 @@ class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): return "jupyter_client.ioloop.AsyncIOLoopKernelManager" def __init__(self, **kwargs): - self.super = AsyncMultiKernelManager - self.super.__init__(self, **kwargs) + # Pin the superclass to better control the MRO. + self.pinned_superclass = AsyncMultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) self.last_kernel_activity = utcnow() async def shutdown_kernel(self, kernel_id, now=False, restart=False): @@ -505,4 +505,4 @@ class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): type=self._kernels[kernel_id].kernel_name ).dec() - return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) + return await self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) diff --git a/notebook/utils.py b/notebook/utils.py index 69f3586f1..9ec10773f 100644 --- a/notebook/utils.py +++ b/notebook/utils.py @@ -327,3 +327,43 @@ def maybe_future(obj): f.set_result(obj) return f + +def run_sync(maybe_async): + """If async, runs maybe_async and blocks until it has executed, + possibly creating an event loop. + If not async, just returns maybe_async as it is the result of something + that has already executed. + Parameters + ---------- + maybe_async : async or non-async object + The object to be executed, if it is async. + Returns + ------- + result : + Whatever the async object returns, or the object itself. + """ + if not inspect.isawaitable(maybe_async): + # that was not something async, just return it + return maybe_async + # it is async, we need to run it in an event loop + + def wrapped(): + create_new_event_loop = False + try: + loop = asyncio.get_event_loop() + except RuntimeError: + create_new_event_loop = True + else: + if loop.is_closed(): + create_new_event_loop = True + if create_new_event_loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(maybe_async) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + # just return a Future, hoping that it will be awaited + result = asyncio.ensure_future(maybe_async) + return result + return wrapped() From 53d4d088132e86f7f434c71ace28dd4a10e47621 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Wed, 1 Apr 2020 16:39:29 -0700 Subject: [PATCH 09/10] Fix detection of unsupported config, prevent async on 3.5 --- notebook/notebookapp.py | 12 +++++++----- notebook/services/kernels/kernelmanager.py | 3 ++- .../services/kernels/tests/test_kernels_api.py | 15 +++++++++------ .../services/sessions/tests/test_sessions_api.py | 16 +++++++++------- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index a8535c35d..bbba3fc1a 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -1390,14 +1390,16 @@ class NotebookApp(JupyterApp): connection_dir=self.runtime_dir, kernel_spec_manager=self.kernel_spec_manager, ) - # Ensure the appropriate jupyter_client is in place. + # Ensure the appropriate version of Python and jupyter_client is available. if isinstance(self.kernel_manager, AsyncMappingKernelManager): + if sys.version_info < (3, 6): + raise ValueError("You are using `AsyncMappingKernelManager` in Python 3.5 (or lower) " + "which is not supported. Please upgrade Python to 3.6+ or change kernel managers.") if not async_kernel_mgmt_available: raise ValueError("You are using `AsyncMappingKernelManager` without an appropriate " - "jupyter_client installed! Upgrade jupyter_client or change kernel managers.") - else: - self.log.info("Asynchronous kernel management has been configured to use '{}'.". - format(self.kernel_manager.__class__.__name__)) + "jupyter_client installed! Please upgrade jupyter_client or change kernel managers.") + self.log.info("Asynchronous kernel management has been configured to use '{}'.". + format(self.kernel_manager.__class__.__name__)) self.contents_manager = self.contents_manager_class( parent=self, diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index c9c6d18a4..7063ca78f 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -35,7 +35,8 @@ try: except ImportError: class AsyncMultiKernelManager(object): """Empty class to satisfy unused reference by AsyncMappingKernelManager.""" - pass + def __init__(self, **kwargs): + pass class MappingKernelManager(MultiKernelManager): diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index ff8111c1b..11468a7ce 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -1,6 +1,7 @@ """Test the kernels service API.""" import json +import sys import time from traitlets.config import Config @@ -198,6 +199,14 @@ class KernelAPITest(NotebookTestBase): class AsyncKernelAPITest(KernelAPITest): """Test the kernels web service API using the AsyncMappingKernelManager""" + @classmethod + def setup_class(cls): + if not async_testing_enabled: + raise SkipTest("AsyncKernelAPITest tests skipped due to down-level jupyter_client!") + if sys.version_info < (3, 6): + raise SkipTest("AsyncKernelAPITest tests skipped due to Python < 3.6!") + super(AsyncKernelAPITest, cls).setup_class() + @classmethod def get_argv(cls): argv = super(AsyncKernelAPITest, cls).get_argv() @@ -209,12 +218,6 @@ class AsyncKernelAPITest(KernelAPITest): 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) return argv - def setUp(self): - if not async_testing_enabled: - raise SkipTest("AsyncKernelAPITest.{test_method} skipped due to down-level jupyter_client!". - format(test_method=self._testMethodName)) - super(AsyncKernelAPITest, self).setUp() - class KernelFilterTest(NotebookTestBase): diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 3a02f2535..096796da9 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -5,8 +5,8 @@ from functools import partial import io import os import json -import requests import shutil +import sys import time from unittest import SkipTest @@ -269,6 +269,14 @@ class SessionAPITest(NotebookTestBase): class AsyncSessionAPITest(SessionAPITest): """Test the sessions web service API using the AsyncMappingKernelManager""" + @classmethod + def setup_class(cls): + if not async_testing_enabled: + raise SkipTest("AsyncSessionAPITest tests skipped due to down-level jupyter_client!") + if sys.version_info < (3, 6): + raise SkipTest("AsyncSessionAPITest tests skipped due to Python < 3.6!") + super(AsyncSessionAPITest, cls).setup_class() + @classmethod def get_argv(cls): argv = super(AsyncSessionAPITest, cls).get_argv() @@ -280,9 +288,3 @@ class AsyncSessionAPITest(SessionAPITest): 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) return argv - - def setUp(self): - if not async_testing_enabled: - raise SkipTest("AsyncSessionAPITest.{test_method} skipped due to down-level jupyter_client!". - format(test_method=self._testMethodName)) - super(AsyncSessionAPITest, self).setUp() From 1fc9bc0b7276513e65973dafb636a896cd8adb0f Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Thu, 2 Apr 2020 10:52:50 -0700 Subject: [PATCH 10/10] Add comments indicating when dependency check can be removed --- notebook/notebookapp.py | 4 ++-- notebook/services/kernels/tests/test_kernels_api.py | 4 ++-- notebook/services/sessions/tests/test_sessions_api.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index bbba3fc1a..969654311 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -1392,10 +1392,10 @@ class NotebookApp(JupyterApp): ) # Ensure the appropriate version of Python and jupyter_client is available. if isinstance(self.kernel_manager, AsyncMappingKernelManager): - if sys.version_info < (3, 6): + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. raise ValueError("You are using `AsyncMappingKernelManager` in Python 3.5 (or lower) " "which is not supported. Please upgrade Python to 3.6+ or change kernel managers.") - if not async_kernel_mgmt_available: + if not async_kernel_mgmt_available: # Can be removed once jupyter_client >= 6.1 is required. raise ValueError("You are using `AsyncMappingKernelManager` without an appropriate " "jupyter_client installed! Please upgrade jupyter_client or change kernel managers.") self.log.info("Asynchronous kernel management has been configured to use '{}'.". diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 11468a7ce..70ae0e878 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -201,9 +201,9 @@ class AsyncKernelAPITest(KernelAPITest): @classmethod def setup_class(cls): - if not async_testing_enabled: + if not async_testing_enabled: # Can be removed once jupyter_client >= 6.1 is required. raise SkipTest("AsyncKernelAPITest tests skipped due to down-level jupyter_client!") - if sys.version_info < (3, 6): + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. raise SkipTest("AsyncKernelAPITest tests skipped due to Python < 3.6!") super(AsyncKernelAPITest, cls).setup_class() diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 096796da9..0e30fc3ea 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -271,9 +271,9 @@ class AsyncSessionAPITest(SessionAPITest): @classmethod def setup_class(cls): - if not async_testing_enabled: + if not async_testing_enabled: # Can be removed once jupyter_client >= 6.1 is required. raise SkipTest("AsyncSessionAPITest tests skipped due to down-level jupyter_client!") - if sys.version_info < (3, 6): + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. raise SkipTest("AsyncSessionAPITest tests skipped due to Python < 3.6!") super(AsyncSessionAPITest, cls).setup_class()