Merge pull request #3011 from minrk/kernelclient

IPEP 12: add KernelClient
Brian E. Granger 13 years ago
commit 13032d01ba

@ -407,6 +407,9 @@ class ZMQStreamHandler(websocket.WebSocketHandler):
return jsonapi.dumps(msg, default=date_default)
def _on_zmq_reply(self, msg_list):
# Sometimes this gets triggered when the on_close method is scheduled in the
# eventloop but hasn't been called.
if self.stream.closed(): return
try:
msg = self._reserialize_reply(msg_list)
except Exception:
@ -466,10 +469,7 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
class IOPubHandler(AuthenticatedZMQStreamHandler):
def initialize(self, *args, **kwargs):
self._kernel_alive = True
self._beating = False
self.iopub_stream = None
self.hb_stream = None
def on_first_message(self, msg):
try:
@ -478,12 +478,11 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
self.close()
return
km = self.application.kernel_manager
self.time_to_dead = km.time_to_dead
self.first_beat = km.first_beat
kernel_id = self.kernel_id
km.add_restart_callback(kernel_id, self.on_kernel_restarted)
km.add_restart_callback(kernel_id, self.on_restart_failed, 'dead')
try:
self.iopub_stream = km.create_iopub_stream(kernel_id)
self.hb_stream = km.create_hb_stream(kernel_id)
self.iopub_stream = km.connect_iopub(kernel_id)
except web.HTTPError:
# WebSockets don't response to traditional error codes so we
# close the connection.
@ -492,81 +491,39 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
self.close()
else:
self.iopub_stream.on_recv(self._on_zmq_reply)
self.start_hb(self.kernel_died)
def on_message(self, msg):
pass
def _send_status_message(self, status):
msg = self.session.msg("status",
{'execution_state': status}
)
self.write_message(jsonapi.dumps(msg, default=date_default))
def on_kernel_restarted(self):
logging.warn("kernel %s restarted", self.kernel_id)
self._send_status_message('restarting')
def on_restart_failed(self):
logging.error("kernel %s restarted failed!", self.kernel_id)
self._send_status_message('dead')
def on_close(self):
# This method can be called twice, once by self.kernel_died and once
# from the WebSocket close event. If the WebSocket connection is
# closed before the ZMQ streams are setup, they could be None.
self.stop_hb()
km = self.application.kernel_manager
if self.kernel_id in km:
km.remove_restart_callback(
self.kernel_id, self.on_kernel_restarted,
)
km.remove_restart_callback(
self.kernel_id, self.on_restart_failed, 'dead',
)
if self.iopub_stream is not None and not self.iopub_stream.closed():
self.iopub_stream.on_recv(None)
self.iopub_stream.close()
if self.hb_stream is not None and not self.hb_stream.closed():
self.hb_stream.close()
def start_hb(self, callback):
"""Start the heartbeating and call the callback if the kernel dies."""
if not self._beating:
self._kernel_alive = True
def ping_or_dead():
self.hb_stream.flush()
if self._kernel_alive:
self._kernel_alive = False
self.hb_stream.send(b'ping')
# flush stream to force immediate socket send
self.hb_stream.flush()
else:
try:
callback()
except:
pass
finally:
self.stop_hb()
def beat_received(msg):
self._kernel_alive = True
self.hb_stream.on_recv(beat_received)
loop = ioloop.IOLoop.instance()
self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
self._beating= True
def _really_start_hb(self):
"""callback for delayed heartbeat start
Only start the hb loop if we haven't been closed during the wait.
"""
if self._beating and not self.hb_stream.closed():
self._hb_periodic_callback.start()
def stop_hb(self):
"""Stop the heartbeating and cancel all related callbacks."""
if self._beating:
self._beating = False
self._hb_periodic_callback.stop()
if not self.hb_stream.closed():
self.hb_stream.on_recv(None)
def _delete_kernel_data(self):
"""Remove the kernel data and notebook mapping."""
self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
def kernel_died(self):
self._delete_kernel_data()
self.application.log.error("Kernel died: %s" % self.kernel_id)
self.write_message(
{'header': {'msg_type': 'status'},
'parent_header': {},
'content': {'execution_state':'dead'}
}
)
self.on_close()
class ShellHandler(AuthenticatedZMQStreamHandler):
@ -584,7 +541,7 @@ class ShellHandler(AuthenticatedZMQStreamHandler):
self.max_msg_size = km.max_msg_size
kernel_id = self.kernel_id
try:
self.shell_stream = km.create_shell_stream(kernel_id)
self.shell_stream = km.connect_shell(kernel_id)
except web.HTTPError:
# WebSockets don't response to traditional error codes so we
# close the connection.

