Merge pull request #4479 from kevin-bates/async-subclasses

Add support for async kernel management
Zachary Sailer 6 years ago committed by GitHub
commit d04cbb6a98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,6 +7,7 @@
from __future__ import absolute_import, print_function
import notebook
import asyncio
import binascii
import datetime
import errno
@ -75,7 +76,7 @@ from notebook import (
from .base.handlers import Template404, RedirectWithParams
from .log import log_request
from .services.kernels.kernelmanager import MappingKernelManager
from .services.kernels.kernelmanager import MappingKernelManager, AsyncMappingKernelManager
from .services.config import ConfigManager
from .services.contents.manager import ContentsManager
from .services.contents.filemanager import FileContentsManager
@ -106,7 +107,14 @@ from jupyter_core.paths import jupyter_runtime_dir, jupyter_path
from notebook._sysinfo import get_sys_info
from ._tz import utcnow, utcfromtimestamp
from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url
from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url, run_sync
# Check if we can use async kernel management
try:
    from jupyter_client import AsyncMultiKernelManager
    async_kernel_mgmt_available = True
except ImportError:
    # jupyter_client < 6.1 does not provide AsyncMultiKernelManager; the flag
    # is consulted at runtime when AsyncMappingKernelManager is configured.
    async_kernel_mgmt_available = False
#-----------------------------------------------------------------------------
# Module globals
@ -1177,6 +1185,7 @@ class NotebookApp(JupyterApp):
kernel_manager_class = Type(
default_value=MappingKernelManager,
klass=MappingKernelManager,
config=True,
help=_('The kernel manager class to use.')
)
@ -1374,12 +1383,24 @@ class NotebookApp(JupyterApp):
self.kernel_spec_manager = self.kernel_spec_manager_class(
parent=self,
)
self.kernel_manager = self.kernel_manager_class(
parent=self,
log=self.log,
connection_dir=self.runtime_dir,
kernel_spec_manager=self.kernel_spec_manager,
)
# Ensure the appropriate version of Python and jupyter_client is available.
if isinstance(self.kernel_manager, AsyncMappingKernelManager):
if sys.version_info < (3, 6): # Can be removed once 3.5 is dropped.
raise ValueError("You are using `AsyncMappingKernelManager` in Python 3.5 (or lower) "
"which is not supported. Please upgrade Python to 3.6+ or change kernel managers.")
if not async_kernel_mgmt_available: # Can be removed once jupyter_client >= 6.1 is required.
raise ValueError("You are using `AsyncMappingKernelManager` without an appropriate "
"jupyter_client installed! Please upgrade jupyter_client or change kernel managers.")
self.log.info("Asynchronous kernel management has been configured to use '{}'.".
format(self.kernel_manager.__class__.__name__))
self.contents_manager = self.contents_manager_class(
parent=self,
log=self.log,
@ -1782,7 +1803,7 @@ class NotebookApp(JupyterApp):
n_kernels = len(self.kernel_manager.list_kernel_ids())
kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels)
self.log.info(kernel_msg % n_kernels)
self.kernel_manager.shutdown_all()
run_sync(self.kernel_manager.shutdown_all())
def notebook_info(self, kernel_count=True):
"Return the current working directory and the server url information"
@ -1793,7 +1814,8 @@ class NotebookApp(JupyterApp):
info += kernel_msg % n_kernels
info += "\n"
# Format the info so that the URL fits on a single line in 80 char display
info += _("The Jupyter Notebook is running at:\n%s") % self.display_url
info += _("Jupyter Notebook {version} is running at:\n{url}".
format(version=NotebookApp.version, url=self.display_url))
if self.gateway_config.gateway_enabled:
info += _("\nKernels will be managed by the Gateway server running at:\n%s") % self.gateway_config.url
return info
@ -1897,8 +1919,8 @@ class NotebookApp(JupyterApp):
assembled_url = urljoin('file:', pathname2url(open_file))
else:
assembled_url = url_path_join(self.connection_url, uri)
b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new)
b = lambda: browser.open(assembled_url, new=self.webbrowser_open_new)
threading.Thread(target=b).start()
def start(self):

