diff --git a/IPython/frontend/html/notebook/handlers.py b/IPython/frontend/html/notebook/handlers.py index 7848c1dd7..31a56a4bf 100644 --- a/IPython/frontend/html/notebook/handlers.py +++ b/IPython/frontend/html/notebook/handlers.py @@ -519,23 +519,28 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler): self.on_message = self.save_on_message -class IOPubHandler(AuthenticatedZMQStreamHandler): - +class ZMQChannelHandler(AuthenticatedZMQStreamHandler): + + @property + def max_msg_size(self): + return self.settings.get('max_msg_size', 65535) + + def create_stream(self): + km = self.kernel_manager + meth = getattr(km, 'connect_%s' % self.channel) + self.zmq_stream = meth(self.kernel_id) + def initialize(self, *args, **kwargs): - self.iopub_stream = None - + self.zmq_stream = None + def on_first_message(self, msg): try: - super(IOPubHandler, self).on_first_message(msg) + super(ZMQChannelHandler, self).on_first_message(msg) except web.HTTPError: self.close() return - km = self.kernel_manager - kernel_id = self.kernel_id - km.add_restart_callback(kernel_id, self.on_kernel_restarted) - km.add_restart_callback(kernel_id, self.on_restart_failed, 'dead') try: - self.iopub_stream = km.connect_iopub(kernel_id) + self.create_stream() except web.HTTPError: # WebSockets don't response to traditional error codes so we # close the connection. @@ -543,29 +548,32 @@ class IOPubHandler(AuthenticatedZMQStreamHandler): self.stream.close() self.close() else: - self.iopub_stream.on_recv(self._on_zmq_reply) + self.zmq_stream.on_recv(self._on_zmq_reply) def on_message(self, msg): - pass - - def _send_status_message(self, status): - msg = self.session.msg("status", - {'execution_state': status} - ) - self.write_message(jsonapi.dumps(msg, default=date_default)) - - def on_kernel_restarted(self): - self.log.warn("kernel %s restarted", self.kernel_id) - self._send_status_message('restarting') - - def on_restart_failed(self): - self.log.error("kernel %s restarted failed!", self.kernel_id) - self._send_status_message('dead') + if len(msg) < self.max_msg_size: + msg = jsonapi.loads(msg) + self.session.send(self.zmq_stream, msg) def on_close(self): # This method can be called twice, once by self.kernel_died and once # from the WebSocket close event. If the WebSocket connection is # closed before the ZMQ streams are setup, they could be None. + if self.zmq_stream is not None and not self.zmq_stream.closed(): + self.zmq_stream.on_recv(None) + self.zmq_stream.close() + + +class IOPubHandler(ZMQChannelHandler): + channel = 'iopub' + + def create_stream(self): + super(IOPubHandler, self).create_stream() + km = self.kernel_manager + km.add_restart_callback(self.kernel_id, self.on_kernel_restarted) + km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead') + + def on_close(self): km = self.kernel_manager if self.kernel_id in km: km.remove_restart_callback( @@ -574,48 +582,27 @@ class IOPubHandler(AuthenticatedZMQStreamHandler): km.remove_restart_callback( self.kernel_id, self.on_restart_failed, 'dead', ) - if self.iopub_stream is not None and not self.iopub_stream.closed(): - self.iopub_stream.on_recv(None) - self.iopub_stream.close() - - -class ShellHandler(AuthenticatedZMQStreamHandler): + super(IOPubHandler, self).on_close() - @property - def max_msg_size(self): - return self.settings.get('max_msg_size', 65535) - - def initialize(self, *args, **kwargs): - self.shell_stream = None + def _send_status_message(self, status): + msg = self.session.msg("status", + {'execution_state': status} + ) + self.write_message(jsonapi.dumps(msg, default=date_default)) - def on_first_message(self, msg): - try: - super(ShellHandler, self).on_first_message(msg) - except web.HTTPError: - self.close() - return - km = self.kernel_manager - kernel_id = self.kernel_id - try: - self.shell_stream = km.connect_shell(kernel_id) - except web.HTTPError: - # WebSockets don't response to traditional error codes so we - # close the connection. - if not self.stream.closed(): - self.stream.close() - self.close() - else: - self.shell_stream.on_recv(self._on_zmq_reply) + def on_kernel_restarted(self): + logging.warn("kernel %s restarted", self.kernel_id) + self._send_status_message('restarting') - def on_message(self, msg): - if len(msg) < self.max_msg_size: - msg = jsonapi.loads(msg) - self.session.send(self.shell_stream, msg) + def on_restart_failed(self): + logging.error("kernel %s restarted failed!", self.kernel_id) + self._send_status_message('dead') + +class ShellHandler(ZMQChannelHandler): + channel = 'shell' - def on_close(self): - # Make sure the stream exists and is not already closed. - if self.shell_stream is not None and not self.shell_stream.closed(): - self.shell_stream.close() +class StdinHandler(ZMQChannelHandler): + channel = 'stdin' #-----------------------------------------------------------------------------