diff --git a/notebook/services/kernels/handlers.py b/notebook/services/kernels/handlers.py index c64a9853c..1087aad18 100644 --- a/notebook/services/kernels/handlers.py +++ b/notebook/services/kernels/handlers.py @@ -100,7 +100,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # class-level registry of open sessions # allows checking for conflict on session-id, # which is used as a zmq identity and must be unique. - session_key = '' _open_sessions = {} @property @@ -200,6 +199,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.kernel_id = None self.kernel_info_channel = None self._kernel_info_future = Future() + self._close_future = Future() + self.session_key = '' # Rate limiting code self._iopub_window_msg_count = 0 @@ -216,7 +217,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): # authenticate first super(ZMQChannelsHandler, self).pre_get() # check session collision: - self._register_session() + yield self._register_session() # then request kernel info, waiting up to a certain time before giving up. # We don't want to wait forever, because browsers don't take it well when # servers never respond to websocket connection requests. @@ -240,11 +241,19 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): self.kernel_id = cast_unicode(kernel_id, 'ascii') yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id) + @gen.coroutine def _register_session(self): - """Ensure we aren't creating a duplicate session""" + """Ensure we aren't creating a duplicate session. + + If a previous identical session is still open, close it to avoid collisions. + This is likely due to a client reconnecting from a lost network connection, + where the socket on our side has not been cleaned up yet. + """ self.session_key = '%s:%s' % (self.kernel_id, self.session.session) - if self.session_key in self._open_sessions: - raise web.HTTPError(400, "Session %s already open." % self.session_key) + stale_handler = self._open_sessions.get(self.session_key) + if stale_handler: + self.log.warning("Replacing stale connection: %s", self.session_key) + yield stale_handler.close() self._open_sessions[self.session_key] = self def open(self, kernel_id): @@ -363,6 +372,9 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): return super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg) + def close(self): + super(ZMQChannelsHandler, self).close() + return self._close_future def on_close(self): self.log.debug("Websocket closed %s", self.session_key) @@ -389,6 +401,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler): socket.close() self.channels = {} + self._close_future.set_result(None) def _send_status_message(self, status): msg = self.session.msg("status",