@ -75,7 +75,7 @@ class KernelActionHandler(APIHandler):
def post(self, kernel_id, action):
km = self.kernel_manager
if action == 'interrupt':
km.interrupt_kernel(kernel_id)
yield maybe_future(km.interrupt_kernel(kernel_id))
self.set_status(204)
if action == 'restart':

@ -1,5 +1,4 @@
"""A MultiKernelManager for use in the notebook webserver
- raises HTTPErrors
- creates REST API models
"""
@ -12,7 +11,7 @@ from datetime import datetime, timedelta
from functools import partial
import os
from tornado import gen, web
from tornado import web
from tornado.concurrent import Future
from tornado.ioloop import IOLoop, PeriodicCallback
@ -28,6 +27,17 @@ from ipython_genutils.py3compat import getcwd
from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL
# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client.
# This will be confirmed at runtime in notebookapp. The following block can be removed once the jupyter_client's
# floor has been updated.
try:
    from jupyter_client.multikernelmanager import AsyncMultiKernelManager
except ImportError:
    # jupyter_client < 6.1: define a stand-in so the base-class reference in
    # AsyncMappingKernelManager still resolves at import time. Actual use of
    # async kernel management is rejected at runtime in notebookapp.
    class AsyncMultiKernelManager(object):
        """Empty class to satisfy unused reference by AsyncMappingKernelManager."""
        def __init__(self, **kwargs):
            # Accept and ignore any kwargs; this stub is never used for real work.
            pass
