Different clients now share a single zmq session.

Previously, each client (browser window) would open its own set
of ZMQ sockets to a kernel. Now one master set of connections
to the kernel is created and all clients share those connections.
In some ways, this simplifies the URL design.

I have also made kernel_ids server-side created.
pull/37/head
Brian Granger 15 years ago committed by Brian E. Granger
parent 00b857165e
commit 49c970cddf

@ -1,5 +1,6 @@
import signal
import sys
import uuid
from IPython.zmq.ipkernel import launch_kernel
from session import SessionManager
@ -30,9 +31,8 @@ class KernelManager(object):
else:
return False
def start_kernel(self, kernel_id):
if kernel_id in self._kernels:
raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
def start_kernel(self):
kernel_id = str(uuid.uuid4())
(process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
d = dict(
process = process,

@ -3,6 +3,8 @@ import json
import logging
import os
import urllib
import uuid
from Queue import Queue
import zmq
@ -21,8 +23,7 @@ from kernelmanager import KernelManager
options.define("port", default=8888, help="run on the given port", type=int)
_session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)"
_kernel_id_regex = r"(?P<kernel_id>\w+)"
_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
class MainHandler(web.RequestHandler):
@ -30,79 +31,79 @@ class MainHandler(web.RequestHandler):
self.render('notebook.html')
class BaseKernelHandler(object):
def get_kernel(self):
return self.application.kernel_manager
def get_session(self, kernel_id):
km = self.get_kernel()
sm = km.get_session_manager(kernel_id)
return sm
class KernelHandler(web.RequestHandler, BaseKernelHandler):
class KernelHandler(web.RequestHandler):
def get(self):
self.write(json.dumps(self.get_kernel().kernel_ids))
self.write(json.dumps(self.application.kernel_ids))
def post(self, *args, **kwargs):
kernel_id = kwargs['kernel_id']
self.get_kernel().start_kernel(kernel_id)
logging.info("Starting kernel: %s" % kernel_id)
def post(self):
kernel_id = self.application.start_kernel()
self.application.start_session(kernel_id)
self.write(json.dumps(kernel_id))
class SessionHandler(web.RequestHandler, BaseKernelHandler):
class ZMQStreamRouter(object):
def get(self, *args, **kwargs):
kernel_id = kwargs['kernel_id']
self.write(json.dumps(self.get_session(kernel_id).session_ids))
def __init__(self, zmq_stream):
self.zmq_stream = zmq_stream
self._clients = {}
self.zmq_stream.on_recv(self._on_zmq_reply)
def post(self, *args, **kwargs):
kernel_id = kwargs['kernel_id']
sm = self.get_session(kernel_id)
session_id = sm.start_session()
logging.info("Starting session: %s, %s" % (kernel_id, session_id))
self.write(json.dumps(session_id))
def register_client(self, client):
client_id = uuid.uuid4()
self._clients[client_id] = client
return client_id
def unregister_client(self, client_id):
del self._clients[client_id]
class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler):
stream_name = ''
class IOPubStreamRouter(ZMQStreamRouter):
def open(self, *args, **kwargs):
kernel_id = kwargs['kernel_id']
session_id = kwargs['session_id']
logging.info("Connection open: %s, %s" % (kernel_id,session_id))
sm = self.get_session(kernel_id)
method_name = "get_%s_stream" % self.stream_name
method = getattr(sm, method_name)
self.zmq_stream = method(session_id)
self.zmq_stream.on_recv(self._on_zmq_reply)
def _on_zmq_reply(self, msg_list):
for client_id, client in self._clients.items():
for msg in msg_list:
client.write_message(msg)
def on_message(self, msg):
logging.info("Message received: %r, %r" % (msg, self.__class__))
logging.info(self.zmq_stream)
self.zmq_stream.send_unicode(msg)
def forward_unicode(self, client_id, msg):
# This is a SUB stream that we should never write to.
pass
def on_close(self):
self.zmq_stream.close()
class ShellStreamRouter(ZMQStreamRouter):
def __init__(self, zmq_stream):
ZMQStreamRouter.__init__(self, zmq_stream)
self._request_queue = Queue()
def _on_zmq_reply(self, msg_list):
for msg in msg_list:
logging.info("Message reply: %r" % msg)
self.write_message(msg)
client_id = self._request_queue.get(block=False)
client = self._clients.get(client_id)
if client is not None:
for msg in msg_list:
client.write_message(msg)
def forward_unicode(self, client_id, msg):
self._request_queue.put(client_id)
self.zmq_stream.send_unicode(msg)
class IOPubStreamHandler(ZMQStreamHandler):
class ZMQStreamHandler(websocket.WebSocketHandler):
stream_name = 'iopub'
def initialize(self, stream_name):
self.stream_name = stream_name
def open(self, kernel_id):
self.router = self.application.get_router(kernel_id, self.stream_name)
self.client_id = self.router.register_client(self)
logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
class ShellStreamHandler(ZMQStreamHandler):
def on_message(self, msg):
self.router.forward_unicode(self.client_id, msg)
stream_name = 'shell'
def on_close(self):
self.router.unregister_client(self.client_id)
logging.info("Connection closed: %s" % self.client_id)
class NotebookRootHandler(web.RequestHandler):
@ -157,10 +158,9 @@ class NotebookApplication(web.Application):
def __init__(self):
handlers = [
(r"/", MainHandler),
(r"/kernels/%s" % (_kernel_id_regex,), KernelHandler),
(r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler),
(r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler),
(r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler),
(r"/kernels", KernelHandler),
(r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
(r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
(r"/notebooks", NotebookRootHandler),
(r"/notebooks/([^/]+)", NotebookHandler)
]
@ -169,8 +169,46 @@ class NotebookApplication(web.Application):
static_path=os.path.join(os.path.dirname(__file__), "static"),
)
web.Application.__init__(self, handlers, **settings)
self.context = zmq.Context()
self.kernel_manager = KernelManager(self.context)
self._session_dict = {}
self._routers = {}
#-------------------------------------------------------------------------
# Methods for managing kernels and sessions
#-------------------------------------------------------------------------
@property
def kernel_ids(self):
return self.kernel_manager.kernel_ids
def start_kernel(self):
kernel_id = self.kernel_manager.start_kernel()
logging.info("Kernel started: %s" % kernel_id)
return kernel_id
def start_session(self, kernel_id):
sm = self.kernel_manager.get_session_manager(kernel_id)
session_id = sm.start_session()
self._session_dict[kernel_id] = session_id
iopub_stream = sm.get_iopub_stream(session_id)
shell_stream = sm.get_shell_stream(session_id)
iopub_router = IOPubStreamRouter(iopub_stream)
shell_router = ShellStreamRouter(shell_stream)
self._routers[(kernel_id, session_id, 'iopub')] = iopub_router
self._routers[(kernel_id, session_id, 'shell')] = shell_router
logging.info("Session started: %s, %s" % (kernel_id, session_id))
def stop_session(self, kernel_id):
# TODO: finish this!
sm = self.kernel_manager.get_session_manager(kernel_id)
session_id = self._session_dict[kernel_id]
def get_router(self, kernel_id, stream_name):
session_id = self._session_dict[kernel_id]
router = self._routers[(kernel_id, session_id, stream_name)]
return router
def main():

@ -92,7 +92,6 @@ var Notebook = function (selector) {
this.element.scroll();
this.element.data("notebook", this);
this.next_prompt_number = 1;
this.next_kernel_number = 0;
this.kernel = null;
this.msg_cell_map = {};
this.bind_events();
@ -429,20 +428,13 @@ Notebook.prototype.expand = function (index) {
// Kernel related things
Notebook.prototype.start_kernel = function () {
this.kernel = new Kernel("kernel" + this.next_kernel_number);
this.next_kernel_number = this.next_kernel_number + 1;
this.kernel = new Kernel();
this.kernel.start_kernel(this._kernel_started, this);
};
Notebook.prototype._kernel_started = function () {
console.log("Kernel started: ", this.kernel.kernel_id);
this.kernel.start_session(this._session_started, this);
};
Notebook.prototype._session_started = function () {
console.log("Session started: ", this.kernel.session_id);
var that = this;
this.kernel.shell_channel.onmessage = function (e) {
@ -711,11 +703,10 @@ TextCell.prototype.config_mathjax = function () {
//============================================================================
var Kernel = function (kernel_id) {
this.kernel_id = kernel_id;
var Kernel = function () {
this.kernel_id = null;
this.base_url = "/kernels";
this.kernel_url = this.base_url + "/" + this.kernel_id
this.session_id = null;
this.kernel_url = null;
};
@ -734,32 +725,26 @@ Kernel.prototype.get_msg = function (msg_type, content) {
}
Kernel.prototype.start_kernel = function (callback, context) {
$.post(this.kernel_url, function () {
callback.call(context);
});
};
Kernel.prototype.start_session = function (callback, context) {
var that = this;
$.post(this.kernel_url + "/sessions",
function (session_id) {
that._handle_start_session(session_id, callback, context);
},
'json');
}
$.post(this.base_url,
function (kernel_id) {
that._handle_start_kernel(kernel_id, callback, context);
},
'json'
);
};
Kernel.prototype._handle_start_session = function (session_id, callback, context) {
this.session_id = session_id;
this.session_url = this.kernel_url + "/sessions/" + this.session_id;
Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) {
this.kernel_id = kernel_id;
this.kernel_url = this.base_url + "/" + this.kernel_id;
this._start_channels();
callback.call(context);
};
Kernel.prototype._start_channels = function () {
var ws_url = "ws://127.0.0.1:8888" + this.session_url;
var ws_url = "ws://127.0.0.1:8888" + this.kernel_url;
this.shell_channel = new WebSocket(ws_url + "/shell");
this.iopub_channel = new WebSocket(ws_url + "/iopub");
}

Loading…
Cancel
Save