diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index a657ce82d..969654311 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -7,6 +7,7 @@ from __future__ import absolute_import, print_function import notebook +import asyncio import binascii import datetime import errno @@ -75,7 +76,7 @@ from notebook import ( from .base.handlers import Template404, RedirectWithParams from .log import log_request -from .services.kernels.kernelmanager import MappingKernelManager +from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager from .services.config import ConfigManager from .services.contents.manager import ContentsManager from .services.contents.filemanager import FileContentsManager @@ -106,7 +107,14 @@ from jupyter_core.paths import jupyter_runtime_dir, jupyter_path from notebook._sysinfo import get_sys_info from ._tz import utcnow, utcfromtimestamp -from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url +from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url, run_sync + +# Check if we can use async kernel management +try: + from jupyter_client import AsyncMultiKernelManager + async_kernel_mgmt_available = True +except ImportError: + async_kernel_mgmt_available = False #----------------------------------------------------------------------------- # Module globals @@ -1177,6 +1185,7 @@ class NotebookApp(JupyterApp): kernel_manager_class = Type( default_value=MappingKernelManager, + klass=MappingKernelManager, config=True, help=_('The kernel manager class to use.') ) @@ -1374,12 +1383,24 @@ class NotebookApp(JupyterApp): self.kernel_spec_manager = self.kernel_spec_manager_class( parent=self, ) + self.kernel_manager = self.kernel_manager_class( parent=self, log=self.log, connection_dir=self.runtime_dir, kernel_spec_manager=self.kernel_spec_manager, ) + # Ensure the appropriate version of Python and jupyter_client is available. + if isinstance(self.kernel_manager, AsyncMappingKernelManager): + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. + raise ValueError("You are using `AsyncMappingKernelManager` in Python 3.5 (or lower) " + "which is not supported. Please upgrade Python to 3.6+ or change kernel managers.") + if not async_kernel_mgmt_available: # Can be removed once jupyter_client >= 6.1 is required. + raise ValueError("You are using `AsyncMappingKernelManager` without an appropriate " + "jupyter_client installed! Please upgrade jupyter_client or change kernel managers.") + self.log.info("Asynchronous kernel management has been configured to use '{}'.". + format(self.kernel_manager.__class__.__name__)) + self.contents_manager = self.contents_manager_class( parent=self, log=self.log, @@ -1782,7 +1803,7 @@ class NotebookApp(JupyterApp): n_kernels = len(self.kernel_manager.list_kernel_ids()) kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels) self.log.info(kernel_msg % n_kernels) - self.kernel_manager.shutdown_all() + run_sync(self.kernel_manager.shutdown_all()) def notebook_info(self, kernel_count=True): "Return the current working directory and the server url information" @@ -1793,7 +1814,8 @@ class NotebookApp(JupyterApp): info += kernel_msg % n_kernels info += "\n" # Format the info so that the URL fits on a single line in 80 char display - info += _("The Jupyter Notebook is running at:\n%s") % self.display_url + info += _("Jupyter Notebook {version} is running at:\n{url}". + format(version=NotebookApp.version, url=self.display_url)) if self.gateway_config.gateway_enabled: info += _("\nKernels will be managed by the Gateway server running at:\n%s") % self.gateway_config.url return info @@ -1897,8 +1919,8 @@ class NotebookApp(JupyterApp): assembled_url = urljoin('file:', pathname2url(open_file)) else: assembled_url = url_path_join(self.connection_url, uri) - - b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new) + + b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new) threading.Thread(target=b).start() def start(self): diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 8484dccdf..bca99ce1b 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -75,7 +75,7 @@ class KernelActionHandler(APIHandler): def post(self, kernel_id, action): km = self.kernel_manager if action == 'interrupt': - km.interrupt_kernel(kernel_id) + yield maybe_future(km.interrupt_kernel(kernel_id)) self.set_status(204) if action == 'restart': diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index a602b9ed6..7063ca78f 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -1,5 +1,4 @@ """A MultiKernelManager for use in the notebook webserver - - raises HTTPErrors - creates REST API models """ @@ -12,7 +11,7 @@ from datetime import datetime, timedelta from functools import partial import os -from tornado import gen, web +from tornado import web from tornado.concurrent import Future from tornado.ioloop import IOLoop, PeriodicCallback @@ -28,6 +27,17 @@ from ipython_genutils.py3compat import getcwd from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL +# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client. +# This will be confirmed at runtime in notebookapp. The following block can be removed once the jupyter_client's +# floor has been updated. +try: + from jupyter_client.multikernelmanager import AsyncMultiKernelManager +except ImportError: + class AsyncMultiKernelManager(object): + """Empty class to satisfy unused reference by AsyncMappingKernelManager.""" + def __init__(self, **kwargs): + pass + class MappingKernelManager(MultiKernelManager): """A KernelManager that handles notebook mapping and HTTP error handling""" @@ -70,7 +80,7 @@ class MappingKernelManager(MultiKernelManager): for users with poor network connections.""" ) - cull_interval_default = 300 # 5 minutes + cull_interval_default = 300 # 5 minutes cull_interval = Integer(cull_interval_default, config=True, help="""The interval (in seconds) on which to check for idle kernels exceeding the cull timeout value.""" ) @@ -87,10 +97,8 @@ class MappingKernelManager(MultiKernelManager): buffer_offline_messages = Bool(True, config=True, help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory. - When True (default), messages are buffered and replayed on reconnect, avoiding lost messages due to interrupted connectivity. - Disable if long-running kernels will produce too much output while no frontends are connected. """ @@ -98,7 +106,6 @@ class MappingKernelManager(MultiKernelManager): kernel_info_timeout = Float(60, config=True, help="""Timeout for giving up on a kernel (in seconds). - On starting and restarting kernels, we check whether the kernel is running and responsive by sending kernel_info_requests. This sets the timeout in seconds for how long the kernel can take @@ -116,10 +123,6 @@ class MappingKernelManager(MultiKernelManager): last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") - def __init__(self, **kwargs): - super(MappingKernelManager, self).__init__(**kwargs) - self.last_kernel_activity = utcnow() - allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. @@ -130,6 +133,14 @@ class MappingKernelManager(MultiKernelManager): # Methods for managing kernels and sessions #------------------------------------------------------------------------- + def __init__(self, **kwargs): + # Pin the superclass to better control the MRO. This is needed by + # AsyncMappingKernelManager so that it can give priority to methods + # on AsyncMultiKernelManager over this superclass. + self.pinned_superclass = MultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) + self.last_kernel_activity = utcnow() + def _handle_kernel_died(self, kernel_id): """notice that a kernel died""" self.log.warning("Kernel %s died, removing from map.", kernel_id) @@ -144,10 +155,8 @@ class MappingKernelManager(MultiKernelManager): os_path = os.path.dirname(os_path) return os_path - @gen.coroutine - def start_kernel(self, kernel_id=None, path=None, **kwargs): + async def start_kernel(self, kernel_id=None, path=None, **kwargs): """Start a kernel for a session and return its kernel_id. - Parameters ---------- kernel_id : uuid @@ -164,9 +173,7 @@ class MappingKernelManager(MultiKernelManager): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = yield maybe_future( - super(MappingKernelManager, self).start_kernel(**kwargs) - ) + kernel_id = await maybe_future(self.pinned_superclass.start_kernel(self, **kwargs)) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) self.log.info("Kernel started: %s" % kernel_id) @@ -191,12 +198,10 @@ class MappingKernelManager(MultiKernelManager): if not self._initialized_culler: self.initialize_culler() - # py2-compat - raise gen.Return(kernel_id) + return kernel_id def start_buffering(self, kernel_id, session_key, channels): """Start buffering messages for a kernel - Parameters ---------- kernel_id : str @@ -235,7 +240,6 @@ class MappingKernelManager(MultiKernelManager): def get_buffer(self, kernel_id, session_key): """Get the buffer for a given kernel - Parameters ---------- kernel_id : str @@ -260,7 +264,6 @@ class MappingKernelManager(MultiKernelManager): def stop_buffering(self, kernel_id): """Stop buffering kernel messages - Parameters ---------- kernel_id : str @@ -283,7 +286,7 @@ class MappingKernelManager(MultiKernelManager): self.log.info("Discarding %s buffered messages for %s", len(msg_buffer), buffer_info['session_key']) - def shutdown_kernel(self, kernel_id, now=False): + def shutdown_kernel(self, kernel_id, now=False, restart=False): """Shutdown a kernel by kernel_id""" self._check_kernel_id(kernel_id) kernel = self._kernels[kernel_id] @@ -299,13 +302,12 @@ class MappingKernelManager(MultiKernelManager): type=self._kernels[kernel_id].kernel_name ).dec() - return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) + return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) - @gen.coroutine - def restart_kernel(self, kernel_id): + async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - yield maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id)) + await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -328,7 +330,7 @@ class MappingKernelManager(MultiKernelManager): self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id) finish() if not future.done(): - future.set_exception(gen.TimeoutError("Timeout waiting for restart")) + future.set_exception(TimeoutError("Timeout waiting for restart")) def on_restart_failed(): self.log.warning("Restarting kernel failed: %s", kernel_id) @@ -341,8 +343,7 @@ class MappingKernelManager(MultiKernelManager): channel.on_recv(on_reply) loop = IOLoop.current() timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout) - # wait for restart to complete - yield future + return future def notify_connect(self, kernel_id): """Notice a new connection to a kernel""" @@ -356,14 +357,13 @@ class MappingKernelManager(MultiKernelManager): def kernel_model(self, kernel_id): """Return a JSON-safe dict representing a kernel - For use in representing kernels in the JSON APIs. """ self._check_kernel_id(kernel_id) kernel = self._kernels[kernel_id] model = { - "id":kernel_id, + "id": kernel_id, "name": kernel.kernel_name, "last_activity": isoformat(kernel.last_activity), "execution_state": kernel.execution_state, @@ -374,7 +374,7 @@ class MappingKernelManager(MultiKernelManager): def list_kernels(self): """Returns a list of kernel_id's of kernels running.""" kernels = [] - kernel_ids = super(MappingKernelManager, self).list_kernel_ids() + kernel_ids = self.pinned_superclass.list_kernel_ids(self) for kernel_id in kernel_ids: model = self.kernel_model(kernel_id) kernels.append(model) @@ -390,7 +390,6 @@ class MappingKernelManager(MultiKernelManager): def start_watching_activity(self, kernel_id): """Start watching IOPub messages on a kernel for activity. - - update last_activity on every message - record execution_state from status messages """ @@ -422,13 +421,12 @@ class MappingKernelManager(MultiKernelManager): def initialize_culler(self): """Start idle culler if 'cull_idle_timeout' is greater than zero. - Regardless of that value, set flag that we've been here. """ if not self._initialized_culler and self.cull_idle_timeout > 0: if self._culler_callback is None: loop = IOLoop.current() - if self.cull_interval <= 0: #handle case where user set invalid value + if self.cull_interval <= 0: # handle case where user set invalid value self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", self.cull_interval, self.cull_interval_default) self.cull_interval = self.cull_interval_default @@ -444,20 +442,25 @@ class MappingKernelManager(MultiKernelManager): self._initialized_culler = True - def cull_kernels(self): + async def cull_kernels(self): self.log.debug("Polling every %s seconds for kernels idle > %s seconds...", self.cull_interval, self.cull_idle_timeout) """Create a separate list of kernels to avoid conflicting updates while iterating""" for kernel_id in list(self._kernels): try: - self.cull_kernel_if_idle(kernel_id) + await self.cull_kernel_if_idle(kernel_id) except Exception as e: - self.log.exception("The following exception was encountered while checking the idle duration of kernel %s: %s", - kernel_id, e) + self.log.exception("The following exception was encountered while checking the " + "idle duration of kernel {}: {}".format(kernel_id, e)) - def cull_kernel_if_idle(self, kernel_id): - kernel = self._kernels[kernel_id] - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", kernel_id, kernel.kernel_name, kernel.last_activity) + async def cull_kernel_if_idle(self, kernel_id): + try: + kernel = self._kernels[kernel_id] + except KeyError: + return # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made. + + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, kernel.kernel_name, kernel.last_activity) if kernel.last_activity is not None: dt_now = utcnow() dt_idle = dt_now - kernel.last_activity @@ -471,4 +474,36 @@ class MappingKernelManager(MultiKernelManager): idle_duration = int(dt_idle.total_seconds()) self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration) - self.shutdown_kernel(kernel_id) + await maybe_future(self.shutdown_kernel(kernel_id)) + + +# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, overriding +# only what is different. +class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): + @default('kernel_manager_class') + def _default_kernel_manager_class(self): + return "jupyter_client.ioloop.AsyncIOLoopKernelManager" + + def __init__(self, **kwargs): + # Pin the superclass to better control the MRO. + self.pinned_superclass = AsyncMultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) + self.last_kernel_activity = utcnow() + + async def shutdown_kernel(self, kernel_id, now=False, restart=False): + """Shutdown a kernel by kernel_id""" + self._check_kernel_id(kernel_id) + kernel = self._kernels[kernel_id] + if kernel._activity_stream: + kernel._activity_stream.close() + kernel._activity_stream = None + self.stop_buffering(kernel_id) + self._kernel_connections.pop(kernel_id, None) + + # Decrease the metric of number of kernels + # running for the relevant kernel type by 1 + KERNEL_CURRENTLY_RUNNING_TOTAL.labels( + type=self._kernels[kernel_id].kernel_name + ).dec() + + return await self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) diff --git a/notebook/services/kernels/tests/test_kernels_api.py b/notebook/services/kernels/tests/test_kernels_api.py index 83bfb0c3e..70ae0e878 100644 --- a/notebook/services/kernels/tests/test_kernels_api.py +++ b/notebook/services/kernels/tests/test_kernels_api.py @@ -1,6 +1,7 @@ """Test the kernels service API.""" import json +import sys import time from traitlets.config import Config @@ -8,12 +9,19 @@ from traitlets.config import Config from tornado.httpclient import HTTPRequest from tornado.ioloop import IOLoop from tornado.websocket import websocket_connect +from unittest import SkipTest from jupyter_client.kernelspec import NATIVE_KERNEL_NAME from notebook.utils import url_path_join from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error +try: + from jupyter_client import AsyncMultiKernelManager + async_testing_enabled = True +except ImportError: + async_testing_enabled = False + class KernelAPI(object): """Wrapper for kernel REST API requests""" @@ -188,6 +196,29 @@ class KernelAPITest(NotebookTestBase): self.assertEqual(model['connections'], 0) +class AsyncKernelAPITest(KernelAPITest): + """Test the kernels web service API using the AsyncMappingKernelManager""" + + @classmethod + def setup_class(cls): + if not async_testing_enabled: # Can be removed once jupyter_client >= 6.1 is required. + raise SkipTest("AsyncKernelAPITest tests skipped due to down-level jupyter_client!") + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. + raise SkipTest("AsyncKernelAPITest tests skipped due to Python < 3.6!") + super(AsyncKernelAPITest, cls).setup_class() + + @classmethod + def get_argv(cls): + argv = super(AsyncKernelAPITest, cls).get_argv() + + # before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. + if async_testing_enabled: + argv.extend(['--NotebookApp.kernel_manager_class=' + 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + return argv + + class KernelFilterTest(NotebookTestBase): # A special install of NotebookTestBase where only `kernel_info_request` @@ -199,6 +230,7 @@ class KernelFilterTest(NotebookTestBase): } } }) + # Sanity check verifying that the configurable was properly set. def test_config(self): self.assertEqual(self.notebook.kernel_manager.allowed_message_types, ['kernel_info_request']) diff --git a/notebook/services/sessions/tests/test_sessions_api.py b/notebook/services/sessions/tests/test_sessions_api.py index 9c551fc79..0e30fc3ea 100644 --- a/notebook/services/sessions/tests/test_sessions_api.py +++ b/notebook/services/sessions/tests/test_sessions_api.py @@ -5,17 +5,26 @@ from functools import partial import io import os import json -import requests import shutil +import sys import time -pjoin = os.path.join +from unittest import SkipTest from notebook.utils import url_path_join from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error from nbformat.v4 import new_notebook from nbformat import write +try: + from jupyter_client import AsyncMultiKernelManager + async_testing_enabled = True +except ImportError: + async_testing_enabled = False + +pjoin = os.path.join + + class SessionAPI(object): """Wrapper for notebook API calls.""" def __init__(self, request): @@ -77,6 +86,7 @@ class SessionAPI(object): def delete(self, id): return self._req('DELETE', id) + class SessionAPITest(NotebookTestBase): """Test the sessions web service API""" def setUp(self): @@ -254,3 +264,27 @@ class SessionAPITest(NotebookTestBase): kernel.pop('last_activity') [ k.pop('last_activity') for k in kernel_list ] self.assertEqual(kernel_list, [kernel]) + + +class AsyncSessionAPITest(SessionAPITest): + """Test the sessions web service API using the AsyncMappingKernelManager""" + + @classmethod + def setup_class(cls): + if not async_testing_enabled: # Can be removed once jupyter_client >= 6.1 is required. + raise SkipTest("AsyncSessionAPITest tests skipped due to down-level jupyter_client!") + if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped. + raise SkipTest("AsyncSessionAPITest tests skipped due to Python < 3.6!") + super(AsyncSessionAPITest, cls).setup_class() + + @classmethod + def get_argv(cls): + argv = super(AsyncSessionAPITest, cls).get_argv() + + # Before we extend the argv with the class, ensure that appropriate jupyter_client is available. + # if not available, don't set kernel_manager_class, resulting in the repeat of sync-based tests. + if async_testing_enabled: + argv.extend(['--NotebookApp.kernel_manager_class=' + 'notebook.services.kernels.kernelmanager.AsyncMappingKernelManager']) + + return argv diff --git a/notebook/utils.py b/notebook/utils.py index 69f3586f1..9ec10773f 100644 --- a/notebook/utils.py +++ b/notebook/utils.py @@ -327,3 +327,43 @@ def maybe_future(obj): f.set_result(obj) return f + +def run_sync(maybe_async): + """If async, runs maybe_async and blocks until it has executed, + possibly creating an event loop. + If not async, just returns maybe_async as it is the result of something + that has already executed. + Parameters + ---------- + maybe_async : async or non-async object + The object to be executed, if it is async. + Returns + ------- + result : + Whatever the async object returns, or the object itself. + """ + if not inspect.isawaitable(maybe_async): + # that was not something async, just return it + return maybe_async + # it is async, we need to run it in an event loop + + def wrapped(): + create_new_event_loop = False + try: + loop = asyncio.get_event_loop() + except RuntimeError: + create_new_event_loop = True + else: + if loop.is_closed(): + create_new_event_loop = True + if create_new_event_loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(maybe_async) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + # just return a Future, hoping that it will be awaited + result = asyncio.ensure_future(maybe_async) + return result + return wrapped()