class MappingKernelManager(MultiKernelManager):
"""A KernelManager that handles notebook mapping and HTTP error handling"""
@ -70,7 +80,7 @@ class MappingKernelManager(MultiKernelManager):
for users with poor network connections."""
)
cull_interval_default = 300 # 5 minutes
cull_interval_default = 300 # 5 minutes
cull_interval = Integer(cull_interval_default, config=True,
help="""The interval (in seconds) on which to check for idle kernels exceeding the cull timeout value."""
)
@ -87,10 +97,8 @@ class MappingKernelManager(MultiKernelManager):
buffer_offline_messages = Bool(True, config=True,
help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory.
When True (default), messages are buffered and replayed on reconnect,
avoiding lost messages due to interrupted connectivity.
Disable if long-running kernels will produce too much output while
no frontends are connected.
"""
@ -98,7 +106,6 @@ class MappingKernelManager(MultiKernelManager):
kernel_info_timeout = Float(60, config=True,
help="""Timeout for giving up on a kernel (in seconds).
On starting and restarting kernels, we check whether the
kernel is running and responsive by sending kernel_info_requests.
This sets the timeout in seconds for how long the kernel can take
@ -116,10 +123,6 @@ class MappingKernelManager(MultiKernelManager):
last_kernel_activity = Instance(datetime,
help="The last activity on any kernel, including shutting down a kernel")
def __init__(self, **kwargs):
super(MappingKernelManager, self).__init__(**kwargs)
self.last_kernel_activity = utcnow()
allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
@ -130,6 +133,14 @@ class MappingKernelManager(MultiKernelManager):
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
    def __init__(self, **kwargs):
        """Initialize the mapping kernel manager.

        Pin the superclass to better control the MRO. This is needed by
        AsyncMappingKernelManager so that it can give priority to methods
        on AsyncMultiKernelManager over this superclass.
        """
        self.pinned_superclass = MultiKernelManager
        # Call the pinned superclass's __init__ directly (not via super()) so
        # subclasses can substitute a different base without re-entering here.
        self.pinned_superclass.__init__(self, **kwargs)
        # Record startup as activity so idle-culling has a baseline timestamp.
        self.last_kernel_activity = utcnow()
def _handle_kernel_died(self, kernel_id):
"""notice that a kernel died"""
self.log.warning("Kernel %s died, removing from map.", kernel_id)
@ -144,10 +155,8 @@ class MappingKernelManager(MultiKernelManager):
os_path = os.path.dirname(os_path)
return os_path
@gen.coroutine
def start_kernel(self, kernel_id=None, path=None, **kwargs):
async def start_kernel(self, kernel_id=None, path=None, **kwargs):
"""Start a kernel for a session and return its kernel_id.
Parameters
----------
kernel_id : uuid
@ -164,9 +173,7 @@ class MappingKernelManager(MultiKernelManager):
if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = yield maybe_future(
super(MappingKernelManager, self).start_kernel(**kwargs)
)
kernel_id = await maybe_future(self.pinned_superclass.start_kernel(self, **kwargs))
self._kernel_connections[kernel_id] = 0
self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)
@ -191,12 +198,10 @@ class MappingKernelManager(MultiKernelManager):
if not self._initialized_culler:
self.initialize_culler()
# py2-compat
raise gen.Return(kernel_id)
return kernel_id
def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
@ -235,7 +240,6 @@ class MappingKernelManager(MultiKernelManager):
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : str
@ -260,7 +264,6 @@ class MappingKernelManager(MultiKernelManager):
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
@ -283,7 +286,7 @@ class MappingKernelManager(MultiKernelManager):
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
def shutdown_kernel(self, kernel_id, now=False):
def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
@ -299,13 +302,12 @@ class MappingKernelManager(MultiKernelManager):
type=self._kernels[kernel_id].kernel_name
).dec()
return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)
return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)
@gen.coroutine
def restart_kernel(self, kernel_id):
async def restart_kernel(self, kernel_id, now=False):
"""Restart a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
yield maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id))
await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now))
kernel = self.get_kernel(kernel_id)
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
@ -328,7 +330,7 @@ class MappingKernelManager(MultiKernelManager):
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
finish()
if not future.done():
future.set_exception(gen.TimeoutError("Timeout waiting for restart"))
future.set_exception(TimeoutError("Timeout waiting for restart"))
def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
@ -341,8 +343,7 @@ class MappingKernelManager(MultiKernelManager):
channel.on_recv(on_reply)
loop = IOLoop.current()
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
# wait for restart to complete
yield future
return future
def notify_connect(self, kernel_id):
"""Notice a new connection to a kernel"""
@ -356,14 +357,13 @@ class MappingKernelManager(MultiKernelManager):
def kernel_model(self, kernel_id):
"""Return a JSON-safe dict representing a kernel
For use in representing kernels in the JSON APIs.
"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
model = {
"id":kernel_id,
"id": kernel_id,
"name": kernel.kernel_name,
"last_activity": isoformat(kernel.last_activity),
"execution_state": kernel.execution_state,
@ -374,7 +374,7 @@ class MappingKernelManager(MultiKernelManager):
def list_kernels(self):
"""Returns a list of kernel_id's of kernels running."""
kernels = []
kernel_ids = super(MappingKernelManager, self).list_kernel_ids()
kernel_ids = self.pinned_superclass.list_kernel_ids(self)
for kernel_id in kernel_ids:
model = self.kernel_model(kernel_id)
kernels.append(model)
@ -390,7 +390,6 @@ class MappingKernelManager(MultiKernelManager):
def start_watching_activity(self, kernel_id):
"""Start watching IOPub messages on a kernel for activity.
- update last_activity on every message
- record execution_state from status messages
"""
@ -422,13 +421,12 @@ class MappingKernelManager(MultiKernelManager):
def initialize_culler(self):
"""Start idle culler if 'cull_idle_timeout' is greater than zero.
Regardless of that value, set flag that we've been here.
"""
if not self._initialized_culler and self.cull_idle_timeout > 0:
if self._culler_callback is None:
loop = IOLoop.current()
if self.cull_interval <= 0: #handle case where user set invalid value
if self.cull_interval <= 0: # handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
self.cull_interval, self.cull_interval_default)
self.cull_interval = self.cull_interval_default
@ -444,20 +442,25 @@ class MappingKernelManager(MultiKernelManager):
self._initialized_culler = True
def cull_kernels(self):
async def cull_kernels(self):
self.log.debug("Polling every %s seconds for kernels idle > %s seconds...",
self.cull_interval, self.cull_idle_timeout)
"""Create a separate list of kernels to avoid conflicting updates while iterating"""
for kernel_id in list(self._kernels):
try:
self.cull_kernel_if_idle(kernel_id)
await self.cull_kernel_if_idle(kernel_id)
except Exception as e:
self.log.exception("The following exception was encountered while checking the idle duration of kernel %s: %s",
kernel_id, e)
self.log.exception("The following exception was encountered while checking the "
"idle duration of kernel {}: {}".format(kernel_id, e))
def cull_kernel_if_idle(self, kernel_id):
kernel = self._kernels[kernel_id]
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", kernel_id, kernel.kernel_name, kernel.last_activity)
async def cull_kernel_if_idle(self, kernel_id):
try:
kernel = self._kernels[kernel_id]
except KeyError:
return # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made.
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s",
kernel_id, kernel.kernel_name, kernel.last_activity)
if kernel.last_activity is not None:
dt_now = utcnow()
dt_idle = dt_now - kernel.last_activity
@ -471,4 +474,36 @@ class MappingKernelManager(MultiKernelManager):
idle_duration = int(dt_idle.total_seconds())
self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.",
kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration)
self.shutdown_kernel(kernel_id)
await maybe_future(self.shutdown_kernel(kernel_id))
# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, overriding
# only what is different.
class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager):
    """An asynchronous MappingKernelManager.

    Combines the notebook-specific behavior of MappingKernelManager with the
    awaitable kernel operations of jupyter_client's AsyncMultiKernelManager.
    """

    @default('kernel_manager_class')
    def _default_kernel_manager_class(self):
        # Use the async per-kernel manager so start/shutdown/restart are awaitable.
        return "jupyter_client.ioloop.AsyncIOLoopKernelManager"

    def __init__(self, **kwargs):
        # Pin the superclass to better control the MRO. Pinning to
        # AsyncMultiKernelManager gives its methods priority over the sync
        # MultiKernelManager pinned by MappingKernelManager.__init__.
        self.pinned_superclass = AsyncMultiKernelManager
        self.pinned_superclass.__init__(self, **kwargs)
        self.last_kernel_activity = utcnow()

    async def shutdown_kernel(self, kernel_id, now=False, restart=False):
        """Shutdown a kernel by kernel_id"""
        self._check_kernel_id(kernel_id)
        kernel = self._kernels[kernel_id]
        # Stop watching IOPub activity for this kernel before tearing it down.
        if kernel._activity_stream:
            kernel._activity_stream.close()
            kernel._activity_stream = None
        self.stop_buffering(kernel_id)
        self._kernel_connections.pop(kernel_id, None)

        # Decrease the metric of number of kernels
        # running for the relevant kernel type by 1
        KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
            type=self._kernels[kernel_id].kernel_name
        ).dec()

        # Await the pinned (async) superclass so the actual shutdown completes.
        return await self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart)

