From 3b3c4d031596f0acd20a352d5984762976e90a6f Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Tue, 5 Jan 2016 16:50:05 -0800 Subject: [PATCH 01/11] Add simple iopub message rate limiter --- notebook/notebookapp.py | 22 ++++++-- notebook/services/kernels/handlers.py | 78 +++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 2c1d392d7..eeb7bbbc9 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -77,7 +77,7 @@ from jupyter_client.session import Session from nbformat.sign import NotebookNotary from traitlets import ( Dict, Unicode, Integer, List, Bool, Bytes, Instance, - TraitError, Type, + TraitError, Type, Float ) from ipython_genutils import py3compat from jupyter_core.paths import jupyter_runtime_dir, jupyter_path @@ -187,7 +187,12 @@ class NotebookWebApplication(web.Application): }, version_hash=version_hash, ignore_minified_js=ipython_app.ignore_minified_js, - + + # rate limits + iopub_msg_rate_limit=ipython_app.iopub_msg_rate_limit, + iopub_data_rate_limit=ipython_app.iopub_data_rate_limit, + limit_poll_interval=ipython_app.limit_poll_interval, + # authentication cookie_secret=ipython_app.cookie_secret, login_url=url_path_join(base_url,'/login'), @@ -785,9 +790,20 @@ class NotebookApp(JupyterApp): help="Reraise exceptions encountered loading server extensions?", ) + iopub_msg_rate_limit = Float(config=True, allow_none=True, help="""(msg/sec) + Maximum rate at which messages can be sent on iopub before they are + limitted.""") + + iopub_data_rate_limit = Float(config=True, allow_none=True, help="""(bytes/sec) + Maximum rate at which messages can be sent on iopub before they are + limited.""") + + limit_poll_interval = Float(1.0, config=True, help="""(sec) Interval in + which to check the message and data rate limits.""") + def parse_command_line(self, argv=None): super(NotebookApp, self).parse_command_line(argv) - + if self.extra_args: arg0 = 
self.extra_args[0] f = os.path.abspath(arg0) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 0da36714f..02b69dd6f 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -8,6 +8,7 @@ Preliminary documentation at https://github.com/ipython/ipython/wiki/IPEP-16%3A- import json import logging +import time from tornado import gen, web from tornado.concurrent import Future from tornado.ioloop import IOLoop @@ -96,14 +97,26 @@ class KernelActionHandler(APIHandler): class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): - + @property def kernel_info_timeout(self): return self.settings.get('kernel_info_timeout', 10) - + + @property + def iopub_msg_rate_limit(self): + return self.settings.get('iopub_msg_rate_limit', None) + + @property + def iopub_data_rate_limit(self): + return self.settings.get('iopub_data_rate_limit', None) + + @property + def limit_poll_interval(self): + return self.settings.get('limit_poll_interval', 1.0) + def __repr__(self): return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized')) - + def create_stream(self): km = self.kernel_manager identity = self.session.bsession @@ -182,7 +195,14 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.kernel_id = None self.kernel_info_channel = None self._kernel_info_future = Future() - + + # Rate limiting code + self._iopub_msg_count = 0 + self._iopub_byte_count = 0 + self._iopub_msgs_exceeded = False + self._iopub_bytes_exceeded = False + self._last_limit_check = time.time() + @gen.coroutine def pre_get(self): # authenticate first @@ -231,6 +251,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # already closed, ignore the message self.log.debug("Received message on closed websocket %r", msg) return + length = len(msg) if isinstance(msg, bytes): msg = deserialize_binary_message(msg) else: @@ -242,6 +263,55 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): if 
channel not in self.channels: self.log.warn("No such channel: %r", channel) return + if channel == 'iopub': + # Increment the bytes and message count + self._iopub_msg_count += 1 + self._iopub_byte_count += length + + # If the elapsed time is equal to or greater than the limiter + # interval, check the limits, set the limit flags, and reset the + # message and data counts. + elapsed_time = (time.time() - self._last_limit_check).seconds() + if elapsed_time >= self.limit_poll_interval: + self._last_limit_check = time.time() + + msg_rate = self._iopub_msg_count / elapsed_time + data_rate = self._iopub_byte_count / elapsed_time + + # Check the msg rate + if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit: + if not self._iopub_msgs_exceeded: + self._iopub_msgs_exceeded = True + self.log.warn("""iopub message rate exceeded. The + notebook server will temporarily stop sending iopub + messages to the client in order to avoid crashing it. + To change this limit, set the config variable + `--NotebookApp.iopub_msg_rate_limit`.""") + else: + if self._iopub_msgs_exceeded: + self._iopub_msgs_exceeded = False + if not self._iopub_data_exceeded: + self.log.warn("""iopub messages resumed""") + + # Check the data rate + if self.iopub_data_rate_limit is not None and data_rate > self.iopub_data_rate_limit: + if not self._iopub_data_exceeded: + self._iopub_data_exceeded = True + self.log.warn("""iopub data rate exceeded. The + notebook server will temporarily stop sending iopub + messages to the client in order to avoid crashing it. + To change this limit, set the config variable + `--NotebookApp.iopub_data_rate_limit`.""") + else: + if self._iopub_data_exceeded: + self._iopub_data_exceeded = False + if not self._iopub_msgs_exceeded: + self.log.warn("""iopub messages resumed""") + + # If either of the limit flags are set, do not send the message. 
+ if self._iopub_msgs_exceeded or self._iopub_data_exceeded: + return + stream = self.channels[channel] self.session.send(stream, msg) From 4c99c5f7f86e54c336b1117029600c4b27e29123 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Mon, 11 Jan 2016 10:49:23 -0800 Subject: [PATCH 02/11] Limit on websocket, not zmq --- notebook/notebookapp.py | 8 ++-- notebook/services/kernels/handlers.py | 64 ++++++++++++++++++++------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index eeb7bbbc9..95eacf4fc 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -790,15 +790,15 @@ class NotebookApp(JupyterApp): help="Reraise exceptions encountered loading server extensions?", ) - iopub_msg_rate_limit = Float(config=True, allow_none=True, help="""(msg/sec) + iopub_msg_rate_limit = Float(None, config=True, allow_none=True, help="""(msg/sec) Maximum rate at which messages can be sent on iopub before they are - limitted.""") + limited.""") - iopub_data_rate_limit = Float(config=True, allow_none=True, help="""(bytes/sec) + iopub_data_rate_limit = Float(None, config=True, allow_none=True, help="""(bytes/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") - limit_poll_interval = Float(1.0, config=True, help="""(sec) Interval in + limit_poll_interval = Float(0.1, config=True, help="""(sec) Interval in which to check the message and data rate limits.""") def parse_command_line(self, argv=None): diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 02b69dd6f..c970de42e 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -200,7 +200,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self._iopub_msg_count = 0 self._iopub_byte_count = 0 self._iopub_msgs_exceeded = False - self._iopub_bytes_exceeded = False + self._iopub_data_exceeded = False self._last_limit_check = time.time() 
@gen.coroutine @@ -251,7 +251,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # already closed, ignore the message self.log.debug("Received message on closed websocket %r", msg) return - length = len(msg) if isinstance(msg, bytes): msg = deserialize_binary_message(msg) else: @@ -263,57 +262,88 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): if channel not in self.channels: self.log.warn("No such channel: %r", channel) return + stream = self.channels[channel] + self.session.send(stream, msg) + + def _on_zmq_reply(self, stream, msg_list): + + def write_stderr(error_message): + parent = json.loads(msg_list[-3]) + msg = self.session.msg("stream", + content={"text": error_message, "name": "stderr"}, + parent=parent + ) + msg['channel'] = 'iopub' + self.write_message(json.dumps(msg, default=date_default)) + self.log.warn(error_message) + + channel = getattr(stream, 'channel', None) if channel == 'iopub': + + # If the elapsed time is double the check interval, reset the counts, + # lock flags, and reset the timer. + elapsed_time = (time.time() - self._last_limit_check) + if elapsed_time >= self.limit_poll_interval * 2.0: + self._iopub_msg_count = 0 + self._iopub_byte_count = 0 + if self._iopub_msgs_exceeded or self._iopub_data_exceeded: + self._iopub_data_exceeded = False + self._iopub_msgs_exceeded = False + self.log.warn("iopub messages resumed") + # Increment the bytes and message count self._iopub_msg_count += 1 - self._iopub_byte_count += length - + self._iopub_byte_count += sum([len(x) for x in msg_list]) + # If the elapsed time is equal to or greater than the limiter # interval, check the limits, set the limit flags, and reset the # message and data counts. 
- elapsed_time = (time.time() - self._last_limit_check).seconds() if elapsed_time >= self.limit_poll_interval: self._last_limit_check = time.time() - - msg_rate = self._iopub_msg_count / elapsed_time - data_rate = self._iopub_byte_count / elapsed_time - + + msg_rate = float(self._iopub_msg_count) / elapsed_time + data_rate = float(self._iopub_byte_count) / elapsed_time + self.log.warn('check msg/s{} & bytes/s{}'.format(msg_rate, data_rate)) + self._iopub_msg_count = 0 + self._iopub_byte_count = 0 + # Check the msg rate if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit: if not self._iopub_msgs_exceeded: self._iopub_msgs_exceeded = True - self.log.warn("""iopub message rate exceeded. The + write_stderr("""iopub message rate exceeded. The notebook server will temporarily stop sending iopub messages to the client in order to avoid crashing it. To change this limit, set the config variable `--NotebookApp.iopub_msg_rate_limit`.""") + return else: if self._iopub_msgs_exceeded: self._iopub_msgs_exceeded = False if not self._iopub_data_exceeded: - self.log.warn("""iopub messages resumed""") - + self.log.warn("iopub messages resumed") + # Check the data rate if self.iopub_data_rate_limit is not None and data_rate > self.iopub_data_rate_limit: if not self._iopub_data_exceeded: self._iopub_data_exceeded = True - self.log.warn("""iopub data rate exceeded. The + write_stderr("""iopub data rate exceeded. The notebook server will temporarily stop sending iopub messages to the client in order to avoid crashing it. To change this limit, set the config variable `--NotebookApp.iopub_data_rate_limit`.""") + return else: if self._iopub_data_exceeded: self._iopub_data_exceeded = False if not self._iopub_msgs_exceeded: - self.log.warn("""iopub messages resumed""") - + self.log.warn("iopub messages resumed") + # If either of the limit flags are set, do not send the message. 
if self._iopub_msgs_exceeded or self._iopub_data_exceeded: return + super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg_list) - stream = self.channels[channel] - self.session.send(stream, msg) def on_close(self): km = self.kernel_manager From f83485fcee4ed6d993ae165f838076c1cf2dc8d3 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Mon, 11 Jan 2016 12:52:35 -0800 Subject: [PATCH 03/11] Remove log msg --- notebook/services/kernels/handlers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index c970de42e..cf5feec94 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -303,7 +303,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): msg_rate = float(self._iopub_msg_count) / elapsed_time data_rate = float(self._iopub_byte_count) / elapsed_time - self.log.warn('check msg/s{} & bytes/s{}'.format(msg_rate, data_rate)) self._iopub_msg_count = 0 self._iopub_byte_count = 0 From 69faf0fb9b42c92d7b01ceca5a9b37f826859817 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Thu, 14 Jan 2016 10:06:10 -0800 Subject: [PATCH 04/11] Change the logic so the intervals are checked all of the time, also changed no-limit default to "0" --- notebook/notebookapp.py | 10 +-- notebook/services/kernels/handlers.py | 125 ++++++++++++++------------ 2 files changed, 74 insertions(+), 61 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 95eacf4fc..b0c2d3150 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -191,7 +191,7 @@ class NotebookWebApplication(web.Application): # rate limits iopub_msg_rate_limit=ipython_app.iopub_msg_rate_limit, iopub_data_rate_limit=ipython_app.iopub_data_rate_limit, - limit_poll_interval=ipython_app.limit_poll_interval, + limit_window=ipython_app.limit_window, # authentication cookie_secret=ipython_app.cookie_secret, @@ -790,16 +790,16 @@ class NotebookApp(JupyterApp): 
help="Reraise exceptions encountered loading server extensions?", ) - iopub_msg_rate_limit = Float(None, config=True, allow_none=True, help="""(msg/sec) + iopub_msg_rate_limit = Float(0, config=True, allow_none=True, help="""(msg/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") - iopub_data_rate_limit = Float(None, config=True, allow_none=True, help="""(bytes/sec) + iopub_data_rate_limit = Float(0, config=True, allow_none=True, help="""(bytes/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") - limit_poll_interval = Float(0.1, config=True, help="""(sec) Interval in - which to check the message and data rate limits.""") + limit_window = Float(0.1, config=True, help="""(sec) Time window used to + check the message and data rate limits.""") def parse_command_line(self, argv=None): super(NotebookApp, self).parse_command_line(argv) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index cf5feec94..9a7a33a7c 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -8,7 +8,6 @@ Preliminary documentation at https://github.com/ipython/ipython/wiki/IPEP-16%3A- import json import logging -import time from tornado import gen, web from tornado.concurrent import Future from tornado.ioloop import IOLoop @@ -111,8 +110,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): return self.settings.get('iopub_data_rate_limit', None) @property - def limit_poll_interval(self): - return self.settings.get('limit_poll_interval', 1.0) + def limit_window(self): + return self.settings.get('limit_window', 1.0) def __repr__(self): return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized')) @@ -201,7 +200,11 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self._iopub_byte_count = 0 self._iopub_msgs_exceeded = False self._iopub_data_exceeded = False - self._last_limit_check = time.time() + # Queue 
of (time stamp, delta) + # Allows you to specify that the msg or byte counts should be adjusted + # by a delta amount at some point in the future. + self._queue_msg_count = [] + self._queue_byte_count = [] @gen.coroutine def pre_get(self): @@ -280,63 +283,73 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): channel = getattr(stream, 'channel', None) if channel == 'iopub': - # If the elapsed time is double the check interval, reset the counts, - # lock flags, and reset the timer. - elapsed_time = (time.time() - self._last_limit_check) - if elapsed_time >= self.limit_poll_interval * 2.0: - self._iopub_msg_count = 0 - self._iopub_byte_count = 0 - if self._iopub_msgs_exceeded or self._iopub_data_exceeded: - self._iopub_data_exceeded = False - self._iopub_msgs_exceeded = False - self.log.warn("iopub messages resumed") - + # Remove the counts queued for removal. + while len(self._queue_msg_count) > 0: + queued = self._queue_msg_count[0] + if (IOLoop.current().time() >= queued[0]): + self._iopub_msg_count += queued[1] + del self._queue_msg_count[0] + else: + # This part of the queue hasn't be reached yet, so we can + # abort the loop. + break + + while len(self._queue_byte_count) > 0: + queued = self._queue_byte_count[0] + if (IOLoop.current().time() >= queued[0]): + self._iopub_byte_count += queued[1] + del self._queue_byte_count[0] + else: + # This part of the queue hasn't be reached yet, so we can + # abort the loop. + break + # Increment the bytes and message count self._iopub_msg_count += 1 - self._iopub_byte_count += sum([len(x) for x in msg_list]) + byte_count = sum([len(x) for x in msg_list]) + self._iopub_byte_count += byte_count + + # Queue a removal of the byte and message count for a time in the + # future, when we are no longer interested in it. 
+ self._queue_msg_count.append((IOLoop.current().time() + self.limit_window, -1)) + self._queue_byte_count.append((IOLoop.current().time() + self.limit_window, -byte_count)) - # If the elapsed time is equal to or greater than the limiter - # interval, check the limits, set the limit flags, and reset the + # Check the limits, set the limit flags, and reset the # message and data counts. - if elapsed_time >= self.limit_poll_interval: - self._last_limit_check = time.time() - - msg_rate = float(self._iopub_msg_count) / elapsed_time - data_rate = float(self._iopub_byte_count) / elapsed_time - self._iopub_msg_count = 0 - self._iopub_byte_count = 0 - - # Check the msg rate - if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit: - if not self._iopub_msgs_exceeded: - self._iopub_msgs_exceeded = True - write_stderr("""iopub message rate exceeded. The - notebook server will temporarily stop sending iopub - messages to the client in order to avoid crashing it. - To change this limit, set the config variable - `--NotebookApp.iopub_msg_rate_limit`.""") - return - else: - if self._iopub_msgs_exceeded: - self._iopub_msgs_exceeded = False - if not self._iopub_data_exceeded: - self.log.warn("iopub messages resumed") - - # Check the data rate - if self.iopub_data_rate_limit is not None and data_rate > self.iopub_data_rate_limit: + msg_rate = float(self._iopub_msg_count) / self.limit_window + data_rate = float(self._iopub_byte_count) / self.limit_window + + # Check the msg rate + if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit and self.iopub_msg_rate_limit > 0: + if not self._iopub_msgs_exceeded: + self._iopub_msgs_exceeded = True + write_stderr("""iopub message rate exceeded. The + notebook server will temporarily stop sending iopub + messages to the client in order to avoid crashing it. 
+ To change this limit, set the config variable + `--NotebookApp.iopub_msg_rate_limit`.""") + return + else: + if self._iopub_msgs_exceeded: + self._iopub_msgs_exceeded = False if not self._iopub_data_exceeded: - self._iopub_data_exceeded = True - write_stderr("""iopub data rate exceeded. The - notebook server will temporarily stop sending iopub - messages to the client in order to avoid crashing it. - To change this limit, set the config variable - `--NotebookApp.iopub_data_rate_limit`.""") - return - else: - if self._iopub_data_exceeded: - self._iopub_data_exceeded = False - if not self._iopub_msgs_exceeded: - self.log.warn("iopub messages resumed") + self.log.warn("iopub messages resumed") + + # Check the data rate + if self.iopub_data_rate_limit is not None and data_rate > self.iopub_data_rate_limit and self.iopub_data_rate_limit > 0: + if not self._iopub_data_exceeded: + self._iopub_data_exceeded = True + write_stderr("""iopub data rate exceeded. The + notebook server will temporarily stop sending iopub + messages to the client in order to avoid crashing it. + To change this limit, set the config variable + `--NotebookApp.iopub_data_rate_limit`.""") + return + else: + if self._iopub_data_exceeded: + self._iopub_data_exceeded = False + if not self._iopub_msgs_exceeded: + self.log.warn("iopub messages resumed") # If either of the limit flags are set, do not send the message. 
if self._iopub_msgs_exceeded or self._iopub_data_exceeded: From 7232a6a99a34610d5e271b12958e0fa2f1ccc9fc Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Fri, 15 Jan 2016 09:50:31 -0800 Subject: [PATCH 05/11] Clarification & cleanup --- notebook/notebookapp.py | 4 +-- notebook/services/kernels/handlers.py | 44 +++++++++++---------------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index b0c2d3150..98c7332b0 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -790,11 +790,11 @@ class NotebookApp(JupyterApp): help="Reraise exceptions encountered loading server extensions?", ) - iopub_msg_rate_limit = Float(0, config=True, allow_none=True, help="""(msg/sec) + iopub_msg_rate_limit = Float(0, config=True, help="""(msg/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") - iopub_data_rate_limit = Float(0, config=True, allow_none=True, help="""(bytes/sec) + iopub_data_rate_limit = Float(0, config=True, help="""(bytes/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 9a7a33a7c..afadecd34 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -196,15 +196,14 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self._kernel_info_future = Future() # Rate limiting code - self._iopub_msg_count = 0 - self._iopub_byte_count = 0 + self._iopub_window_msg_count = 0 + self._iopub_window_byte_count = 0 self._iopub_msgs_exceeded = False self._iopub_data_exceeded = False - # Queue of (time stamp, delta) - # Allows you to specify that the msg or byte counts should be adjusted + # Queue of (time stamp, byte count) + # Allows you to specify that the byte count should be lowered # by a delta amount at some point in the future. 
- self._queue_msg_count = [] - self._queue_byte_count = [] + self._iopub_window_byte_queue = [] @gen.coroutine def pre_get(self): @@ -284,40 +283,31 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): if channel == 'iopub': # Remove the counts queued for removal. - while len(self._queue_msg_count) > 0: - queued = self._queue_msg_count[0] - if (IOLoop.current().time() >= queued[0]): - self._iopub_msg_count += queued[1] - del self._queue_msg_count[0] - else: - # This part of the queue hasn't be reached yet, so we can - # abort the loop. - break - - while len(self._queue_byte_count) > 0: - queued = self._queue_byte_count[0] - if (IOLoop.current().time() >= queued[0]): - self._iopub_byte_count += queued[1] - del self._queue_byte_count[0] + now = IOLoop.current().time() + while len(self._iopub_window_byte_queue) > 0: + queued = self._iopub_window_byte_queue[0] + if (now >= queued[0]): + self._iopub_window_byte_count -= queued[1] + self._iopub_window_msg_count -= 1 + del self._iopub_window_byte_queue[0] else: # This part of the queue hasn't be reached yet, so we can # abort the loop. break # Increment the bytes and message count - self._iopub_msg_count += 1 + self._iopub_window_msg_count += 1 byte_count = sum([len(x) for x in msg_list]) - self._iopub_byte_count += byte_count + self._iopub_window_byte_count += byte_count # Queue a removal of the byte and message count for a time in the # future, when we are no longer interested in it. - self._queue_msg_count.append((IOLoop.current().time() + self.limit_window, -1)) - self._queue_byte_count.append((IOLoop.current().time() + self.limit_window, -byte_count)) + self._iopub_window_byte_queue.append((now + self.limit_window, byte_count)) # Check the limits, set the limit flags, and reset the # message and data counts. 
- msg_rate = float(self._iopub_msg_count) / self.limit_window - data_rate = float(self._iopub_byte_count) / self.limit_window + msg_rate = float(self._iopub_window_msg_count) / self.limit_window + data_rate = float(self._iopub_window_byte_count) / self.limit_window # Check the msg rate if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit and self.iopub_msg_rate_limit > 0: From acbbc60e0931ca0800007389bb2316ba98b438ff Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Fri, 15 Jan 2016 10:03:02 -0800 Subject: [PATCH 06/11] Change the window length to 1s --- notebook/notebookapp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 98c7332b0..cb0313b7c 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -798,7 +798,7 @@ class NotebookApp(JupyterApp): Maximum rate at which messages can be sent on iopub before they are limited.""") - limit_window = Float(0.1, config=True, help="""(sec) Time window used to + limit_window = Float(1.0, config=True, help="""(sec) Time window used to check the message and data rate limits.""") def parse_command_line(self, argv=None): From c47f3d021b64b035bd3fe487b6a66c76a6c8bd49 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Fri, 15 Jan 2016 15:14:14 -0800 Subject: [PATCH 07/11] Fix a bug in the code that parses the parent message from the msg_list --- notebook/services/kernels/handlers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index afadecd34..ae1d69b69 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -268,16 +268,17 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.session.send(stream, msg) def _on_zmq_reply(self, stream, msg_list): + idents, fed_msg_list = self.session.feed_identities(msg_list) def write_stderr(error_message): - parent = 
json.loads(msg_list[-3]) + self.log.warn(error_message) + parent = json.loads(fed_msg_list[2]) msg = self.session.msg("stream", content={"text": error_message, "name": "stderr"}, parent=parent ) msg['channel'] = 'iopub' self.write_message(json.dumps(msg, default=date_default)) - self.log.warn(error_message) channel = getattr(stream, 'channel', None) if channel == 'iopub': From 7aaccbd5456adf5f9792568a85363548b75e7e68 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Fri, 15 Jan 2016 15:21:44 -0800 Subject: [PATCH 08/11] Don't filter status, comm_info, kernel_info, and execute_input messages --- notebook/services/kernels/handlers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index ae1d69b69..0672788e6 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -269,7 +269,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.session.send(stream, msg) def _on_zmq_reply(self, stream, msg_list): idents, fed_msg_list = self.session.feed_identities(msg_list) - def write_stderr(error_message): self.log.warn(error_message) parent = json.loads(fed_msg_list[2]) @@ -281,7 +280,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.write_message(json.dumps(msg, default=date_default)) channel = getattr(stream, 'channel', None) - if channel == 'iopub': + msg_type = json.loads(fed_msg_list[1])['msg_type'] + if channel == 'iopub' and msg_type not in ['status', 'comm_info_reply', 'kernel_info_reply', 'execute_input']: + + # Remove the counts queued for removal. 
now = IOLoop.current().time() From eb30002a8217d39c823a4952255372108a3b0424 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Sun, 17 Jan 2016 08:40:59 -0800 Subject: [PATCH 09/11] Use a set for filter exceptions --- notebook/services/kernels/handlers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 0672788e6..1f20bb602 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -281,8 +281,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): channel = getattr(stream, 'channel', None) msg_type = json.loads(fed_msg_list[1])['msg_type'] - if channel == 'iopub' and msg_type not in ['status', 'comm_info_reply', 'kernel_info_reply', 'execute_input']: - + if channel == 'iopub' and msg_type not in {'status', 'comm_open', 'execute_input'}: + # Remove the counts queued for removal. now = IOLoop.current().time() while len(self._iopub_window_byte_queue) > 0: From 9e2c95dc0774c86a93b614d584bf9dc2ac224472 Mon Sep 17 00:00:00 2001 From: Jonathan Frederic Date: Mon, 18 Jan 2016 10:41:21 -0800 Subject: [PATCH 10/11] limit_window -> rate_limit_window --- notebook/notebookapp.py | 4 ++-- notebook/services/kernels/handlers.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index cb0313b7c..8ec62b4c1 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -191,7 +191,7 @@ class NotebookWebApplication(web.Application): # rate limits iopub_msg_rate_limit=ipython_app.iopub_msg_rate_limit, iopub_data_rate_limit=ipython_app.iopub_data_rate_limit, - limit_window=ipython_app.limit_window, + rate_limit_window=ipython_app.rate_limit_window, # authentication cookie_secret=ipython_app.cookie_secret, @@ -798,7 +798,7 @@ class NotebookApp(JupyterApp): Maximum rate at which messages can be sent on iopub before they are limited.""") - limit_window = 
Float(1.0, config=True, help="""(sec) Time window used to + rate_limit_window = Float(1.0, config=True, help="""(sec) Time window used to check the message and data rate limits.""") def parse_command_line(self, argv=None): diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 1f20bb602..fa7a9283b 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -110,8 +110,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): return self.settings.get('iopub_data_rate_limit', None) @property - def limit_window(self): - return self.settings.get('limit_window', 1.0) + def rate_limit_window(self): + return self.settings.get('rate_limit_window', 1.0) def __repr__(self): return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized')) @@ -303,12 +303,12 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # Queue a removal of the byte and message count for a time in the # future, when we are no longer interested in it. - self._iopub_window_byte_queue.append((now + self.limit_window, byte_count)) + self._iopub_window_byte_queue.append((now + self.rate_limit_window, byte_count)) # Check the limits, set the limit flags, and reset the # message and data counts. 
- msg_rate = float(self._iopub_window_msg_count) / self.limit_window - data_rate = float(self._iopub_window_byte_count) / self.limit_window + msg_rate = float(self._iopub_window_msg_count) / self.rate_limit_window + data_rate = float(self._iopub_window_byte_count) / self.rate_limit_window # Check the msg rate if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit and self.iopub_msg_rate_limit > 0: From c280b773fbe93681bb496acab1ef26c66b9e1aff Mon Sep 17 00:00:00 2001 From: Min RK Date: Thu, 21 Jan 2016 11:26:27 +0100 Subject: [PATCH 11/11] use session.deserialize to unpack message for rate limiting rather than hardcoding json.loads Messages should **never** be deserialized by any means other than the Session API. --- notebook/base/zmqhandlers.py | 19 +++++++++++++------ notebook/services/kernels/handlers.py | 7 ++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/notebook/base/zmqhandlers.py b/notebook/base/zmqhandlers.py index 6a7b5bdcb..1cfa8ecb3 100644 --- a/notebook/base/zmqhandlers.py +++ b/notebook/base/zmqhandlers.py @@ -218,16 +218,23 @@ class ZMQStreamHandler(WebSocketMixin, WebSocketHandler): self.stream.close() - def _reserialize_reply(self, msg_list, channel=None): + def _reserialize_reply(self, msg_or_list, channel=None): """Reserialize a reply message using JSON. - This takes the msg list from the ZMQ socket, deserializes it using - self.session and then serializes the result using JSON. This method - should be used by self._on_zmq_reply to build messages that can + msg_or_list can be an already-deserialized msg dict or the zmq buffer list. + If it is the zmq list, it will be deserialized with self.session. + + This takes the msg list from the ZMQ socket and serializes the result for the websocket. + This method should be used by self._on_zmq_reply to build messages that can be sent back to the browser. 
+ """ - idents, msg_list = self.session.feed_identities(msg_list) - msg = self.session.deserialize(msg_list) + if isinstance(msg_or_list, dict): + # already unpacked + msg = msg_or_list + else: + idents, msg_list = self.session.feed_identities(msg_or_list) + msg = self.session.deserialize(msg_list) if channel: msg['channel'] = channel if msg['buffers']: diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index fa7a9283b..aa5b31979 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -269,9 +269,10 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): def _on_zmq_reply(self, stream, msg_list): idents, fed_msg_list = self.session.feed_identities(msg_list) + msg = self.session.deserialize(fed_msg_list) + parent = msg['parent_header'] def write_stderr(error_message): self.log.warn(error_message) - parent = json.loads(fed_msg_list[2]) msg = self.session.msg("stream", content={"text": error_message, "name": "stderr"}, parent=parent @@ -280,7 +281,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.write_message(json.dumps(msg, default=date_default)) channel = getattr(stream, 'channel', None) - msg_type = json.loads(fed_msg_list[1])['msg_type'] + msg_type = msg['header']['msg_type'] if channel == 'iopub' and msg_type not in {'status', 'comm_open', 'execute_input'}: # Remove the counts queued for removal. @@ -345,7 +346,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # If either of the limit flags are set, do not send the message. if self._iopub_msgs_exceeded or self._iopub_data_exceeded: return - super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg_list) + super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg) def on_close(self):