@ -16,14 +16,13 @@ import sys
import uuid
import zmq
from zmq . eventloop . zmqstream import ZMQStream
from tornado import web
from . routers import IOPubStreamRouter , ShellStreamRouter
from IPython . config . configurable import LoggingConfigurable
from IPython . zmq . ipkernel import launch_kernel
from IPython . utils . traitlets import Instance , Dict , List , Unicode
from IPython . utils . traitlets import Instance , Dict , List , Unicode , Float , Int
#-----------------------------------------------------------------------------
# Classes
@ -110,6 +109,7 @@ class KernelManager(LoggingConfigurable):
else :
kernel_process . send_signal ( signal . SIGINT )
def signal_kernel ( self , kernel_id , signum ) :
""" Sends a signal to the kernel by its uuid.
@ -182,34 +182,50 @@ class KernelManager(LoggingConfigurable):
else :
raise KeyError ( " Kernel with id not found: %s " % kernel_id )
def create_session_manager ( self , kernel_id ) :
""" Create a new session manager for a kernel by its uuid. """
from sessionmanager import SessionManager
return SessionManager (
kernel_id = kernel_id , kernel_manager = self ,
config = self . config , context = self . context , log = self . log
)
def create_connected_stream ( self , ip , port , socket_type ) :
sock = self . context . socket ( socket_type )
addr = " tcp:// %s : %i " % ( ip , port )
self . log . info ( " Connecting to: %s " % addr )
sock . connect ( addr )
return ZMQStream ( sock )
def create_iopub_stream ( self , kernel_id ) :
ip = self . get_kernel_ip ( kernel_id )
ports = self . get_kernel_ports ( kernel_id )
iopub_stream = self . create_connected_stream ( ip , ports [ ' iopub_port ' ] , zmq . SUB )
iopub_stream . socket . setsockopt ( zmq . SUBSCRIBE , b ' ' )
return iopub_stream
def create_shell_stream ( self , kernel_id ) :
ip = self . get_kernel_ip ( kernel_id )
ports = self . get_kernel_ports ( kernel_id )
shell_stream = self . create_connected_stream ( ip , ports [ ' shell_port ' ] , zmq . XREQ )
return shell_stream
class RoutingKernelManager ( LoggingConfigurable ) :
""" A KernelManager that handles WebSocket routing and HTTP error handling """
def create_hb_stream ( self , kernel_id ) :
ip = self . get_kernel_ip ( kernel_id )
ports = self . get_kernel_ports ( kernel_id )
hb_stream = self . create_connected_stream ( ip , ports [ ' hb_port ' ] , zmq . REQ )
return hb_stream
class MappingKernelManager ( KernelManager ) :
""" A KernelManager that handles notebok mapping and HTTP error handling """
kernel_argv = List ( Unicode )
kernel_manager = Instance ( KernelManager )
time_to_dead = Float ( 3.0 , config = True , help = """ Kernel heartbeat interval in seconds. """ )
max_msg_size = Int ( 65536 , config = True , help = """
The max raw message size accepted from the browser
over a WebSocket connection .
""" )
_routers = Dict ( )
_session_dict = Dict ( )
_notebook_mapping = Dict ( )
#-------------------------------------------------------------------------
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
@property
def kernel_ids ( self ) :
""" List the kernel ids. """
return self . kernel_manager . kernel_ids
def kernel_for_notebook ( self , notebook_id ) :
""" Return the kernel_id for a notebook_id or None. """
return self . _notebook_mapping . get ( notebook_id )
@ -234,7 +250,7 @@ class RoutingKernelManager(LoggingConfigurable):
del self . _notebook_mapping [ notebook_id ]
def start_kernel ( self , notebook_id = None ) :
""" Start a kernel an return its kernel_id.
""" Start a kernel for a notebok an return its kernel_id.
Parameters
- - - - - - - - - -
@ -243,108 +259,48 @@ class RoutingKernelManager(LoggingConfigurable):
is not None , this kernel will be persistent whenever the notebook
requests a kernel .
"""
self . log . info
kernel_id = self . kernel_for_notebook ( notebook_id )
if kernel_id is None :
kwargs = dict ( )
kwargs [ ' extra_arguments ' ] = self . kernel_argv
kernel_id = self . kernel_manager . start_kernel ( * * kwargs )
kernel_id = super ( MappingKernelManager , self ) . start_kernel ( * * kwargs )
self . set_kernel_for_notebook ( notebook_id , kernel_id )
self . log . info ( " Kernel started: %s " % kernel_id )
self . log . debug ( " Kernel args: %r " % kwargs )
self . start_session_manager ( kernel_id )
else :
self . log . info ( " Using existing kernel: %s " % kernel_id )
return kernel_id
def start_session_manager ( self , kernel_id ) :
""" Start the ZMQ sockets (a " session " ) to connect to a kernel. """
sm = self . kernel_manager . create_session_manager ( kernel_id )
self . _session_dict [ kernel_id ] = sm
iopub_stream = sm . get_iopub_stream ( )
shell_stream = sm . get_shell_stream ( )
iopub_router = IOPubStreamRouter (
zmq_stream = iopub_stream , session = sm . session , config = self . config
)
shell_router = ShellStreamRouter (
zmq_stream = shell_stream , session = sm . session , config = self . config
)
self . set_router ( kernel_id , ' iopub ' , iopub_router )
self . set_router ( kernel_id , ' shell ' , shell_router )
def kill_kernel ( self , kernel_id ) :
""" Kill a kernel and remove its notebook association. """
if kernel_id not in self . kernel_manager :
if kernel_id not in self :
raise web . HTTPError ( 404 )
try :
sm = self . _session_dict . pop ( kernel_id )
except KeyError :
raise web . HTTPError ( 404 )
sm . stop ( )
self . kernel_manager . kill_kernel ( kernel_id )
super ( MappingKernelManager , self ) . kill_kernel ( kernel_id )
self . delete_mapping_for_kernel ( kernel_id )
self . log . info ( " Kernel killed: %s " % kernel_id )
def interrupt_kernel ( self , kernel_id ) :
""" Interrupt a kernel. """
if kernel_id not in self . kernel_manager :
if kernel_id not in self :
raise web . HTTPError ( 404 )
self . kernel_manager . interrupt_kernel ( kernel_id )
self . log . debug ( " Kernel interrupted: %s " % kernel_id )
super ( MappingKernelManager , self ) . interrupt_kernel ( kernel_id )
self . log . info ( " Kernel interrupted: %s " % kernel_id )
def restart_kernel ( self , kernel_id ) :
""" Restart a kernel while keeping clients connected. """
if kernel_id not in self . kernel_manager :
if kernel_id not in self :
raise web . HTTPError ( 404 )
# Get the notebook_id to preserve the kernel/notebook association
# Get the notebook_id to preserve the kernel/notebook association .
notebook_id = self . notebook_for_kernel ( kernel_id )
# Create the new kernel first so we can move the clients over.
new_kernel_id = self . start_kernel ( )
# Copy the clients over to the new routers.
old_iopub_router = self . get_router ( kernel_id , ' iopub ' )
old_shell_router = self . get_router ( kernel_id , ' shell ' )
new_iopub_router = self . get_router ( new_kernel_id , ' iopub ' )
new_shell_router = self . get_router ( new_kernel_id , ' shell ' )
new_iopub_router . copy_clients ( old_iopub_router )
new_shell_router . copy_clients ( old_shell_router )
# Shut down the old routers
old_shell_router . close ( )
old_iopub_router . close ( )
self . delete_router ( kernel_id , ' shell ' )
self . delete_router ( kernel_id , ' iopub ' )
del old_shell_router
del old_iopub_router
# Now shutdown the old session and the kernel.
# TODO: This causes a hard crash in ZMQStream.close, which sets
# self.socket to None to hastily. We will need to fix this in PyZMQ
# itself. For now, we just leave the old kernel running :(
# Maybe this is fixed now, but nothing was changed really.
# Now kill the old kernel.
self . kill_kernel ( kernel_id )
# Now save the new kernel/notebook association. We have to save it
# after the old kernel is killed as that will delete the mapping.
self . set_kernel_for_notebook ( notebook_id , new_kernel_id )
self . log . debug ( " Kernel restarted: %s " % new_kernel_id )
return new_kernel_id
def get_router ( self , kernel_id , stream_name ) :
""" Return the router for a given kernel_id and stream name. """
router = self . _routers [ ( kernel_id , stream_name ) ]
return router
def set_router ( self , kernel_id , stream_name , router ) :
""" Set the router for a given kernel_id and stream_name. """
self . _routers [ ( kernel_id , stream_name ) ] = router
def delete_router ( self , kernel_id , stream_name ) :
""" Delete a router for a kernel_id and stream_name. """
try :
del self . _routers [ ( kernel_id , stream_name ) ]
except KeyError :
pass