@ -1,6 +1,7 @@
"""Test the kernels service API."""
import json
import sys
import time
from traitlets.config import Config
@ -8,12 +9,19 @@ from traitlets.config import Config
from tornado.httpclient import HTTPRequest
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect
from unittest import SkipTest
from jupyter_client.kernelspec import NATIVE_KERNEL_NAME
from notebook.utils import url_path_join
from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error
try:
from jupyter_client import AsyncMultiKernelManager
async_testing_enabled = True
except ImportError:
async_testing_enabled = False
class KernelAPI(object):
"""Wrapper for kernel REST API requests"""
@ -188,6 +196,29 @@ class KernelAPITest(NotebookTestBase):
self.assertEqual(model['connections'], 0)
class AsyncKernelAPITest(KernelAPITest):
    """Test the kernels web service API using the AsyncMappingKernelManager"""

    @classmethod
    def setup_class(cls):
        # Guard clauses: skip the entire class when async prerequisites are missing.
        if not async_testing_enabled:  # Can be removed once jupyter_client >= 6.1 is required.
            raise SkipTest("AsyncKernelAPITest tests skipped due to down-level jupyter_client!")
        if sys.version_info < (3, 6):  # Can be removed once 3.5 is dropped.
            raise SkipTest("AsyncKernelAPITest tests skipped due to Python < 3.6!")
        super(AsyncKernelAPITest, cls).setup_class()

    @classmethod
    def get_argv(cls):
        argv = super(AsyncKernelAPITest, cls).get_argv()
        # Before we extend the argv with the class, ensure that appropriate
        # jupyter_client is available. If not available, don't set
        # kernel_manager_class, resulting in the repeat of sync-based tests.
        if async_testing_enabled:
            km_class = ('notebook.services.kernels.kernelmanager.'
                        'AsyncMappingKernelManager')
            argv.append('--NotebookApp.kernel_manager_class=' + km_class)
        return argv
