diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index b0c2d3150..98c7332b0 100644 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -790,11 +790,11 @@ class NotebookApp(JupyterApp): help="Reraise exceptions encountered loading server extensions?", ) - iopub_msg_rate_limit = Float(0, config=True, allow_none=True, help="""(msg/sec) + iopub_msg_rate_limit = Float(0, config=True, help="""(msg/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") - iopub_data_rate_limit = Float(0, config=True, allow_none=True, help="""(bytes/sec) + iopub_data_rate_limit = Float(0, config=True, help="""(bytes/sec) Maximum rate at which messages can be sent on iopub before they are limited.""") diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index 9a7a33a7c..afadecd34 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -196,15 +196,14 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self._kernel_info_future = Future() # Rate limiting code - self._iopub_msg_count = 0 - self._iopub_byte_count = 0 + self._iopub_window_msg_count = 0 + self._iopub_window_byte_count = 0 self._iopub_msgs_exceeded = False self._iopub_data_exceeded = False - # Queue of (time stamp, delta) - # Allows you to specify that the msg or byte counts should be adjusted + # Queue of (time stamp, byte count) + # Allows you to specify that the byte count should be lowered # by a delta amount at some point in the future. - self._queue_msg_count = [] - self._queue_byte_count = [] + self._iopub_window_byte_queue = [] @gen.coroutine def pre_get(self): @@ -284,40 +283,31 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): if channel == 'iopub': # Remove the counts queued for removal. - while len(self._queue_msg_count) > 0: - queued = self._queue_msg_count[0] - if (IOLoop.current().time() >= queued[0]): - self._iopub_msg_count += queued[1] - del self._queue_msg_count[0] - else: - # This part of the queue hasn't be reached yet, so we can - # abort the loop. - break - - while len(self._queue_byte_count) > 0: - queued = self._queue_byte_count[0] - if (IOLoop.current().time() >= queued[0]): - self._iopub_byte_count += queued[1] - del self._queue_byte_count[0] + now = IOLoop.current().time() + while len(self._iopub_window_byte_queue) > 0: + queued = self._iopub_window_byte_queue[0] + if (now >= queued[0]): + self._iopub_window_byte_count -= queued[1] + self._iopub_window_msg_count -= 1 + del self._iopub_window_byte_queue[0] else: # This part of the queue hasn't be reached yet, so we can # abort the loop. break # Increment the bytes and message count - self._iopub_msg_count += 1 + self._iopub_window_msg_count += 1 byte_count = sum([len(x) for x in msg_list]) - self._iopub_byte_count += byte_count + self._iopub_window_byte_count += byte_count # Queue a removal of the byte and message count for a time in the # future, when we are no longer interested in it. - self._queue_msg_count.append((IOLoop.current().time() + self.limit_window, -1)) - self._queue_byte_count.append((IOLoop.current().time() + self.limit_window, -byte_count)) + self._iopub_window_byte_queue.append((now + self.limit_window, byte_count)) # Check the limits, set the limit flags, and reset the # message and data counts. - msg_rate = float(self._iopub_msg_count) / self.limit_window - data_rate = float(self._iopub_byte_count) / self.limit_window + msg_rate = float(self._iopub_window_msg_count) / self.limit_window + data_rate = float(self._iopub_window_byte_count) / self.limit_window # Check the msg rate if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit and self.iopub_msg_rate_limit > 0: