@@ -1,5 +1,4 @@
"""A MultiKernelManager for use in the notebook webserver
- raises HTTPErrors
- creates REST API models
"""
@@ -12,15 +11,15 @@ from datetime import datetime, timedelta
from functools import partial
import os
from tornado import gen, web
from tornado import web
from tornado.concurrent import Future
from tornado.ioloop import IOLoop, PeriodicCallback
from jupyter_client.session import Session
from jupyter_client.multikernelmanager import MultiKernelManager
from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer,
Float, Instance, default, validate)
from traitlets.config.configurable import LoggingConfigurable
Float, Instance, default, validate
)
from notebook.utils import maybe_future, to_os_path, exists
from notebook._tz import utcnow, isoformat
@@ -28,9 +27,9 @@ from ipython_genutils.py3compat import getcwd
from notebook.prometheus.metrics import KERNEL_CURRENTLY_RUNNING_TOTAL
# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client.
# This will be confirmed at runtime in notebookapp.
# TODO: remove once dependencies are updated.
# Since use of AsyncMultiKernelManager is optional at the moment, don't require appropriate jupyter_client.
# This will be confirmed at runtime in notebookapp. The following block can be removed once the jupyter_client
# version floor has been updated.
try:
from jupyter_client.multikernelmanager import AsyncMultiKernelManager
except ImportError:
@@ -39,15 +38,12 @@ except ImportError:
pass
class MappingKernelManagerBase(LoggingConfigurable):
"""
This class exists so that class-based traits relative to MappingKernelManager and AsyncMappingKernelManager
can be satisfied since AsyncMappingKernelManager doesn't derive from the former. It is only necessary until
we converge to using only async, but that requires subclasses to be updated.
class MappingKernelManager(MultiKernelManager):
"""A KernelManager that handles notebook mapping and HTTP error handling"""
Since we have this class, we'll reduce duplication of code between the disjoint classes by using this
common superclass for configuration properties and static methods and local member variables.
"""
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return "jupyter_client.ioloop.IOLoopKernelManager"
kernel_argv = List(Unicode())
@@ -55,6 +51,10 @@ class MappingKernelManagerBase(LoggingConfigurable):
_kernel_connections = Dict()
_culler_callback = None
_initialized_culler = False
@default('root_dir')
def _default_root_dir(self):
try:
@@ -96,10 +96,8 @@ class MappingKernelManagerBase(LoggingConfigurable):
buffer_offline_messages = Bool(True, config=True,
help="""Whether messages from kernels whose frontends have disconnected should be buffered in-memory.
When True (default), messages are buffered and replayed on reconnect,
avoiding lost messages due to interrupted connectivity.
Disable if long-running kernels will produce too much output while
no frontends are connected.
"""
@@ -107,7 +105,6 @@ class MappingKernelManagerBase(LoggingConfigurable):
kernel_info_timeout = Float(60, config=True,
help="""Timeout for giving up on a kernel (in seconds).
On starting and restarting kernels, we check whether the
kernel is running and responsive by sending kernel_info_requests.
This sets the timeout in seconds for how long the kernel can take
@@ -117,65 +114,38 @@ class MappingKernelManagerBase(LoggingConfigurable):
"""
)
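# Each _kernel_buffers entry maps kernel_id -> {'buffer': [(channel, msg_parts), ...],
# 'session_key': str, 'channels': {channel_name: ZMQStream}}; see start_buffering/get_buffer below.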
_kernel_buffers = Any()
@default('_kernel_buffers')
def _default_kernel_buffers(self):
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})
last_kernel_activity = Instance(datetime,
help="The last activity on any kernel, including shutting down a kernel")
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.last_kernel_activity = utcnow()
allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
"""
)
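# Example (illustrative values only):
# c.MappingKernelManager.allowed_message_types = ['kernel_info_request', 'comm_msg']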
# members used to hold composed instances
buffering_manager = None
kernel_culler = None
activity_monitor = None
#-------------------------------------------------------------------------
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
def __init__(self, **kwargs):
super(MappingKernelManagerBase, self).__init__(**kwargs)
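# Keep an explicit reference to the mixed-in multi-kernel manager class so shared code can
# invoke it directly; the async subclass points self.super at AsyncMultiKernelManager instead.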
self.super = MultiKernelManager
self.super.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()
self.buffering_manager = BufferingManager(parent=self)
self.kernel_culler = KernelCuller(parent=self)
self.activity_monitor = ActivityMonitor(parent=self)
def _handle_kernel_died(self, kernel_id):
"""notice that a kernel died"""
self.log.warning("Kernel %s died, removing from map.", kernel_id)
self.remove_kernel(kernel_id)
def kernel_model(self, kernel_id):
"""Return a JSON-safe dict representing a kernel
For use in representing kernels in the JSON APIs.
"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
model = {
"id": kernel_id,
"name": kernel.kernel_name,
"last_activity": isoformat(kernel.last_activity),
"execution_state": kernel.execution_state,
"connections": self._kernel_connections[kernel_id],
}
return model
def list_kernels(self):
"""Returns a list of kernel models for the running kernels."""
kernels = []
kernel_ids = self.list_kernel_ids()
for kernel_id in kernel_ids:
model = self.kernel_model(kernel_id)
kernels.append(model)
return kernels
# override _check_kernel_id to raise 404 instead of KeyError
def _check_kernel_id(self, kernel_id):
"""Check that a kernel_id exists and raise 404 if not."""
if kernel_id not in self:
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
def cwd_for_path(self, path):
"""Turn API path into absolute OS path."""
os_path = to_os_path(path, self.root_dir)
@@ -185,98 +155,8 @@ class MappingKernelManagerBase(LoggingConfigurable):
os_path = os.path.dirname(os_path)
return os_path
def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
The id of the kernel to start buffering.
session_key : str
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
channels : dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
self.buffering_manager.start_buffering(kernel_id, session_key, channels)
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : str
The id of the kernel whose buffer should be returned.
session_key : str, optional
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
return self.buffering_manager.get_buffer(kernel_id, session_key)
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
"""
self.buffering_manager.stop_buffering(kernel_id)
def notify_connect(self, kernel_id):
"""Notice a new connection to a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] += 1
def notify_disconnect(self, kernel_id):
"""Notice a disconnection from a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] -= 1
# monitoring activity:
def start_watching_activity(self, kernel_id):
"""Start watching IOPub messages on a kernel for activity. Remove if no overrides
- update last_activity on every message
- record execution_state from status messages
"""
self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id])
def initialize_culler(self):
"""Initialize the culler if not already done. Remove if no overrides
"""
if self.kernel_culler is None:
self.kernel_culler = KernelCuller(parent=self)
def cull_kernels(self):
# Defer to KernelCuller. Remove if no overrides
self.kernel_culler.cull_kernels()
def cull_kernel_if_idle(self, kernel_id):
# Defer to KernelCuller. Remove if no overrides
self.kernel_culler.cull_kernel_if_idle(kernel_id)
class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager):
"""A KernelManager that handles notebook mapping and HTTP error handling"""
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return "jupyter_client.ioloop.IOLoopKernelManager"
def __init__(self, **kwargs):
super(MappingKernelManager, self).__init__(**kwargs)
# -------------------------------------------------------------------------
# Methods for managing kernels and sessions
# -------------------------------------------------------------------------
async def start_kernel(self, kernel_id=None, path=None, **kwargs):
"""Start a kernel for a session and return its kernel_id.
Parameters
----------
kernel_id : uuid
@@ -293,9 +173,7 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager):
if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = await maybe_future(
super(MappingKernelManager, self).start_kernel(**kwargs)
)
kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs))
self._kernel_connections[kernel_id] = 0
self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)
@ -316,127 +194,99 @@ class MappingKernelManager(MappingKernelManagerBase, MultiKernelManager):
self._check_kernel_id(kernel_id)
self.log.info("Using existing kernel: %s" % kernel_id)
return kernel_id
# Initialize culling if not already
if not self._initialized_culler:
self.initialize_culler()
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
if kernel._activity_stream:
kernel._activity_stream.close()
kernel._activity_stream = None
self.stop_buffering(kernel_id)
self._kernel_connections.pop(kernel_id, None)
return kernel_id
# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
type=self._kernels[kernel_id].kernel_name
).dec()
def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
The id of the kernel to start buffering.
session_key : str
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
channels : dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)
if not self.buffer_offline_messages:
for channel, stream in channels.items():
stream.close()
return
async def restart_kernel(self, kernel_id):
"""Restart a kernel by kernel_id"""
self.log.info("Starting buffering for %s", session_key)
self._check_kernel_id(kernel_id)
await maybe_future(super(MappingKernelManager, self).restart_kernel(kernel_id))
kernel = self.get_kernel(kernel_id)
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
future = Future()
def finish():
"""Common cleanup when restart finishes/fails for any reason."""
if not channel.closed():
channel.close()
loop.remove_timeout(timeout)
kernel.remove_restart_callback(on_restart_failed, 'dead')
def on_reply(msg):
self.log.debug("Kernel info reply received: %s", kernel_id)
finish()
if not future.done():
future.set_result(msg)
def on_timeout():
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
finish()
if not future.done():
future.set_exception(gen.TimeoutError("Timeout waiting for restart"))
def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
finish()
if not future.done():
future.set_exception(RuntimeError("Restart failed"))
kernel.add_restart_callback(on_restart_failed, 'dead')
kernel.session.send(channel, "kernel_info_request")
channel.on_recv(on_reply)
loop = IOLoop.current()
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
# wait for restart to complete
await future
class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManager):
"""A KernelManager that handles notebook mapping and HTTP error handling using coroutines throughout"""
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return "jupyter_client.ioloop.AsyncIOLoopKernelManager"
def __init__(self, **kwargs):
super(AsyncMappingKernelManager, self).__init__(**kwargs)
# clear previous buffering state
self.stop_buffering(kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
# record the session key because only one session can buffer
buffer_info['session_key'] = session_key
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
buffer_info['buffer'] = []
buffer_info['channels'] = channels
# -------------------------------------------------------------------------
# Methods for managing kernels and sessions
# -------------------------------------------------------------------------
# forward any future messages to the internal buffer
def buffer_msg(channel, msg_parts):
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
buffer_info['buffer'].append((channel, msg_parts))
async def start_kernel(self, kernel_id=None, path=None, **kwargs):
"""Start a kernel for a session and return its kernel_id.
for channel, stream in channels.items():
stream.on_recv(partial(buffer_msg, channel))
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : uuid
The uuid to associate the new kernel with. If this
is not None, this kernel will be persistent whenever it is
requested.
path : API path
The API path (unicode, '/' delimited) for the cwd.
Will be transformed to an OS path relative to root_dir.
kernel_name : str
The name identifying which kernel spec to launch. This is ignored if
an existing kernel is returned, but it may be checked in the future.
kernel_id : str
The id of the kernel whose buffer should be returned.
session_key : str, optional
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_id = await super(AsyncMappingKernelManager, self).start_kernel(**kwargs)
self.log.debug("Getting buffer for %s", kernel_id)
if kernel_id not in self._kernel_buffers:
return
self._kernel_connections[kernel_id] = 0
self.start_watching_activity(kernel_id)
self.log.info("Kernel started (async): %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
# register callback for failed auto-restart
self.add_restart_callback(kernel_id,
lambda: self._handle_kernel_died(kernel_id),
'dead',
)
buffer_info = self._kernel_buffers[kernel_id]
if buffer_info['session_key'] == session_key:
# remove buffer
self._kernel_buffers.pop(kernel_id)
# only return buffer_info if it's a match
return buffer_info
else:
self.stop_buffering(kernel_id)
# Increase the metric of number of kernels running
# for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
type=self._kernels[kernel_id].kernel_name
).inc()
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
"""
self.log.debug("Clearing buffer for %s", kernel_id)
self._check_kernel_id(kernel_id)
else:
self._check_kernel_id(kernel_id)
self.log.info("Using existing kernel: %s" % kernel_id)
if kernel_id not in self._kernel_buffers:
return
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info['channels'].values():
if not stream.closed():
stream.on_recv(None)
stream.close()
return kernel_id
msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
async def shutdown_kernel(self, kernel_id, now=False, restart=False):
def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
@@ -445,7 +295,6 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage
kernel._activity_stream = None
self.stop_buffering(kernel_id)
self._kernel_connections.pop(kernel_id, None)
self.last_kernel_activity = utcnow()
# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
@@ -453,12 +302,12 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage
type=self._kernels[kernel_id].kernel_name
).dec()
await super(AsyncMappingKernelManager, self).shutdown_kernel(kernel_id, now=now, restart=restart)
return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart)
async def restart_kernel(self, kernel_id, now=False):
"""Restart a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
await super(AsyncMappingKernelManager, self).restart_kernel(kernel_id, now=now)
await maybe_future(self.super.restart_kernel(self, kernel_id, now=now))
kernel = self.get_kernel(kernel_id)
# return a Future that will resolve when the kernel has successfully restarted
channel = kernel.connect_shell()
@@ -481,7 +330,7 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage
self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
finish()
if not future.done():
future.set_exception(gen.TimeoutError("Timeout waiting for restart"))
future.set_exception(TimeoutError("Timeout waiting for restart"))
def on_restart_failed():
self.log.warning("Restarting kernel failed: %s", kernel_id)
@@ -496,22 +345,55 @@ class AsyncMappingKernelManager(MappingKernelManagerBase, AsyncMultiKernelManage
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
return future
def notify_connect(self, kernel_id):
"""Notice a new connection to a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] += 1
class ActivityMonitor(LoggingConfigurable):
"""Establishes activity recorder for each active kernel"""
def notify_disconnect(self, kernel_id):
"""Notice a disconnection from a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] -= 1
def __init__(self, **kwargs):
super(ActivityMonitor, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManagerBase):
raise RuntimeError(
"ActivityMonitor requires an instance of MappingKernelManagerBase!")
def kernel_model(self, kernel_id):
"""Return a JSON-safe dict representing a kernel
For use in representing kernels in the JSON APIs.
"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
def start_watching_activity(self, kernel_id, kernel):
"""Start watching IOPub messages on a kernel for activity.
model = {
" id " : kernel_id ,
" name " : kernel . kernel_name ,
" last_activity " : isoformat ( kernel . last_activity ) ,
" execution_state " : kernel . execution_state ,
" connections " : self . _kernel_connections [ kernel_id ] ,
}
return model
def list_kernels(self):
"""Returns a list of kernel models for the running kernels."""
kernels = []
kernel_ids = self.super.list_kernel_ids(self)
for kernel_id in kernel_ids:
model = self.kernel_model(kernel_id)
kernels.append(model)
return kernels
# override _check_kernel_id to raise 404 instead of KeyError
def _check_kernel_id(self, kernel_id):
"""Check that a kernel_id exists and raise 404 if not."""
if kernel_id not in self:
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
# monitoring activity:
def start_watching_activity(self, kernel_id):
"""Start watching IOPub messages on a kernel for activity.
- update last_activity on every message
- record execution_state from status messages
"""
kernel = self._kernels[kernel_id]
# add busy/activity markers:
kernel.execution_state = 'starting'
kernel.last_activity = utcnow()
@@ -523,7 +405,7 @@ class ActivityMonitor(LoggingConfigurable):
def record_activity(msg_list):
"""Record an IOPub message arriving from a kernel"""
self.parent.last_kernel_activity = kernel.last_activity = utcnow()
self.last_kernel_activity = kernel.last_activity = utcnow()
idents, fed_msg_list = session.feed_identities(msg_list)
msg = session.deserialize(fed_msg_list)
@@ -537,191 +419,90 @@ class ActivityMonitor(LoggingConfigurable):
kernel._activity_stream.on_recv(record_activity)
class BufferingManager(LoggingConfigurable):
"""Manages buffering across the active kernels"""
_kernel_buffers = Any()
@default('_kernel_buffers')
def _default_kernel_buffers(self):
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})
def __init__(self, **kwargs):
super(BufferingManager, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManagerBase):
raise RuntimeError(
"BufferingManager requires an instance of MappingKernelManagerBase!")
def _check_kernel_id(self, kernel_id):
"""Check that a kernel_id exists and raise 404 if not."""
if kernel_id not in self.parent:
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
The id of the kernel to start buffering.
session_key : str
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
channels : dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
if not self.parent.buffer_offline_messages:
for channel, stream in channels.items():
stream.close()
return
self._check_kernel_id(kernel_id)
self.log.info("Starting buffering for %s", session_key)
# clear previous buffering state
self.parent.stop_buffering(kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
# record the session key because only one session can buffer
buffer_info['session_key'] = session_key
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
buffer_info['buffer'] = []
buffer_info['channels'] = channels
# forward any future messages to the internal buffer
def buffer_msg(channel, msg_parts):
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
buffer_info['buffer'].append((channel, msg_parts))
for channel, stream in channels.items():
stream.on_recv(partial(buffer_msg, channel))
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : str
The id of the kernel whose buffer should be returned.
session_key : str, optional
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
if kernel_id not in self._kernel_buffers:
return
self.log.debug("Getting buffer for %s", kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
if buffer_info['session_key'] == session_key:
# remove buffer
self._kernel_buffers.pop(kernel_id)
# only return buffer_info if it's a match
return buffer_info
else:
self.parent.stop_buffering(kernel_id)
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
def initialize_culler(self):
"""Start idle culler if 'cull_idle_timeout' is greater than zero.
Regardless of that value, set flag that we've been here.
"""
self._check_kernel_id(kernel_id)
if kernel_id not in self._kernel_buffers:
return
self.log.debug("Clearing buffer for %s", kernel_id)
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info['channels'].values():
if not stream.closed():
stream.on_recv(None)
stream.close()
msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
if not self._initialized_culler and self.cull_idle_timeout > 0:
if self._culler_callback is None:
loop = IOLoop.current()
if self.cull_interval <= 0:  # handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
self.cull_interval, self.cull_interval_default)
self.cull_interval = self.cull_interval_default
self._culler_callback = PeriodicCallback(
self.cull_kernels, 1000 * self.cull_interval)
self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...",
self.cull_idle_timeout, self.cull_interval)
if self.cull_busy:
self.log.info("Culling kernels even if busy")
if self.cull_connected:
self.log.info("Culling kernels even with connected clients")
self._culler_callback.start()
self._initialized_culler = True
async def cull_kernels(self):
self.log.debug("Polling every %s seconds for kernels idle > %s seconds...",
self.cull_interval, self.cull_idle_timeout)
"""Create a separate list of kernels to avoid conflicting updates while iterating"""
for kernel_id in list(self._kernels):
try:
await self.cull_kernel_if_idle(kernel_id)
except Exception as e:
self.log.exception("The following exception was encountered while checking the "
"idle duration of kernel {}: {}".format(kernel_id, e))
class KernelCuller(LoggingConfigurable):
"""Handles culling responsibilities for active kernels"""
async def cull_kernel_if_idle(self, kernel_id):
try:
kernel = self._kernels[kernel_id]
except KeyError:
return  # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made.
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s",
kernel_id, kernel.kernel_name, kernel.last_activity)
if kernel.last_activity is not None:
dt_now = utcnow()
dt_idle = dt_now - kernel.last_activity
# Compute idle properties
is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout)
is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy')
connections = self._kernel_connections.get(kernel_id, 0)
is_idle_connected = self.cull_connected or not connections
# Cull the kernel if all three criteria are met
if (is_idle_time and is_idle_execute and is_idle_connected):
idle_duration = int(dt_idle.total_seconds())
self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.",
kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration)
await maybe_future(self.shutdown_kernel(kernel_id))
# AsyncMappingKernelManager inherits as much as possible from MappingKernelManager, overriding
# only what is different.
class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager):
@default('kernel_manager_class')
def _default_kernel_manager_class(self):
return "jupyter_client.ioloop.AsyncIOLoopKernelManager"
def __init__(self, **kwargs):
super(KernelCuller, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManagerBase):
raise RuntimeError(
"KernelCuller requires an instance of MappingKernelManagerBase!")
self.cull_state = "idle" if not self.parent.cull_busy else "inactive"  # used during logging
# Start idle culler if 'cull_idle_timeout' is greater than zero.
# Regardless of that value, set flag that we've been here.
if self.parent.cull_idle_timeout > 0:
if self.parent.cull_interval <= 0:  # handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
self.parent.cull_interval, self.parent.cull_interval_default)
self.parent.cull_interval = self.parent.cull_interval_default
self._culler_callback = PeriodicCallback(
self.parent.cull_kernels, 1000 * self.parent.cull_interval)
self._culler_callback.start()
self._log_info()
def _log_info(self):
"""Builds a single informational message describing the culling configuration (logged at startup)."""
log_msg = list()
log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals".
format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout,
interval=self.parent.cull_interval))
if self.parent.cull_busy or self.parent.cull_connected:
log_msg.append(" - including")
if self.parent.cull_busy:
log_msg.append(" busy")
if self.parent.cull_connected:
log_msg.append(" and")
if self.parent.cull_connected:
log_msg.append(" connected")
log_msg.append(" kernels")
log_msg.append(".")
self.log.info(''.join(log_msg))
self.super = AsyncMultiKernelManager
self.super.__init__(self, **kwargs)
self.last_kernel_activity = utcnow()
async def cull_kernels(self):
self.log.debug("Polling every %s seconds for kernels %s > %s seconds...",
self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout)
# Get a separate list of kernels to avoid conflicting updates while iterating
for kernel_id in self.parent.list_kernel_ids():
await self.parent.cull_kernel_if_idle(kernel_id)
async def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
kernel = self._kernels[kernel_id]
if kernel._activity_stream:
kernel._activity_stream.close()
kernel._activity_stream = None
self.stop_buffering(kernel_id)
self._kernel_connections.pop(kernel_id, None)
async def cull_kernel_if_idle(self, kernel_id):
# Decrease the metric of number of kernels
# running for the relevant kernel type by 1
KERNEL_CURRENTLY_RUNNING_TOTAL.labels(
type=self._kernels[kernel_id].kernel_name
).dec()
# Get the kernel model and use that to determine cullability...
try:
model = self.parent.kernel_model(kernel_id)
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s",
kernel_id, model['name'], model['last_activity'])
if model['last_activity'] is not None:
# Convert dates to compatible formats. Since both are UTC, strip the TZ info from
# the current time and convert the last_activity string to a datetime.
dt_now = utcnow().replace(tzinfo=None)
dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ")
dt_idle = dt_now - dt_last_activity
# Compute idle properties
is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout)
is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy')
connections = model.get('connections', 0)
is_idle_connected = self.parent.cull_connected or connections == 0
# Cull the kernel if all three criteria are met
if is_idle_time and is_idle_execute and is_idle_connected:
idle_duration = int(dt_idle.total_seconds())
self.log.warning(
"Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.",
model['execution_state'], model['name'], kernel_id, connections, idle_duration)
await maybe_future(self.parent.shutdown_kernel(kernel_id))
except KeyError:
pass  # KeyErrors are somewhat expected since the kernel can be shutdown as the culling check is made.
except Exception as e:  # other errors are not as expected, so we'll make some noise, but continue.
self.log.exception("The following exception was encountered while checking the idle "
"duration of kernel %s: %s", kernel_id, e)
return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart)