From 677ccc3d7a34e462ddaa85a2897e5418defec179 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Tue, 12 Mar 2019 15:54:01 -0700 Subject: [PATCH] Refactor buffering, culling and activity monitoring into classes Use composition in preparation for AsyncMappingKernelManager. --- notebook/services/kernels/kernelmanager.py | 332 ++++++++++++++------- 1 file changed, 221 insertions(+), 111 deletions(-) diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index a602b9ed6..422979982 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -19,8 +19,8 @@ from tornado.ioloop import IOLoop, PeriodicCallback from jupyter_client.session import Session from jupyter_client.multikernelmanager import MultiKernelManager from traitlets import (Any, Bool, Dict, List, Unicode, TraitError, Integer, - Float, Instance, default, validate -) + Float, Instance, default, validate) +from traitlets.config.configurable import LoggingConfigurable from notebook.utils import maybe_future, to_os_path, exists from notebook._tz import utcnow, isoformat @@ -42,10 +42,6 @@ class MappingKernelManager(MultiKernelManager): _kernel_connections = Dict() - _culler_callback = None - - _initialized_culler = False - @default('root_dir') def _default_root_dir(self): try: @@ -108,18 +104,22 @@ class MappingKernelManager(MultiKernelManager): """ ) - _kernel_buffers = Any() - @default('_kernel_buffers') - def _default_kernel_buffers(self): - return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) - last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") + # members used to hold composed instances + buffering_manager = None + kernel_culler = None + activity_monitor = None + def __init__(self, **kwargs): super(MappingKernelManager, self).__init__(**kwargs) self.last_kernel_activity = utcnow() + self.buffering_manager = 
BufferingManager(parent=self) + self.kernel_culler = KernelCuller(parent=self) + self.activity_monitor = ActivityMonitor(parent=self) + allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. @@ -187,10 +187,6 @@ class MappingKernelManager(MultiKernelManager): self._check_kernel_id(kernel_id) self.log.info("Using existing kernel: %s" % kernel_id) - # Initialize culling if not already - if not self._initialized_culler: - self.initialize_culler() - # py2-compat raise gen.Return(kernel_id) @@ -208,30 +204,7 @@ class MappingKernelManager(MultiKernelManager): channels: dict({'channel': ZMQStream}) The zmq channels whose messages should be buffered. """ - - if not self.buffer_offline_messages: - for channel, stream in channels.items(): - stream.close() - return - - self.log.info("Starting buffering for %s", session_key) - self._check_kernel_id(kernel_id) - # clear previous buffering state - self.stop_buffering(kernel_id) - buffer_info = self._kernel_buffers[kernel_id] - # record the session key because only one session can buffer - buffer_info['session_key'] = session_key - # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple - buffer_info['buffer'] = [] - buffer_info['channels'] = channels - - # forward any future messages to the internal buffer - def buffer_msg(channel, msg_parts): - self.log.debug("Buffering msg on %s:%s", kernel_id, channel) - buffer_info['buffer'].append((channel, msg_parts)) - - for channel, stream in channels.items(): - stream.on_recv(partial(buffer_msg, channel)) + self.buffering_manager.start_buffering(kernel_id, session_key, channels) def get_buffer(self, kernel_id, session_key): """Get the buffer for a given kernel @@ -245,18 +218,7 @@ class MappingKernelManager(MultiKernelManager): If the session_key matches the current buffered session_key, the buffer will be returned. 
""" - self.log.debug("Getting buffer for %s", kernel_id) - if kernel_id not in self._kernel_buffers: - return - - buffer_info = self._kernel_buffers[kernel_id] - if buffer_info['session_key'] == session_key: - # remove buffer - self._kernel_buffers.pop(kernel_id) - # only return buffer_info if it's a match - return buffer_info - else: - self.stop_buffering(kernel_id) + return self.buffering_manager.get_buffer(kernel_id, session_key) def stop_buffering(self, kernel_id): """Stop buffering kernel messages @@ -266,22 +228,7 @@ class MappingKernelManager(MultiKernelManager): kernel_id : str The id of the kernel to stop buffering. """ - self.log.debug("Clearing buffer for %s", kernel_id) - self._check_kernel_id(kernel_id) - - if kernel_id not in self._kernel_buffers: - return - buffer_info = self._kernel_buffers.pop(kernel_id) - # close buffering streams - for stream in buffer_info['channels'].values(): - if not stream.closed(): - stream.on_recv(None) - stream.close() - - msg_buffer = buffer_info['buffer'] - if msg_buffer: - self.log.info("Discarding %s buffered messages for %s", - len(msg_buffer), buffer_info['session_key']) + self.buffering_manager.stop_buffering(kernel_id) def shutdown_kernel(self, kernel_id, now=False): """Shutdown a kernel by kernel_id""" @@ -372,9 +319,9 @@ class MappingKernelManager(MultiKernelManager): return model def list_kernels(self): - """Returns a list of kernel_id's of kernels running.""" + """Returns a list of kernel models relative to the running kernels.""" kernels = [] - kernel_ids = super(MappingKernelManager, self).list_kernel_ids() + kernel_ids = self.list_kernel_ids() for kernel_id in kernel_ids: model = self.kernel_model(kernel_id) kernels.append(model) @@ -394,7 +341,38 @@ class MappingKernelManager(MultiKernelManager): - update last_activity on every message - record execution_state from status messages """ - kernel = self._kernels[kernel_id] + self.activity_monitor.start_watching_activity(kernel_id, self._kernels[kernel_id]) + 
+ + def initialize_culler(self): + """Initialize the culler if not already initialized. + """ + if self.kernel_culler is None: + self.kernel_culler = KernelCuller(parent=self) + + def cull_kernels(self): + # Defer to KernelCuller + self.kernel_culler.cull_kernels() + + def cull_kernel_if_idle(self, kernel_id): + # Defer to KernelCuller + self.kernel_culler.cull_kernel_if_idle(kernel_id) + + +class ActivityMonitor(LoggingConfigurable): + """Establishes activity recorder for each active kernel""" + + def __init__(self, **kwargs): + super(ActivityMonitor, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "ActivityMonitor requires an instance of MappingKernelManager!") + + def start_watching_activity(self, kernel_id, kernel): + """Start watching IOPub messages on a kernel for activity. + + - update last_activity on every message + - record execution_state from status messages + """ # add busy/activity markers: kernel.execution_state = 'starting' kernel.last_activity = utcnow() @@ -406,7 +384,7 @@ class MappingKernelManager(MultiKernelManager): def record_activity(msg_list): """Record an IOPub message arriving from a kernel""" - self.last_kernel_activity = kernel.last_activity = utcnow() + self.parent.last_kernel_activity = kernel.last_activity = utcnow() idents, fed_msg_list = session.feed_identities(msg_list) msg = session.deserialize(fed_msg_list) @@ -420,55 +398,187 @@ class MappingKernelManager(MultiKernelManager): kernel._activity_stream.on_recv(record_activity) - def initialize_culler(self): - """Start idle culler if 'cull_idle_timeout' is greater than zero. - Regardless of that value, set flag that we've been here. 
+class BufferingManager(LoggingConfigurable): + """Manages buffering across the active kernels""" + _kernel_buffers = Any() + + @default('_kernel_buffers') + def _default_kernel_buffers(self): + return defaultdict(lambda: {'buffer': [], 'session_key': '', 'channels': {}}) + + def __init__(self, **kwargs): + super(BufferingManager, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "BufferingManager requires an instance of MappingKernelManager!") + + def _check_kernel_id(self, kernel_id): + """Check that a kernel_id exists and raise 404 if not.""" + if kernel_id not in self.parent: + raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id) + + def start_buffering(self, kernel_id, session_key, channels): + """Start buffering messages for a kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to start buffering. + session_key: str + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. + channels: dict({'channel': ZMQStream}) + The zmq channels whose messages should be buffered. 
+ """ + + if not self.parent.buffer_offline_messages: + for channel, stream in channels.items(): + stream.close() + return + + self._check_kernel_id(kernel_id) + self.log.info("Starting buffering for %s", session_key) + # clear previous buffering state + self.parent.stop_buffering(kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + # record the session key because only one session can buffer + buffer_info['session_key'] = session_key + # TODO: the buffer should likely be a memory bounded queue, we're starting with a list to keep it simple + buffer_info['buffer'] = [] + buffer_info['channels'] = channels + + # forward any future messages to the internal buffer + def buffer_msg(channel, msg_parts): + self.log.debug("Buffering msg on %s:%s", kernel_id, channel) + buffer_info['buffer'].append((channel, msg_parts)) + + for channel, stream in channels.items(): + stream.on_recv(partial(buffer_msg, channel)) + + def get_buffer(self, kernel_id, session_key): + """Get the buffer for a given kernel + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. + session_key: str, optional + The session_key, if any, that should get the buffer. + If the session_key matches the current buffered session_key, + the buffer will be returned. 
""" - if not self._initialized_culler and self.cull_idle_timeout > 0: - if self._culler_callback is None: - loop = IOLoop.current() - if self.cull_interval <= 0: #handle case where user set invalid value - self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", - self.cull_interval, self.cull_interval_default) - self.cull_interval = self.cull_interval_default - self._culler_callback = PeriodicCallback( - self.cull_kernels, 1000*self.cull_interval) - self.log.info("Culling kernels with idle durations > %s seconds at %s second intervals ...", - self.cull_idle_timeout, self.cull_interval) - if self.cull_busy: - self.log.info("Culling kernels even if busy") - if self.cull_connected: - self.log.info("Culling kernels even with connected clients") - self._culler_callback.start() - - self._initialized_culler = True + if kernel_id not in self._kernel_buffers: + return + + self.log.debug("Getting buffer for %s", kernel_id) + buffer_info = self._kernel_buffers[kernel_id] + if buffer_info['session_key'] == session_key: + # remove buffer + self._kernel_buffers.pop(kernel_id) + # only return buffer_info if it's a match + return buffer_info + else: + self.parent.stop_buffering(kernel_id) + + def stop_buffering(self, kernel_id): + """Stop buffering kernel messages + + Parameters + ---------- + kernel_id : str + The id of the kernel to stop buffering. 
+ """ + self._check_kernel_id(kernel_id) + + if kernel_id not in self._kernel_buffers: + return + self.log.debug("Clearing buffer for %s", kernel_id) + buffer_info = self._kernel_buffers.pop(kernel_id) + # close buffering streams + for stream in buffer_info['channels'].values(): + if not stream.closed(): + stream.on_recv(None) + stream.close() + + msg_buffer = buffer_info['buffer'] + if msg_buffer: + self.log.info("Discarding %s buffered messages for %s", + len(msg_buffer), buffer_info['session_key']) + + +class KernelCuller(LoggingConfigurable): + """Handles culling responsibilities for active kernels""" + + def __init__(self, **kwargs): + super(KernelCuller, self).__init__(**kwargs) + if not isinstance(self.parent, MappingKernelManager): + raise RuntimeError( + "KernelCuller requires an instance of MappingKernelManager!") + self.cull_state = "idle" if not self.parent.cull_busy else "inactive" # used during logging + + # Start idle culler if 'cull_idle_timeout' is greater than zero. + # Regardless of that value, set flag that we've been here. + if self.parent.cull_idle_timeout > 0: + if self.parent.cull_interval <= 0: # handle case where user set invalid value + self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).", + self.parent.cull_interval, self.parent.cull_interval_default) + self.parent.cull_interval = self.parent.cull_interval_default + self._culler_callback = PeriodicCallback( + self.parent.cull_kernels, 1000*self.parent.cull_interval) + self._culler_callback.start() + self._log_info() + + def _log_info(self): + """Builds a single informational message relative to the culling configuration (logged at startup).""" + log_msg = list() + log_msg.append("Culling kernels with {cull_state} durations > {timeout} seconds at {interval} second intervals". 
+ format(cull_state=self.cull_state, timeout=self.parent.cull_idle_timeout, + interval=self.parent.cull_interval)) + if self.parent.cull_busy or self.parent.cull_connected: + log_msg.append(" - including") + if self.parent.cull_busy: + log_msg.append(" busy") + if self.parent.cull_connected: + log_msg.append(" and") + if self.parent.cull_connected: + log_msg.append(" connected") + log_msg.append(" kernels") + log_msg.append(".") + self.log.info(''.join(log_msg)) def cull_kernels(self): - self.log.debug("Polling every %s seconds for kernels idle > %s seconds...", - self.cull_interval, self.cull_idle_timeout) - """Create a separate list of kernels to avoid conflicting updates while iterating""" - for kernel_id in list(self._kernels): + self.log.debug("Polling every %s seconds for kernels %s > %s seconds...", + self.parent.cull_interval, self.cull_state, self.parent.cull_idle_timeout) + # Get a separate list of kernels to avoid conflicting updates while iterating + for kernel_id in self.parent.list_kernel_ids(): try: - self.cull_kernel_if_idle(kernel_id) + self.parent.cull_kernel_if_idle(kernel_id) except Exception as e: - self.log.exception("The following exception was encountered while checking the idle duration of kernel %s: %s", - kernel_id, e) + self.log.exception("The following exception was encountered while checking the idle " + "duration of kernel %s: %s", kernel_id, e) def cull_kernel_if_idle(self, kernel_id): - kernel = self._kernels[kernel_id] - self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", kernel_id, kernel.kernel_name, kernel.last_activity) - if kernel.last_activity is not None: - dt_now = utcnow() - dt_idle = dt_now - kernel.last_activity + + # Get the kernel model and use that to determine cullability... 
+ model = self.parent.kernel_model(kernel_id) + self.log.debug("kernel_id=%s, kernel_name=%s, last_activity=%s", + kernel_id, model['name'], model['last_activity']) + if model['last_activity'] is not None: + # Convert dates to compatible formats. Since both are UTC, strip the TZ info from + # the current time and convert the last_activity string to a datetime. + dt_now = utcnow().replace(tzinfo=None) + dt_last_activity = datetime.strptime(model['last_activity'], "%Y-%m-%dT%H:%M:%S.%fZ") + dt_idle = dt_now - dt_last_activity # Compute idle properties - is_idle_time = dt_idle > timedelta(seconds=self.cull_idle_timeout) - is_idle_execute = self.cull_busy or (kernel.execution_state != 'busy') - connections = self._kernel_connections.get(kernel_id, 0) - is_idle_connected = self.cull_connected or not connections + is_idle_time = dt_idle > timedelta(seconds=self.parent.cull_idle_timeout) + is_idle_execute = self.parent.cull_busy or (model['execution_state'] != 'busy') + connections = model.get('connections', 0) + is_idle_connected = self.parent.cull_connected or connections == 0 # Cull the kernel if all three criteria are met - if (is_idle_time and is_idle_execute and is_idle_connected): + if is_idle_time and is_idle_execute and is_idle_connected: idle_duration = int(dt_idle.total_seconds()) self.log.warning("Culling '%s' kernel '%s' (%s) with %d connections due to %s seconds of inactivity.", - kernel.execution_state, kernel.kernel_name, kernel_id, connections, idle_duration) - self.shutdown_kernel(kernel_id) + model['execution_state'], model['name'], kernel_id, connections, idle_duration) + self.parent.shutdown_kernel(kernel_id)