Starting to refactor heart beating of notebook kernels.

pull/37/head
Brian E. Granger 13 years ago committed by MinRK
parent 0a3bdf042e
commit 4e0c180429

@ -466,10 +466,9 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
class IOPubHandler(AuthenticatedZMQStreamHandler):
def initialize(self, *args, **kwargs):
self._kernel_alive = True
self._beating = False
self.iopub_stream = None
self.hb_stream = None
self.heartbeat = None
def on_first_message(self, msg):
try:
@ -478,12 +477,13 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
self.close()
return
km = self.application.kernel_manager
self.time_to_dead = km.time_to_dead
self.first_beat = km.first_beat
kernel_id = self.kernel_id
try:
self.iopub_stream = km.create_iopub_stream(kernel_id)
self.hb_stream = km.create_hb_stream(kernel_id)
self.heartbeat = Heartbeat(
stream=self.hb_stream, config=self.application.config
)
except web.HTTPError:
# WebSockets don't response to traditional error codes so we
# close the connection.
@ -492,7 +492,7 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
self.close()
else:
self.iopub_stream.on_recv(self._on_zmq_reply)
self.start_hb(self.kernel_died)
self.heartbeat.start(self.kernel_died)
def on_message(self, msg):
pass
@ -507,51 +507,7 @@ class IOPubHandler(AuthenticatedZMQStreamHandler):
self.iopub_stream.close()
if self.hb_stream is not None and not self.hb_stream.closed():
self.hb_stream.close()
def start_hb(self, callback):
"""Start the heartbeating and call the callback if the kernel dies."""
if not self._beating:
self._kernel_alive = True
def ping_or_dead():
self.hb_stream.flush()
if self._kernel_alive:
self._kernel_alive = False
self.hb_stream.send(b'ping')
# flush stream to force immediate socket send
self.hb_stream.flush()
else:
try:
callback()
except:
pass
finally:
self.stop_hb()
def beat_received(msg):
self._kernel_alive = True
self.hb_stream.on_recv(beat_received)
loop = ioloop.IOLoop.instance()
self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
self._beating= True
def _really_start_hb(self):
"""callback for delayed heartbeat start
Only start the hb loop if we haven't been closed during the wait.
"""
if self._beating and not self.hb_stream.closed():
self._hb_periodic_callback.start()
def stop_hb(self):
"""Stop the heartbeating and cancel all related callbacks."""
if self._beating:
self._beating = False
self._hb_periodic_callback.stop()
if not self.hb_stream.closed():
self.hb_stream.on_recv(None)
# stop the heartbeat here
def _delete_kernel_data(self):
"""Remove the kernel data and notebook mapping."""

@ -22,6 +22,7 @@ from IPython.kernel.multikernelmanager import MultiKernelManager
from IPython.utils.traitlets import (
Dict, List, Unicode, Float, Integer,
)
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------

Loading…
Cancel
Save