|
|
|
|
@ -138,7 +138,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
"""Nudge the zmq connections with kernel_info_requests
|
|
|
|
|
|
|
|
|
|
Returns a Future that will resolve when we have received
|
|
|
|
|
a shell reply and at least one iopub message,
|
|
|
|
|
a shell or control reply and at least one iopub message,
|
|
|
|
|
ensuring that zmq subscriptions are established,
|
|
|
|
|
sockets are fully connected, and kernel is responsive.
|
|
|
|
|
|
|
|
|
|
@ -157,10 +157,12 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
f = Future()
|
|
|
|
|
f.set_result(None)
|
|
|
|
|
return f
|
|
|
|
|
|
|
|
|
|
# Use a transient shell channel to prevent leaking
|
|
|
|
|
# shell responses to the front-end.
|
|
|
|
|
shell_channel = kernel.connect_shell()
|
|
|
|
|
# Use a transient control channel to prevent leaking
|
|
|
|
|
# control responses to the front-end.
|
|
|
|
|
control_channel = kernel.connect_control()
|
|
|
|
|
# The IOPub used by the client, whose subscriptions we are verifying.
|
|
|
|
|
iopub_channel = self.channels["iopub"]
|
|
|
|
|
|
|
|
|
|
@ -183,6 +185,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
iopub_channel.stop_on_recv()
|
|
|
|
|
if not shell_channel.closed():
|
|
|
|
|
shell_channel.close()
|
|
|
|
|
if not control_channel.closed():
|
|
|
|
|
control_channel.close()
|
|
|
|
|
|
|
|
|
|
# trigger cleanup when both message futures are resolved
|
|
|
|
|
both_done.add_done_callback(cleanup)
|
|
|
|
|
@ -193,6 +197,12 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
self.log.debug("Nudge: resolving shell future: %s", self.kernel_id)
|
|
|
|
|
info_future.set_result(None)
|
|
|
|
|
|
|
|
|
|
def on_control_reply(msg):
|
|
|
|
|
self.log.debug("Nudge: control info reply received: %s", self.kernel_id)
|
|
|
|
|
if not info_future.done():
|
|
|
|
|
self.log.debug("Nudge: resolving control future: %s", self.kernel_id)
|
|
|
|
|
info_future.set_result(None)
|
|
|
|
|
|
|
|
|
|
def on_iopub(msg):
|
|
|
|
|
self.log.debug("Nudge: IOPub received: %s", self.kernel_id)
|
|
|
|
|
if not iopub_future.done():
|
|
|
|
|
@ -202,6 +212,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
|
|
|
|
|
iopub_channel.on_recv(on_iopub)
|
|
|
|
|
shell_channel.on_recv(on_shell_reply)
|
|
|
|
|
control_channel.on_recv(on_control_reply)
|
|
|
|
|
loop = IOLoop.current()
|
|
|
|
|
|
|
|
|
|
# Nudge the kernel with kernel info requests until we get an IOPub message
|
|
|
|
|
@ -227,6 +238,12 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
|
|
|
|
|
# check for closed zmq socket
|
|
|
|
|
if shell_channel.closed():
|
|
|
|
|
self.log.debug("Nudge: cancelling on closed zmq socket: %s", self.kernel_id)
|
|
|
|
|
finish()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# check for closed zmq socket
|
|
|
|
|
if control_channel.closed():
|
|
|
|
|
self.log.debug(
|
|
|
|
|
"Nudge: cancelling on closed zmq socket: %s", self.kernel_id
|
|
|
|
|
)
|
|
|
|
|
@ -237,6 +254,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
log = self.log.warning if count % 10 == 0 else self.log.debug
|
|
|
|
|
log("Nudge: attempt %s on kernel %s" % (count, self.kernel_id))
|
|
|
|
|
self.session.send(shell_channel, "kernel_info_request")
|
|
|
|
|
self.session.send(control_channel, "kernel_info_request")
|
|
|
|
|
nonlocal nudge_handle
|
|
|
|
|
nudge_handle = loop.call_later(0.5, nudge, count)
|
|
|
|
|
|
|
|
|
|
|