@ -123,60 +123,76 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
def create_stream ( self ) :
km = self . kernel_manager
identity = self . session . bsession
for channel in ( ' shell ' , ' control ' , ' iopub ' , ' stdin ' ) :
meth = getattr ( km , ' connect_ ' + channel )
for channel in ( " iopub " , " shell " , " control " , " stdin " ) :
meth = getattr ( km , " connect_ " + channel )
self . channels [ channel ] = stream = meth ( self . kernel_id , identity = identity )
stream . channel = channel
def nudge ( self ) :
def nudge ( self ) :
""" Nudge the zmq connections with kernel_info_requests
Returns a Future that will resolve when we have received
a shell reply and at least one iopub message ,
ensuring that zmq subscriptions are established ,
sockets are fully connected , and kernel is responsive .
Keeps retrying kernel_info_request until these are both received .
"""
kernel = self . kernel_manager . get_kernel ( self . kernel_id )
# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
# nudging in this case would cause a potentially very long wait
# before connections are opened,
# plus it is *very* unlikely that a busy kernel will not finish
# establishing its zmq subscriptions before processing the next request.
if getattr ( kernel , " execution_state " ) == " busy " :
self . log . debug ( " Nudge: not nudging busy kernel %s " , self . kernel_id )
f = Future ( )
f . set_result ( None )
return f
# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
kernel = self . kernel_manager . get_kernel ( self . kernel_id )
shell_channel = kernel . connect_shell ( )
# The IOPub used by the client, whose subscriptions we are verifying.
iopub_channel = self . channels [ " iopub " ]
# The IOPub used by the client.
iopub_channel = self . channels [ ' iopub ' ]
future = Future ( )
info_future = Future ( )
iopub_future = Future ( )
both_done = gen . multi ( [ info_future , iopub_future ] )
def finish ( f = None ) :
""" Ensure all futures are resolved
def finish ( ) :
which in turn triggers cleanup
"""
for f in ( info_future , iopub_future ) :
if not f . done ( ) :
f . set_result ( None )
def cleanup ( f = None ) :
""" Common cleanup """
loop . remove_timeout ( timeout )
loop . remove_timeout ( nudge_handle )
iopub_channel . stop_on_recv ( )
if not shell_channel . closed ( ) :
shell_channel . close ( )
# trigger cleanup when both message futures are resolved
both_done . add_done_callback ( cleanup )
def on_shell_reply ( msg ) :
self . log . debug ( " Nudge: shell info reply received: %s " , self . kernel_id )
if not info_future . done ( ) :
self . log . debug ( " Nudge: shell info reply received: %s " , self . kernel_id )
if not shell_channel . closed ( ) :
shell_channel . close ( )
self . log . debug ( " Nudge: resolving shell future " )
self . log . debug ( " Nudge: resolving shell future: %s " , self . kernel_id )
info_future . set_result ( None )
if iopub_future . done ( ) :
finish ( )
self . log . debug ( " Nudge: resolving main future in shell handler " )
future . set_result ( None )
def on_iopub ( msg ) :
self . log . debug ( " Nudge: IOPub received: %s " , self . kernel_id )
if not iopub_future . done ( ) :
self . log . debug ( " Nudge: first IOPub received: %s " , self . kernel_id )
iopub_channel . stop_on_recv ( )
self . log . debug ( " Nudge: resolving iopub future " )
self . log . debug ( " Nudge: resolving iopub future : %s " , self . kernel_id )
iopub_future . set_result ( None )
if info_future . done ( ) :
finish ( )
self . log . debug ( " Nudge: resolving main future in iopub handler " )
future . set_result ( None )
def on_timeout ( ) :
self . log . warning ( " Nudge: Timeout waiting for kernel_info_reply: %s " , self . kernel_id )
finish ( )
if not future . done ( ) :
future . set_exception ( TimeoutError ( " Timeout waiting for nudge " ) )
iopub_channel . on_recv ( on_iopub )
shell_channel . on_recv ( on_shell_reply )
@ -184,19 +200,46 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
# Nudge the kernel with kernel info requests until we get an IOPub message
def nudge ( count ) :
# Do not nudge busy kernels as kernel info requests sent to shell are
# queued behind execution requests.
if kernel . execution_state == ' busy ' :
future . set_result ( None )
if not future . done ( ) :
count + = 1
# NOTE: this close check appears to never be True during on_open,
# even when the peer has closed the connection
if self . ws_connection is None or self . ws_connection . is_closing ( ) :
self . log . debug (
" Nudge: cancelling on closed websocket: %s " , self . kernel_id
)
finish ( )
return
# check for stopped kernel
if self . kernel_id not in self . kernel_manager :
self . log . debug (
" Nudge: cancelling on stopped kernel: %s " , self . kernel_id
)
finish ( )
return
# check for closed zmq socket
if shell_channel . closed ( ) :
self . log . debug (
" Nudge: cancelling on closed zmq socket: %s " , self . kernel_id
)
finish ( )
return
if not both_done . done ( ) :
log = self . log . warning if count % 10 == 0 else self . log . debug
log ( " Nudging attempt %s on kernel %s " % ( count , self . kernel_id ) )
log ( " Nudg e: attempt %s on kernel %s " % ( count , self . kernel_id ) )
self . session . send ( shell_channel , " kernel_info_request " )
nonlocal nudge_handle
nudge_handle = loop . call_later ( 0.5 , nudge , count )
nudge_handle = loop . call_later ( 0 , nudge , count = 0 )
timeout = loop . add_timeout ( loop . time ( ) + self . kernel_info_timeout , on_timeout )
# resolve with a timeout if we get no response
future = gen . with_timeout ( loop . time ( ) + self . kernel_info_timeout , both_done )
# ensure we have no dangling resources or unresolved Futures in case of timeout
future . add_done_callback ( finish )
return future
def request_kernel_info ( self ) :
@ -221,7 +264,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
self . log . debug ( " Waiting for pending kernel_info request " )
future . add_done_callback ( lambda f : self . _finish_kernel_info ( f . result ( ) ) )
return self . _kernel_info_future
def _handle_kernel_info_reply ( self , msg ) :
""" process the kernel_info_reply