Refactor buffering, culling and activity monitoring into classes

Use composition in preparation for AsyncMappingKernelManager.
Kevin Bates 7 years ago
parent 43df5af2b6
commit 677ccc3d7a
No known key found for this signature in database
GPG Key ID: ADCCD5840EE5145F

@ -19,8 +19,8 @@ from tornado.ioloop import IOLoop, PeriodicCallback
from jupyter_client.session import Session
from jupyter_client.multikernelmanager import MultiKernelManager
from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer,
Float, Instance, default, validate
)
Float, Instance, default, validate)
from traitlets.config.configurable import LoggingConfigurable
from notebook.utils import maybe_future, to_os_path, exists
from notebook._tz import utcnow, isoformat
@ -42,10 +42,6 @@ class MappingKernelManager(MultiKernelManager):
_kernel_connections = Dict()
_culler_callback = None
_initialized_culler = False
@default('root_dir')
def _default_root_dir(self):
try:
@ -108,18 +104,22 @@ class MappingKernelManager(MultiKernelManager):
"""
)
_kernel_buffers = Any()
@default('_kernel_buffers')
def _default_kernel_buffers(self):
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})
last_kernel_activity = Instance(datetime,
help="The last activity on any kernel, including shutting down a kernel")
# members used to hold composed instances
buffering_manager = None
kernel_culler = None
activity_monitor = None
def __init__(self, **kwargs):
super(MappingKernelManager, self).__init__(**kwargs)
self.last_kernel_activity = utcnow()
self.buffering_manager = BufferingManager(parent=self)
self.kernel_culler = KernelCuller(parent=self)
self.activity_monitor = ActivityMonitor(parent=self)
allowed_message_types = List(trait=Unicode(), config=True,
help="""White list of allowed kernel message types.
When the list is empty, all message types are allowed.
@ -187,10 +187,6 @@ class MappingKernelManager(MultiKernelManager):
self._check_kernel_id(kernel_id)
self.log.info("Using existing kernel: %s" % kernel_id)
# Initialize culling if not already
if not self._initialized_culler:
self.initialize_culler()
# py2-compat
raise gen.Return(kernel_id)
@ -208,30 +204,7 @@ class MappingKernelManager(MultiKernelManager):
channels: dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
if not self.buffer_offline_messages:
for channel, stream in channels.items():
stream.close()
return
self.log.info("Starting buffering for %s", session_key)
self._check_kernel_id(kernel_id)
# clear previous buffering state
self.stop_buffering(kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
# record the session key because only one session can buffer
buffer_info['session_key'] = session_key
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
buffer_info['buffer'] = []
buffer_info['channels'] = channels
# forward any future messages to the internal buffer
def buffer_msg(channel, msg_parts):
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
buffer_info['buffer'].append((channel, msg_parts))
for channel, stream in channels.items():
stream.on_recv(partial(buffer_msg, channel))
self.buffering_manager.start_buffering(kernel_id, session_key, channels)
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
@ -245,18 +218,7 @@ class MappingKernelManager(MultiKernelManager):
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
self.log.debug("Getting buffer for %s", kernel_id)
if kernel_id not in self._kernel_buffers:
return
buffer_info = self._kernel_buffers[kernel_id]
if buffer_info['session_key'] == session_key:
# remove buffer
self._kernel_buffers.pop(kernel_id)
# only return buffer_info if it's a match
return buffer_info
else:
self.stop_buffering(kernel_id)
return self.buffering_manager.get_buffer(kernel_id, session_key)
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
@ -266,22 +228,7 @@ class MappingKernelManager(MultiKernelManager):
kernel_id : str
The id of the kernel to stop buffering.
"""
self.log.debug("Clearing buffer for %s", kernel_id)
self._check_kernel_id(kernel_id)
if kernel_id not in self._kernel_buffers:
return
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info['channels'].values():
if not stream.closed():
stream.on_recv(None)
stream.close()
msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
self.buffering_manager.stop_buffering(kernel_id)
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by kernel_id"""
@ -372,9 +319,9 @@ class MappingKernelManager(MultiKernelManager):
return model
def list_kernels(self):
"""Returns a list of kernel_id's of kernels running."""
"""Returns a list of kernel models relative to the running kernels."""
kernels = []
kernel_ids = super(MappingKernelManager, self).list_kernel_ids()
kernel_ids = self.list_kernel_ids()
for kernel_id in kernel_ids:
model = self.kernel_model(kernel_id)
kernels.append(model)
@ -394,7 +341,38 @@ class MappingKernelManager(MultiKernelManager):
- update last_activity on every message
- record execution_state from status messages
"""
kernel = self._kernels[kernel_id]
self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id])
def initialize_culler(self):
"""Initial culler if not already.
"""
if self.kernel_culler is None:
self.kernel_culler = KernelCuller(parent=self)
def cull_kernels(self):
# Defer to KernelCuller
self.kernel_culler.cull_kernels()
def cull_kernel_if_idle(self, kernel_id):
# Defer to KernelCuller
self.kernel_culler.cull_kernel_if_idle(kernel_id)
class ActivityMonitor(LoggingConfigurable):
"""Establishes activity recorder for each active kernel"""
def __init__(self, **kwargs):
super(ActivityMonitor, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManager):
raise RuntimeError(
"ActivityMonitor requires an instance of MappingKernelManager!")
def start_watching_activity(self, kernel_id, kernel):
"""Start watching IOPub messages on a kernel for activity.
- update last_activity on every message
- record execution_state from status messages
"""
# add busy/activity markers:
kernel.execution_state = 'starting'
kernel.last_activity = utcnow()
@ -406,7 +384,7 @@ class MappingKernelManager(MultiKernelManager):
def record_activity(msg_list):
"""Record an IOPub message arriving from a kernel"""
self.last_kernel_activity = kernel.last_activity = utcnow()
self.parent.last_kernel_activity = kernel.last_activity = utcnow()
idents, fed_msg_list = session.feed_identities(msg_list)
msg = session.deserialize(fed_msg_list)
@ -420,55 +398,187 @@ class MappingKernelManager(MultiKernelManager):
kernel._activity_stream.on_recv(record_activity)
def initialize_culler(self):
"""Start idle culler if 'cull_idle_timeout' is greater than zero.
Regardless of that value, set flag that we've been here.
class BufferingManager(LoggingConfigurable):
"""Manages buffering across the active kernels"""
_kernel_buffers = Any()
@default('_kernel_buffers')
def _default_kernel_buffers(self):
return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}})
def __init__(self, **kwargs):
super(BufferingManager, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManager):
raise RuntimeError(
"BufferingManager requires an instance of MappingKernelManager!")
def _check_kernel_id(self, kernel_id):
"""Check a that a kernel_id exists and raise 404 if not."""
if kernel_id not in self.parent:
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
def start_buffering(self, kernel_id, session_key, channels):
"""Start buffering messages for a kernel
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
session_key: str
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
channels: dict({'channel': ZMQStream})
The zmq channels whose messages should be buffered.
"""
if not self.parent.buffer_offline_messages:
for channel, stream in channels.items():
stream.close()
return
self._check_kernel_id(kernel_id)
self.log.info("Starting buffering for %s", session_key)
# clear previous buffering state
self.parent.stop_buffering(kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
# record the session key because only one session can buffer
buffer_info['session_key'] = session_key
# TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple
buffer_info['buffer'] = []
buffer_info['channels'] = channels
# forward any future messages to the internal buffer
def buffer_msg(channel, msg_parts):
self.log.debug("Buffering msg on %s:%s", kernel_id, channel)
buffer_info['buffer'].append((channel, msg_parts))
for channel, stream in channels.items():
stream.on_recv(partial(buffer_msg, channel))
def get_buffer(self, kernel_id, session_key):
"""Get the buffer for a given kernel
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
session_key: str, optional
The session_key, if any, that should get the buffer.
If the session_key matches the current buffered session_key,
the buffer will be returned.
"""
if not self._initialized_culler and self.cull_idle_timeout > 0:
if self._culler_callback is None:
loop = IOLoop.current()
if self.cull_interval <= 0: #handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
self.cull_interval, self.cull_interval_default)
self.cull_interval = self.cull_interval_default
self._culler_callback = PeriodicCallback(
self.cull_kernels, 1000*self.cull_interval)
self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...",
self.cull_idle_timeout, self.cull_interval)
if self.cull_busy:
self.log.info("Culling kernels even if busy")
if self.cull_connected:
self.log.info("Culling kernels even with connected clients")
self._culler_callback.start()
self._initialized_culler = True
if kernel_id not in self._kernel_buffers:
return
self.log.debug("Getting buffer for %s", kernel_id)
buffer_info = self._kernel_buffers[kernel_id]
if buffer_info['session_key'] == session_key:
# remove buffer
self._kernel_buffers.pop(kernel_id)
# only return buffer_info if it's a match
return buffer_info
else:
self.parent.stop_buffering(kernel_id)
def stop_buffering(self, kernel_id):
"""Stop buffering kernel messages
Parameters
----------
kernel_id : str
The id of the kernel to stop buffering.
"""
self._check_kernel_id(kernel_id)
if kernel_id not in self._kernel_buffers:
return
self.log.debug("Clearing buffer for %s", kernel_id)
buffer_info = self._kernel_buffers.pop(kernel_id)
# close buffering streams
for stream in buffer_info['channels'].values():
if not stream.closed():
stream.on_recv(None)
stream.close()
msg_buffer = buffer_info['buffer']
if msg_buffer:
self.log.info("Discarding %s buffered messages for %s",
len(msg_buffer), buffer_info['session_key'])
class KernelCuller(LoggingConfigurable):
"""Handles culling responsibilities for active kernels"""
def __init__(self, **kwargs):
super(KernelCuller, self).__init__(**kwargs)
if not isinstance(self.parent, MappingKernelManager):
raise RuntimeError(
"KernelCuller requires an instance of MappingKernelManager!")
self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging
# Start idle culler if 'cull_idle_timeout' is greater than zero.
# Regardless of that value, set flag that we've been here.
if self.parent.cull_idle_timeout > 0:
if self.parent.cull_interval <= 0: # handle case where user set invalid value
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
self.parent.cull_interval, self.parent.cull_interval_default)
self.parent.cull_interval = self.parent.cull_interval_default
self._culler_callback = PeriodicCallback(
self.parent.cull_kernels, 1000*self.parent.cull_interval)
self._culler_callback.start()
self._log_info()
def _log_info(self):
"""Builds a single informational message relative to the culling configuration (logged at startup)."""
log_msg = list()
log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals".
format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout,
interval=self.parent.cull_interval))
if self.parent.cull_busy or self.parent.cull_connected:
log_msg.append(" - including")
if self.parent.cull_busy:
log_msg.append(" busy")
if self.parent.cull_connected:
log_msg.append(" and")
if self.parent.cull_connected:
log_msg.append(" connected")
log_msg.append(" kernels")
log_msg.append(".")
self.log.info(''.join(log_msg))
def cull_kernels(self):
self.log.debug("Polling every %s seconds for kernels idle > %s seconds...",
self.cull_interval, self.cull_idle_timeout)
"""Create a separate list of kernels to avoid conflicting updates while iterating"""
for kernel_id in list(self._kernels):
self.log.debug("Polling every %s seconds for kernels %s > %s seconds...",
self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout)
# Get a separate list of kernels to avoid conflicting updates while iterating
for kernel_id in self.parent.list_kernel_ids():
try:
self.cull_kernel_if_idle(kernel_id)
self.parent.cull_kernel_if_idle(kernel_id)
except Exception as e:
self.log.exception("The following exception was encountered while checking the idle duration of kernel %s: %s",
kernel_id, e)
self.log.exception("The following exception was encountered while checking the idle "
"duration of kernel %s: %s", kernel_id, e)
def cull_kernel_if_idle(self, kernel_id):
kernel = self._kernels[kernel_id]
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", kernel_id, kernel.kernel_name, kernel.last_activity)
if kernel.last_activity is not None:
dt_now = utcnow()
dt_idle = dt_now - kernel.last_activity
# Get the kernel model and use that to determine cullability...
model = self.parent.kernel_model(kernel_id)
self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s",
kernel_id, model['name'], model['last_activity'])
if model['last_activity'] is not None:
# Convert dates to compatible formats. Since both are UTC, strip the TZ info from
# the current time and convert the last_activity string to a datetime.
dt_now = utcnow().replace(tzinfo=None)
dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ")
dt_idle = dt_now - dt_last_activity
# Compute idle properties
is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout)
is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy')
connections = self._kernel_connections.get(kernel_id, 0)
is_idle_connected = self.cull_connected or not connections
is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout)
is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy')
connections = model.get('connections', 0)
is_idle_connected = self.parent.cull_connected or connections == 0
# Cull the kernel if all three criteria are met
if (is_idle_time and is_idle_execute and is_idle_connected):
if is_idle_time and is_idle_execute and is_idle_connected:
idle_duration = int(dt_idle.total_seconds())
self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.",
kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration)
self.shutdown_kernel(kernel_id)
model['execution_state'], model['name'], kernel_id, connections, idle_duration)
self.parent.shutdown_kernel(kernel_id)

Loading…
Cancel
Save