|
|
|
|
@ -200,7 +200,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
self._iopub_msg_count = 0
|
|
|
|
|
self._iopub_byte_count = 0
|
|
|
|
|
self._iopub_msgs_exceeded = False
|
|
|
|
|
self._iopub_bytes_exceeded = False
|
|
|
|
|
self._iopub_data_exceeded = False
|
|
|
|
|
self._last_limit_check = time.time()
|
|
|
|
|
|
|
|
|
|
@gen.coroutine
|
|
|
|
|
@ -251,7 +251,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
# already closed, ignore the message
|
|
|
|
|
self.log.debug("Received message on closed websocket %r", msg)
|
|
|
|
|
return
|
|
|
|
|
length = len(msg)
|
|
|
|
|
if isinstance(msg, bytes):
|
|
|
|
|
msg = deserialize_binary_message(msg)
|
|
|
|
|
else:
|
|
|
|
|
@ -263,57 +262,88 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
|
|
|
|
|
if channel not in self.channels:
|
|
|
|
|
self.log.warn("No such channel: %r", channel)
|
|
|
|
|
return
|
|
|
|
|
stream = self.channels[channel]
|
|
|
|
|
self.session.send(stream, msg)
|
|
|
|
|
|
|
|
|
|
def _on_zmq_reply(self, stream, msg_list):
|
|
|
|
|
|
|
|
|
|
def write_stderr(error_message):
|
|
|
|
|
parent = json.loads(msg_list[-3])
|
|
|
|
|
msg = self.session.msg("stream",
|
|
|
|
|
content={"text": error_message, "name": "stderr"},
|
|
|
|
|
parent=parent
|
|
|
|
|
)
|
|
|
|
|
msg['channel'] = 'iopub'
|
|
|
|
|
self.write_message(json.dumps(msg, default=date_default))
|
|
|
|
|
self.log.warn(error_message)
|
|
|
|
|
|
|
|
|
|
channel = getattr(stream, 'channel', None)
|
|
|
|
|
if channel == 'iopub':
|
|
|
|
|
|
|
|
|
|
# If the elapsed time is double the check interval, reset the counts,
|
|
|
|
|
# lock flags, and reset the timer.
|
|
|
|
|
elapsed_time = (time.time() - self._last_limit_check)
|
|
|
|
|
if elapsed_time >= self.limit_poll_interval * 2.0:
|
|
|
|
|
self._iopub_msg_count = 0
|
|
|
|
|
self._iopub_byte_count = 0
|
|
|
|
|
if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
|
|
|
|
|
self._iopub_data_exceeded = False
|
|
|
|
|
self._iopub_msgs_exceeded = False
|
|
|
|
|
self.log.warn("iopub messages resumed")
|
|
|
|
|
|
|
|
|
|
# Increment the bytes and message count
|
|
|
|
|
self._iopub_msg_count += 1
|
|
|
|
|
self._iopub_byte_count += length
|
|
|
|
|
|
|
|
|
|
self._iopub_byte_count += sum([len(x) for x in msg_list])
|
|
|
|
|
|
|
|
|
|
# If the elapsed time is equal to or greater than the limiter
|
|
|
|
|
# interval, check the limits, set the limit flags, and reset the
|
|
|
|
|
# message and data counts.
|
|
|
|
|
elapsed_time = (time.time() - self._last_limit_check).seconds()
|
|
|
|
|
if elapsed_time >= self.limit_poll_interval:
|
|
|
|
|
self._last_limit_check = time.time()
|
|
|
|
|
|
|
|
|
|
msg_rate = self._iopub_msg_count / elapsed_time
|
|
|
|
|
data_rate = self._iopub_byte_count / elapsed_time
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
msg_rate = float(self._iopub_msg_count) / elapsed_time
|
|
|
|
|
data_rate = float(self._iopub_byte_count) / elapsed_time
|
|
|
|
|
self.log.warn('check msg/s{} & bytes/s{}'.format(msg_rate, data_rate))
|
|
|
|
|
self._iopub_msg_count = 0
|
|
|
|
|
self._iopub_byte_count = 0
|
|
|
|
|
|
|
|
|
|
# Check the msg rate
|
|
|
|
|
if self.iopub_msg_rate_limit is not None and msg_rate > self.iopub_msg_rate_limit:
|
|
|
|
|
if not self._iopub_msgs_exceeded:
|
|
|
|
|
self._iopub_msgs_exceeded = True
|
|
|
|
|
self.log.warn("""iopub message rate exceeded. The
|
|
|
|
|
write_stderr("""iopub message rate exceeded. The
|
|
|
|
|
notebook server will temporarily stop sending iopub
|
|
|
|
|
messages to the client in order to avoid crashing it.
|
|
|
|
|
To change this limit, set the config variable
|
|
|
|
|
`--NotebookApp.iopub_msg_rate_limit`.""")
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
if self._iopub_msgs_exceeded:
|
|
|
|
|
self._iopub_msgs_exceeded = False
|
|
|
|
|
if not self._iopub_data_exceeded:
|
|
|
|
|
self.log.warn("""iopub messages resumed""")
|
|
|
|
|
|
|
|
|
|
self.log.warn("iopub messages resumed")
|
|
|
|
|
|
|
|
|
|
# Check the data rate
|
|
|
|
|
if self.iopub_data_rate_limit is not None and data_rate > self.iopub_data_rate_limit:
|
|
|
|
|
if not self._iopub_data_exceeded:
|
|
|
|
|
self._iopub_data_exceeded = True
|
|
|
|
|
self.log.warn("""iopub data rate exceeded. The
|
|
|
|
|
write_stderr("""iopub data rate exceeded. The
|
|
|
|
|
notebook server will temporarily stop sending iopub
|
|
|
|
|
messages to the client in order to avoid crashing it.
|
|
|
|
|
To change this limit, set the config variable
|
|
|
|
|
`--NotebookApp.iopub_data_rate_limit`.""")
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
if self._iopub_data_exceeded:
|
|
|
|
|
self._iopub_data_exceeded = False
|
|
|
|
|
if not self._iopub_msgs_exceeded:
|
|
|
|
|
self.log.warn("""iopub messages resumed""")
|
|
|
|
|
|
|
|
|
|
self.log.warn("iopub messages resumed")
|
|
|
|
|
|
|
|
|
|
# If either of the limit flags are set, do not send the message.
|
|
|
|
|
if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
|
|
|
|
|
return
|
|
|
|
|
super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg_list)
|
|
|
|
|
|
|
|
|
|
stream = self.channels[channel]
|
|
|
|
|
self.session.send(stream, msg)
|
|
|
|
|
|
|
|
|
|
def on_close(self):
|
|
|
|
|
km = self.kernel_manager
|
|
|
|
|
|