Allow replacing stale websocket connections

instead of refusing duplecate connections,
since reconnect on flaky network is more likely.
Min RK 10 years ago
parent 6c4687bc8e
commit b6e8a3732e

@ -100,7 +100,6 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
# class-level registry of open sessions
# allows checking for conflict on session-id,
# which is used as a zmq identity and must be unique.
session_key = ''
_open_sessions = {}
@property
@ -200,6 +199,8 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
self.kernel_id = None
self.kernel_info_channel = None
self._kernel_info_future = Future()
self._close_future = Future()
self.session_key = ''
# Rate limiting code
self._iopub_window_msg_count = 0
@ -216,7 +217,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
# authenticate first
super(ZMQChannelsHandler, self).pre_get()
# check session collision:
self._register_session()
yield self._register_session()
# then request kernel info, waiting up to a certain time before giving up.
# We don't want to wait forever, because browsers don't take it well when
# servers never respond to websocket connection requests.
@ -240,11 +241,19 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
self.kernel_id = cast_unicode(kernel_id, 'ascii')
yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
@gen.coroutine
def _register_session(self):
"""Ensure we aren't creating a duplicate session"""
"""Ensure we aren't creating a duplicate session.
If a previous identical session is still open, close it to avoid collisions.
This is likely due to a client reconnecting from a lost network connection,
where the socket on our side has not been cleaned up yet.
"""
self.session_key = '%s:%s' % (self.kernel_id, self.session.session)
if self.session_key in self._open_sessions:
raise web.HTTPError(400, "Session %s already open." % self.session_key)
stale_handler = self._open_sessions.get(self.session_key)
if stale_handler:
self.log.warning("Replacing stale connection: %s", self.session_key)
yield stale_handler.close()
self._open_sessions[self.session_key] = self
def open(self, kernel_id):
@ -363,6 +372,9 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
return
super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg)
def close(self):
super(ZMQChannelsHandler, self).close()
return self._close_future
def on_close(self):
self.log.debug("Websocket closed %s", self.session_key)
@ -389,6 +401,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
socket.close()
self.channels = {}
self._close_future.set_result(None)
def _send_status_message(self, status):
msg = self.session.msg("status",

Loading…
Cancel
Save