@ -20,8 +20,9 @@ from tornado import web
from IPython.kernel.multikernelmanager import MultiKernelManager
from IPython.utils.traitlets import (
Dict, List, Unicode, Float, Integer,
Dict, List, Unicode, Integer,
)
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
@ -30,11 +31,11 @@ from IPython.utils.traitlets import (
class MappingKernelManager(MultiKernelManager):
"""A KernelManager that handles notebok mapping and HTTP error handling"""
def _kernel_manager_class_default(self):
return "IPython.kernel.ioloop.IOLoopKernelManager"
kernel_argv = List(Unicode)
time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
max_msg_size = Integer(65536, config=True, help="""
The max raw message size accepted from the browser
over a WebSocket connection.
@ -57,11 +58,10 @@ class MappingKernelManager(MultiKernelManager):
def notebook_for_kernel(self, kernel_id):
"""Return the notebook_id for a kernel_id or None."""
notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id]
if len(notebook_ids) == 1:
return notebook_ids[0]
else:
return None
for notebook_id, kid in self._notebook_mapping.iteritems():
if kernel_id == kid:
return notebook_id
return None
def delete_mapping_for_kernel(self, kernel_id):
"""Remove the kernel/notebook mapping for kernel_id."""
@ -69,8 +69,14 @@ class MappingKernelManager(MultiKernelManager):
if notebook_id is not None:
del self._notebook_mapping[notebook_id]
def _handle_kernel_died(self, kernel_id):
"""notice that a kernel died"""
self.log.warn("Kernel %s died, removing from map.", kernel_id)
self.delete_mapping_for_kernel(kernel_id)
self.remove_kernel(kernel_id, now=True)
def start_kernel(self, notebook_id=None, **kwargs):
"""Start a kernel for a notebok an return its kernel_id.
"""Start a kernel for a notebook an return its kernel_id.
Parameters
----------
@ -86,46 +92,22 @@ class MappingKernelManager(MultiKernelManager):
self.set_kernel_for_notebook(notebook_id, kernel_id)
self.log.info("Kernel started: %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
# register callback for failed auto-restart
self.add_restart_callback(kernel_id,
lambda : self._handle_kernel_died(kernel_id),
'dead',
)
else:
self.log.info("Using existing kernel: %s" % kernel_id)
return kernel_id
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel and remove its notebook association."""
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).shutdown_kernel(
kernel_id, now=now
)
"""Shutdown a kernel by kernel_id"""
super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)
self.delete_mapping_for_kernel(kernel_id)
self.log.info("Kernel shutdown: %s" % kernel_id)
def interrupt_kernel(self, kernel_id):
"""Interrupt a kernel."""
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).interrupt_kernel(kernel_id)
self.log.info("Kernel interrupted: %s" % kernel_id)
def restart_kernel(self, kernel_id):
"""Restart a kernel while keeping clients connected."""
self._check_kernel_id(kernel_id)
super(MappingKernelManager, self).restart_kernel(kernel_id)
self.log.info("Kernel restarted: %s" % kernel_id)
def create_iopub_stream(self, kernel_id):
"""Create a new iopub stream."""
self._check_kernel_id(kernel_id)
return super(MappingKernelManager, self).create_iopub_stream(kernel_id)
def create_shell_stream(self, kernel_id):
"""Create a new shell stream."""
self._check_kernel_id(kernel_id)
return super(MappingKernelManager, self).create_shell_stream(kernel_id)
def create_hb_stream(self, kernel_id):
"""Create a new hb stream."""
self._check_kernel_id(kernel_id)
return super(MappingKernelManager, self).create_hb_stream(kernel_id)
# override _check_kernel_id to raise 404 instead of KeyError
def _check_kernel_id(self, kernel_id):
"""Check a that a kernel_id exists and raise 404 if not."""
if kernel_id not in self:

@ -104,8 +104,6 @@ var IPython = (function (IPython) {
this.ws_url = json.ws_url;
this.kernel_url = this.base_url + "/" + this.kernel_id;
this.start_channels();
this.shell_channel.onmessage = $.proxy(this._handle_shell_reply,this);
this.iopub_channel.onmessage = $.proxy(this._handle_iopub_reply,this);
$([IPython.events]).trigger('status_started.Kernel', {kernel: this});
};
@ -165,6 +163,8 @@ var IPython = (function (IPython) {
that.iopub_channel.onclose = ws_closed_late;
}
}, 1000);
this.shell_channel.onmessage = $.proxy(this._handle_shell_reply,this);
this.iopub_channel.onmessage = $.proxy(this._handle_iopub_reply,this);
};
/**
@ -418,6 +418,8 @@ var IPython = (function (IPython) {
$([IPython.events]).trigger('status_busy.Kernel', {kernel: this});
} else if (content.execution_state === 'idle') {
$([IPython.events]).trigger('status_idle.Kernel', {kernel: this});
} else if (content.execution_state === 'restarting') {
$([IPython.events]).trigger('status_restarting.Kernel', {kernel: this});
} else if (content.execution_state === 'dead') {
this.stop_channels();
$([IPython.events]).trigger('status_dead.Kernel', {kernel: this});

@ -84,7 +84,7 @@ var IPython = (function (IPython) {
$([IPython.events]).on('status_restarting.Kernel',function () {
IPython.save_widget.update_document_title();
knw.set_message("Restarting kernel",1000);
knw.set_message("Restarting kernel", 2000);
});
$([IPython.events]).on('status_interrupting.Kernel',function () {
@ -93,9 +93,10 @@ var IPython = (function (IPython) {
$([IPython.events]).on('status_dead.Kernel',function () {
var dialog = $('<div/>');
dialog.html('The kernel has died, would you like to restart it?' +
' If you do not restart the kernel, you will be able to save' +
' the notebook, but running code will not work until the notebook' +
dialog.html('The kernel has died, and the automatic restart has failed.' +
' It is possible the kernel cannot be restarted.' +
' If you are not able to restart the kernel, you will still be able to save' +
' the notebook, but running code will no longer work until the notebook' +
' is reopened.'
);
$(document).append(dialog);
@ -105,7 +106,7 @@ var IPython = (function (IPython) {
title: "Dead kernel",
close: function(event, ui) {$(this).dialog('destroy').remove();},
buttons : {
"Restart": function () {
"Manual Restart": function () {
$([IPython.events]).trigger('status_restarting.Kernel');
IPython.notebook.start_kernel();
$(this).dialog('close');

Loading…
Cancel
Save