class KernelFilterTest(NotebookTestBase):
# A special install of NotebookTestBase where only `kernel_info_request`
@ -199,6 +230,7 @@ class KernelFilterTest(NotebookTestBase):
}
}
})
# Sanity check verifying that the configurable was properly set.
def test_config(self):
self.assertEqual(self.notebook.kernel_manager.allowed_message_types, ['kernel_info_request'])

@ -5,17 +5,26 @@ from functools import partial
import io
import os
import json
import requests
import shutil
import sys
import time
pjoin = os.path.join
from unittest import SkipTest
from notebook.utils import url_path_join
from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error
from nbformat.v4 import new_notebook
from nbformat import write
try:
from jupyter_client import AsyncMultiKernelManager
async_testing_enabled = True
except ImportError:
async_testing_enabled = False
pjoin = os.path.join
class SessionAPI(object):
"""Wrapper for notebook API calls."""
def __init__(self, request):
@ -77,6 +86,7 @@ class SessionAPI(object):
def delete(self, id):
return self._req('DELETE', id)
class SessionAPITest(NotebookTestBase):
"""Test the sessions web service API"""
def setUp(self):
@ -254,3 +264,27 @@ class SessionAPITest(NotebookTestBase):
kernel.pop('last_activity')
[ k.pop('last_activity') for k in kernel_list ]
self.assertEqual(kernel_list, [kernel])
class AsyncSessionAPITest(SessionAPITest):
    """Test the sessions web service API using the AsyncMappingKernelManager"""

    @classmethod
    def setup_class(cls):
        # Guard clauses: skip the entire class when async prerequisites are missing.
        if not async_testing_enabled:  # Can be removed once jupyter_client >= 6.1 is required.
            raise SkipTest("AsyncSessionAPITest tests skipped due to down-level jupyter_client!")
        if sys.version_info < (3, 6):  # Can be removed once 3.5 is dropped.
            raise SkipTest("AsyncSessionAPITest tests skipped due to Python < 3.6!")
        super(AsyncSessionAPITest, cls).setup_class()

    @classmethod
    def get_argv(cls):
        argv = super(AsyncSessionAPITest, cls).get_argv()
        # Before we extend the argv with the class, ensure that appropriate
        # jupyter_client is available. If not available, don't set
        # kernel_manager_class, resulting in the repeat of sync-based tests.
        if async_testing_enabled:
            km_class = ('notebook.services.kernels.kernelmanager.'
                        'AsyncMappingKernelManager')
            argv.append('--NotebookApp.kernel_manager_class=' + km_class)
        return argv

@ -327,3 +327,43 @@ def maybe_future(obj):
f.set_result(obj)
return f
def run_sync(maybe_async):
    """If async, runs maybe_async and blocks until it has executed,
    possibly creating an event loop.

    If not async, just returns maybe_async as it is the result of something
    that has already executed.

    Parameters
    ----------
    maybe_async : async or non-async object
        The object to be executed, if it is async.

    Returns
    -------
    result :
        Whatever the async object returns, or the object itself.
    """
    if not inspect.isawaitable(maybe_async):
        # that was not something async, just return it
        return maybe_async
    # it is async, we need to run it in an event loop

    def wrapped():
        # Locate a usable event loop; create one when this thread has none
        # or its current loop has already been closed.
        create_new_event_loop = False
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            create_new_event_loop = True
        else:
            if loop.is_closed():
                create_new_event_loop = True
        if create_new_event_loop:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        try:
            result = loop.run_until_complete(maybe_async)
        except RuntimeError as e:
            if str(e) == 'This event loop is already running':
                # just return a Future, hoping that it will be awaited
                result = asyncio.ensure_future(maybe_async)
            else:
                # Bug fix: previously an unrelated RuntimeError fell through to
                # `return result` with `result` unbound, masking the real error
                # with an UnboundLocalError. Re-raise it instead.
                raise
        return result

    return wrapped()

Loading…
Cancel
Save