|
|
|
|
@ -128,6 +128,65 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
|
|
|
|
|
stream.channel = channel
|
|
|
|
|
|
|
|
|
|
def nudge(self):
|
|
|
|
|
shell_channel = self.channels['shell']
|
|
|
|
|
iopub_channel = self.channels['iopub']
|
|
|
|
|
|
|
|
|
|
future = Future()
|
|
|
|
|
info_future = Future()
|
|
|
|
|
iopub_future = Future()
|
|
|
|
|
|
|
|
|
|
def finish():
|
|
|
|
|
"""Common cleanup"""
|
|
|
|
|
loop.remove_timeout(timeout)
|
|
|
|
|
loop.remove_timeout(nudge_handle)
|
|
|
|
|
iopub_channel.stop_on_recv()
|
|
|
|
|
shell_channel.stop_on_recv()
|
|
|
|
|
|
|
|
|
|
def on_shell_reply(msg):
|
|
|
|
|
if not info_future.done():
|
|
|
|
|
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
|
|
|
|
|
shell_channel.stop_on_recv()
|
|
|
|
|
self.log.debug("Nudge: resolving shell future")
|
|
|
|
|
info_future.set_result(msg)
|
|
|
|
|
if iopub_future.done():
|
|
|
|
|
finish()
|
|
|
|
|
self.log.debug("Nudge: resolving main future in shell handler")
|
|
|
|
|
future.set_result(info_future.result())
|
|
|
|
|
|
|
|
|
|
def on_iopub(msg):
|
|
|
|
|
if not iopub_future.done():
|
|
|
|
|
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
|
|
|
|
|
iopub_channel.stop_on_recv()
|
|
|
|
|
self.log.debug("Nudge: resolving iopub future")
|
|
|
|
|
iopub_future.set_result(None)
|
|
|
|
|
if info_future.done():
|
|
|
|
|
finish()
|
|
|
|
|
self.log.debug("Nudge: resolving main future in iopub handler")
|
|
|
|
|
future.set_result(info_future.result())
|
|
|
|
|
|
|
|
|
|
def on_timeout():
|
|
|
|
|
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
|
|
|
|
|
finish()
|
|
|
|
|
if not future.done():
|
|
|
|
|
future.set_exception(TimeoutError("Timeout waiting for nudge"))
|
|
|
|
|
|
|
|
|
|
iopub_channel.on_recv(on_iopub)
|
|
|
|
|
shell_channel.on_recv(on_shell_reply)
|
|
|
|
|
loop = IOLoop.current()
|
|
|
|
|
|
|
|
|
|
# Nudge the kernel with kernel info requests until we get an IOPub message
|
|
|
|
|
def nudge():
|
|
|
|
|
self.log.debug("Nudge")
|
|
|
|
|
if not future.done():
|
|
|
|
|
self.log.debug("nudging")
|
|
|
|
|
self.session.send(shell_channel, "kernel_info_request")
|
|
|
|
|
nudge_handle = loop.call_later(0.5, nudge)
|
|
|
|
|
nudge_handle = loop.call_later(0, nudge)
|
|
|
|
|
|
|
|
|
|
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
|
|
|
|
|
return future
|
|
|
|
|
|
|
|
|
|
def request_kernel_info(self):
|
|
|
|
|
"""send a request for kernel_info"""
|
|
|
|
|
km = self.kernel_manager
|
|
|
|
|
@ -253,6 +312,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
yield stale_handler.close()
|
|
|
|
|
self._open_sessions[self.session_key] = self
|
|
|
|
|
|
|
|
|
|
@gen.coroutine
|
|
|
|
|
def open(self, kernel_id):
|
|
|
|
|
super().open()
|
|
|
|
|
km = self.kernel_manager
|
|
|
|
|
@ -263,15 +323,21 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
if buffer_info and buffer_info['session_key'] == self.session_key:
|
|
|
|
|
self.log.info("Restoring connection for %s", self.session_key)
|
|
|
|
|
self.channels = buffer_info['channels']
|
|
|
|
|
replay_buffer = buffer_info['buffer']
|
|
|
|
|
if replay_buffer:
|
|
|
|
|
self.log.info("Replaying %s buffered messages", len(replay_buffer))
|
|
|
|
|
for channel, msg_list in replay_buffer:
|
|
|
|
|
stream = self.channels[channel]
|
|
|
|
|
self._on_zmq_reply(stream, msg_list)
|
|
|
|
|
connected = self.nudge()
|
|
|
|
|
|
|
|
|
|
def replay(value):
|
|
|
|
|
replay_buffer = buffer_info['buffer']
|
|
|
|
|
if replay_buffer:
|
|
|
|
|
self.log.info("Replaying %s buffered messages", len(replay_buffer))
|
|
|
|
|
for channel, msg_list in replay_buffer:
|
|
|
|
|
stream = self.channels[channel]
|
|
|
|
|
self._on_zmq_reply(stream, msg_list)
|
|
|
|
|
|
|
|
|
|
connected.add_done_callback(replay)
|
|
|
|
|
else:
|
|
|
|
|
try:
|
|
|
|
|
self.create_stream()
|
|
|
|
|
connected = self.nudge()
|
|
|
|
|
except web.HTTPError as e:
|
|
|
|
|
self.log.error("Error opening stream: %s", e)
|
|
|
|
|
# WebSockets don't response to traditional error codes so we
|
|
|
|
|
@ -285,8 +351,13 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
|
|
|
|
|
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
|
|
|
|
|
|
|
|
|
|
for channel, stream in self.channels.items():
|
|
|
|
|
stream.on_recv_stream(self._on_zmq_reply)
|
|
|
|
|
def subscribe(value):
|
|
|
|
|
for channel, stream in self.channels.items():
|
|
|
|
|
stream.on_recv_stream(self._on_zmq_reply)
|
|
|
|
|
|
|
|
|
|
connected.add_done_callback(subscribe)
|
|
|
|
|
|
|
|
|
|
return connected
|
|
|
|
|
|
|
|
|
|
def on_message(self, msg):
|
|
|
|
|
if not self.channels:
|
|
|
|
|
|