@@ -15,7 +15,12 @@ from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from jupyter_client import protocol_version as client_protocol_version
-from jupyter_client.jsonutil import date_default
+try:
+    from jupyter_client.jsonutil import json_default
+except ImportError:
+    from jupyter_client.jsonutil import (
+        date_default as json_default
+    )
from ipython_genutils.py3compat import cast_unicode
from notebook.utils import maybe_future, url_path_join, url_escape
@@ -29,7 +34,7 @@ class MainKernelHandler(APIHandler):
    def get(self):
        km = self.kernel_manager
        kernels = yield maybe_future(km.list_kernels())
-        self.finish(json.dumps(kernels, default=date_default))
+        self.finish(json.dumps(kernels, default=json_default))
    @web.authenticated
    @gen.coroutine
@@ -48,7 +53,7 @@ class MainKernelHandler(APIHandler):
        location = url_path_join(self.base_url, 'api', 'kernels', url_escape(kernel_id))
        self.set_header('Location', location)
        self.set_status(201)
-        self.finish(json.dumps(model, default=date_default))
+        self.finish(json.dumps(model, default=json_default))
class KernelHandler(APIHandler):
@@ -58,7 +63,7 @@ class KernelHandler(APIHandler):
    def get(self, kernel_id):
        km = self.kernel_manager
        model = yield maybe_future(km.kernel_model(kernel_id))
-        self.finish(json.dumps(model, default=date_default))
+        self.finish(json.dumps(model, default=json_default))
    @web.authenticated
    @gen.coroutine
@@ -87,7 +92,7 @@ class KernelActionHandler(APIHandler):
                self.set_status(500)
            else:
                model = yield maybe_future(km.kernel_model(kernel_id))
-                self.write(json.dumps(model, default=date_default))
+                self.write(json.dumps(model, default=json_default))
        self.finish()
@@ -95,7 +100,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
    '''There is one ZMQChannelsHandler per running kernel and it oversees all
    the sessions.
    '''
    # class-level registry of open sessions
    # allows checking for conflict on session-id,
    # which is used as a zmq identity and must be unique.
@@ -128,7 +133,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            meth = getattr(km, "connect_" + channel)
            self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
            stream.channel = channel
    def nudge(self):
        """Nudge the zmq connections with kernel_info_requests
@@ -268,7 +273,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
    def _handle_kernel_info_reply(self, msg):
        """process the kernel_info_reply
        enabling msg spec adaptation, if necessary
        """
        idents, msg = self.session.feed_identities(msg)
@@ -285,15 +290,15 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
                self.log.error("Kernel info request failed, assuming current %s", info)
                info = {}
            self._finish_kernel_info(info)
        # close the kernel_info channel, we don't need it anymore
        if self.kernel_info_channel:
            self.kernel_info_channel.close()
        self.kernel_info_channel = None
    def _finish_kernel_info(self, info):
        """Finish handling kernel_info reply
        Set up protocol adaptation, if needed,
        and signal that connection can continue.
        """
@@ -303,7 +308,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            self.log.info("Adapting from protocol version {protocol_version} (kernel {kernel_id}) to {client_protocol_version} (client).".format(protocol_version=protocol_version, kernel_id=self.kernel_id, client_protocol_version=client_protocol_version))
        if not self._kernel_info_future.done():
            self._kernel_info_future.set_result(info)
    def initialize(self):
        super().initialize()
        self.zmq_stream = None
@@ -336,7 +341,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
        kernel = self.kernel_manager.get_kernel(self.kernel_id)
        self.session.key = kernel.session.key
        future = self.request_kernel_info()
        def give_up():
            """Don't wait forever for the kernel to reply"""
            if future.done():
@@ -347,16 +352,16 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
        loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
        # actually wait for it
        yield future
    @gen.coroutine
    def get(self, kernel_id):
        self.kernel_id = cast_unicode(kernel_id, 'ascii')
        yield super().get(kernel_id=kernel_id)
    @gen.coroutine
    def _register_session(self):
        """Ensure we aren't creating a duplicate session.
        If a previous identical session is still open, close it to avoid collisions.
        This is likely due to a client reconnecting from a lost network connection,
        where the socket on our side has not been cleaned up yet.
@@ -379,7 +384,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            self.log.info("Restoring connection for %s", self.session_key)
            self.channels = buffer_info['channels']
            connected = self.nudge()
            def replay(value):
                replay_buffer = buffer_info['buffer']
                if replay_buffer:
@@ -449,7 +454,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
                parent=parent
            )
            msg['channel'] = 'iopub'
-            self.write_message(json.dumps(msg, default=date_default))
+            self.write_message(json.dumps(msg, default=json_default))
        channel = getattr(stream, 'channel', None)
        msg_type = msg['header']['msg_type']
@@ -463,7 +468,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            self._iopub_data_exceeded = False
        if channel == 'iopub' and msg_type not in {'status', 'comm_open', 'execute_input'}:
            # Remove the counts queued for removal.
            now = IOLoop.current().time()
            while len(self._iopub_window_byte_queue) > 0:
@@ -484,16 +489,16 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            else:
                byte_count = 0
            self._iopub_window_byte_count += byte_count
            # Queue a removal of the byte and message count for a time in the
            # future, when we are no longer interested in it.
            self._iopub_window_byte_queue.append((now + self.rate_limit_window, byte_count))
            # Check the limits, set the limit flags, and reset the
            # message and data counts.
            msg_rate = float(self._iopub_window_msg_count) / self.rate_limit_window
            data_rate = float(self._iopub_window_byte_count) / self.rate_limit_window
            # Check the msg rate
            if self.iopub_msg_rate_limit > 0 and msg_rate > self.iopub_msg_rate_limit:
                if not self._iopub_msgs_exceeded:
@@ -504,7 +509,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
                    to the client in order to avoid crashing it.
                    To change this limit, set the config variable
                    `--NotebookApp.iopub_msg_rate_limit`.
                    Current values:
                    NotebookApp.iopub_msg_rate_limit={} (msgs/sec)
                    NotebookApp.rate_limit_window={} (secs)
@@ -526,7 +531,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
                    to the client in order to avoid crashing it.
                    To change this limit, set the config variable
                    `--NotebookApp.iopub_data_rate_limit`.
                    Current values:
                    NotebookApp.iopub_data_rate_limit={} (bytes/sec)
                    NotebookApp.rate_limit_window={} (secs)
@@ -537,7 +542,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
                    self._iopub_data_exceeded = False
                    if not self._iopub_msgs_exceeded:
                        self.log.warning("iopub messages resumed")
            # If either of the limit flags are set, do not send the message.
            if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
                # we didn't send it, remove the current message from the calculus
@@ -595,7 +600,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
            {'execution_state': status}
        )
        msg['channel'] = 'iopub'
-        self.write_message(json.dumps(msg, default=date_default))
+        self.write_message(json.dumps(msg, default=json_default))
    def on_kernel_restarted(self):
        logging.warn("kernel %s restarted", self.kernel_id)
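
A quick illustrative sketch, not part of the patch: it shows why the try/except import at the top of this diff keeps the handlers working across jupyter_client versions, whether the library ships json_default or only the older date_default. The sample model dict and its field names are hypothetical.

# sketch: compatibility shim for the JSON serializer used by these handlers
import datetime
import json

try:
    from jupyter_client.jsonutil import json_default
except ImportError:
    # older jupyter_client releases only provide date_default
    from jupyter_client.jsonutil import date_default as json_default

# hypothetical kernel model containing a datetime, which plain json.dumps rejects
model = {
    "id": "hypothetical-kernel-id",
    "last_activity": datetime.datetime.now(datetime.timezone.utc),
}

# same call pattern as the handlers; the datetime is emitted as an ISO 8601 string
print(json.dumps(model, default=json_default))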