@ -4,9 +4,11 @@
import os
import json
from tornado import gen
from socket import gaierror
from tornado import gen , web
from tornado . escape import json_encode , json_decode , url_escape
from tornado . httpclient import HTTPClient , AsyncHTTPClient , HTTPError
from tornado . simple_httpclient import HTTPTimeoutError
from . . services . kernels . kernelmanager import MappingKernelManager
from . . services . sessions . sessionmanager import SessionManager
@ -18,22 +20,10 @@ from traitlets import Instance, Unicode, Float, Bool, default, validate, TraitEr
from traitlets . config import SingletonConfigurable
@gen.coroutine
def fetch_gateway ( endpoint , * * kwargs ) :
""" Make an async request to kernel gateway endpoint. """
client = AsyncHTTPClient ( )
kwargs = Gateway . instance ( ) . load_connection_args ( * * kwargs )
response = yield client . fetch ( endpoint , * * kwargs )
raise gen . Return ( response )
class Gateway ( SingletonConfigurable ) :
""" This class manages the configuration. It ' s its own class so that we can avoid having command
line options of the likes ` - - GatewayKernelManager . connect_timeout ` and use the shorter and more
applicable ` - - Gateway . connect_timeout ` , etc . It also contains some helper methods to build
request arguments out of the various config options .
class GatewayClient ( SingletonConfigurable ) :
""" This class manages the configuration. It ' s its own singleton class so that we
can share these values across all objects . It also contains some helper methods
to build request arguments out of the various config options .
"""
@ -56,7 +46,7 @@ class Gateway(SingletonConfigurable):
# Ensure value, if present, starts with 'http'
if value is not None and len ( value ) > 0 :
if not str ( value ) . lower ( ) . startswith ( ' http ' ) :
raise TraitError ( " Gateway url must start with ' http ' : ' %r ' " % value )
raise TraitError ( " Gateway Client url must start with ' http ' : ' %r ' " % value )
return value
ws_url = Unicode ( default_value = None , allow_none = True , config = True ,
@ -80,7 +70,7 @@ class Gateway(SingletonConfigurable):
# Ensure value, if present, starts with 'ws'
if value is not None and len ( value ) > 0 :
if not str ( value ) . lower ( ) . startswith ( ' ws ' ) :
raise TraitError ( " Gateway ws_url must start with ' ws ' : ' %r ' " % value )
raise TraitError ( " Gateway Client ws_url must start with ' ws ' : ' %r ' " % value )
return value
kernels_endpoint_default_value = ' /api/kernels '
@ -204,9 +194,21 @@ class Gateway(SingletonConfigurable):
return bool ( os . environ . get ( self . validate_cert_env , str ( self . validate_cert_default_value ) ) not in [ ' no ' , ' false ' ] )
def __init__ ( self , * * kwargs ) :
super ( Gateway , self ) . __init__ ( * * kwargs )
super ( Gateway Client , self ) . __init__ ( * * kwargs )
self . _static_args = { } # initialized on first use
env_whitelist_default_value = ' '
env_whitelist_env = ' JUPYTER_GATEWAY_ENV_WHITELIST '
env_whitelist = Unicode ( default_value = env_whitelist_default_value , config = True ,
help = """ A comma-separated list of environment variable names that will be included, along with
their values , in the kernel startup request . The corresponding ` env_whitelist ` configuration
value must also be set on the Gateway server - since that configuration value indicates which
environmental values to make available to the kernel . ( JUPYTER_GATEWAY_ENV_WHITELIST env var ) """ )
@default ( ' env_whitelist ' )
def _env_whitelist_default ( self ) :
return os . environ . get ( self . env_whitelist_env , self . env_whitelist_default_value )
@property
def gateway_enabled ( self ) :
return bool ( self . url is not None and len ( self . url ) > 0 )
@ -242,6 +244,34 @@ class Gateway(SingletonConfigurable):
kwargs . update ( self . _static_args )
return kwargs
@gen.coroutine
def gateway_request ( endpoint , * * kwargs ) :
""" Make an async request to kernel gateway endpoint, returns a response """
client = AsyncHTTPClient ( )
kwargs = GatewayClient . instance ( ) . load_connection_args ( * * kwargs )
try :
response = yield client . fetch ( endpoint , * * kwargs )
# Trap a set of common exceptions so that we can inform the user that their Gateway url is incorrect
# or the server is not running.
# NOTE: We do this here since this handler is called during the Notebook's startup and subsequent refreshes
# of the tree view.
except ConnectionRefusedError :
raise web . HTTPError ( 503 , " Connection refused from Gateway server url ' {} ' . "
" Check to be sure the Gateway instance is running. " . format ( GatewayClient . instance ( ) . url ) )
except HTTPTimeoutError :
# This can occur if the host is valid (e.g., foo.com) but there's nothing there.
raise web . HTTPError ( 504 , " Timeout error attempting to connect to Gateway server url ' {} ' . " \
" Ensure gateway url is valid and the Gateway instance is running. " . format (
GatewayClient . instance ( ) . url ) )
except gaierror as e :
raise web . HTTPError ( 404 , " The Gateway server specified in the gateway_url ' {} ' doesn ' t appear to be valid. "
" Ensure gateway url is valid and the Gateway instance is running. " . format (
GatewayClient . instance ( ) . url ) )
raise gen . Return ( response )
class GatewayKernelManager ( MappingKernelManager ) :
""" Kernel manager that supports remote kernels hosted by Jupyter Kernel or Enterprise Gateway. """
@ -250,7 +280,7 @@ class GatewayKernelManager(MappingKernelManager):
def __init__ ( self , * * kwargs ) :
super ( GatewayKernelManager , self ) . __init__ ( * * kwargs )
self . base_endpoint = url_path_join ( Gateway . instance ( ) . url , Gateway . instance ( ) . kernels_endpoint )
self . base_endpoint = url_path_join ( Gateway Client . instance ( ) . url , Gateway Client . instance ( ) . kernels_endpoint )
def __contains__ ( self , kernel_id ) :
return kernel_id in self . _kernels
@ -291,18 +321,25 @@ class GatewayKernelManager(MappingKernelManager):
self . log . info ( ' Request start kernel: kernel_id= %s , path= " %s " ' , kernel_id , path )
if kernel_id is None :
if path is not None :
kwargs [ ' cwd ' ] = self . cwd_for_path ( path )
kernel_name = kwargs . get ( ' kernel_name ' , ' python3 ' )
kernel_url = self . _get_kernel_endpoint_url ( )
self . log . debug ( " Request new kernel at: %s " % kernel_url )
# Let KERNEL_USERNAME take precedent over http_user config option.
if os . environ . get ( ' KERNEL_USERNAME ' ) is None and GatewayClient . instance ( ) . http_user :
os . environ [ ' KERNEL_USERNAME ' ] = GatewayClient . instance ( ) . http_user
kernel_env = { k : v for ( k , v ) in dict ( os . environ ) . items ( ) if k . startswith ( ' KERNEL_ ' )
or k in os . environ . get ( ' GATEWAY_ENV_WHITELIST ' , ' ' ) . split ( " , " ) }
or k in GatewayClient. instance ( ) . env_whitelist . split ( " , " ) }
json_body = json_encode ( { ' name ' : kernel_name , ' env ' : kernel_env } )
response = yield fetch_ gateway( kernel_url , method = ' POST ' , body = json_body )
response = yield gateway_request ( kernel_url , method = ' POST ' , body = json_body )
kernel = json_decode ( response . body )
kernel_id = kernel [ ' id ' ]
self . log . info ( " Kernel started: %s " % kernel_id )
self . log . debug ( " Kernel args: %r " % kwargs )
else :
kernel = yield self . get_kernel ( kernel_id )
kernel_id = kernel [ ' id ' ]
@ -323,7 +360,7 @@ class GatewayKernelManager(MappingKernelManager):
kernel_url = self . _get_kernel_endpoint_url ( kernel_id )
self . log . debug ( " Request kernel at: %s " % kernel_url )
try :
response = yield fetch_ gateway( kernel_url , method = ' GET ' )
response = yield gateway_request ( kernel_url , method = ' GET ' )
except HTTPError as error :
if error . code == 404 :
self . log . warn ( " Kernel not found at: %s " % kernel_url )
@ -334,7 +371,7 @@ class GatewayKernelManager(MappingKernelManager):
else :
kernel = json_decode ( response . body )
self . _kernels [ kernel_id ] = kernel
self . log . info ( " Kernel retrieved: %s " % kernel )
self . log . debug ( " Kernel retrieved: %s " % kernel )
raise gen . Return ( kernel )
@gen.coroutine
@ -356,13 +393,13 @@ class GatewayKernelManager(MappingKernelManager):
""" Get a list of kernels. """
kernel_url = self . _get_kernel_endpoint_url ( )
self . log . debug ( " Request list kernels: %s " , kernel_url )
response = yield fetch_ gateway( kernel_url , method = ' GET ' )
response = yield gateway_request ( kernel_url , method = ' GET ' )
kernels = json_decode ( response . body )
self . _kernels = { x [ ' id ' ] : x for x in kernels }
raise gen . Return ( kernels )
@gen.coroutine
def shutdown_kernel ( self , kernel_id ):
def shutdown_kernel ( self , kernel_id , now = False , restart = False ):
""" Shutdown a kernel by its kernel uuid.
Parameters
@ -372,7 +409,7 @@ class GatewayKernelManager(MappingKernelManager):
"""
kernel_url = self . _get_kernel_endpoint_url ( kernel_id )
self . log . debug ( " Request shutdown kernel at: %s " , kernel_url )
response = yield fetch_ gateway( kernel_url , method = ' DELETE ' )
response = yield gateway_request ( kernel_url , method = ' DELETE ' )
self . log . debug ( " Shutdown kernel response: %d %s " , response . code , response . reason )
self . remove_kernel ( kernel_id )
@ -387,7 +424,7 @@ class GatewayKernelManager(MappingKernelManager):
"""
kernel_url = self . _get_kernel_endpoint_url ( kernel_id ) + ' /restart '
self . log . debug ( " Request restart kernel at: %s " , kernel_url )
response = yield fetch_ gateway( kernel_url , method = ' POST ' , body = json_encode ( { } ) )
response = yield gateway_request ( kernel_url , method = ' POST ' , body = json_encode ( { } ) )
self . log . debug ( " Restart kernel response: %d %s " , response . code , response . reason )
@gen.coroutine
@ -401,14 +438,15 @@ class GatewayKernelManager(MappingKernelManager):
"""
kernel_url = self . _get_kernel_endpoint_url ( kernel_id ) + ' /interrupt '
self . log . debug ( " Request interrupt kernel at: %s " , kernel_url )
response = yield fetch_ gateway( kernel_url , method = ' POST ' , body = json_encode ( { } ) )
response = yield gateway_request ( kernel_url , method = ' POST ' , body = json_encode ( { } ) )
self . log . debug ( " Interrupt kernel response: %d %s " , response . code , response . reason )
def shutdown_all ( self ):
def shutdown_all ( self , now = False ):
""" Shutdown all kernels. """
# Note: We have to make this sync because the NotebookApp does not wait for async.
shutdown_kernels = [ ]
kwargs = { ' method ' : ' DELETE ' }
kwargs = Gateway . instance ( ) . load_connection_args ( * * kwargs )
kwargs = Gateway Client . instance ( ) . load_connection_args ( * * kwargs )
client = HTTPClient ( )
for kernel_id in self . _kernels . keys ( ) :
kernel_url = self . _get_kernel_endpoint_url ( kernel_id )
@ -417,16 +455,19 @@ class GatewayKernelManager(MappingKernelManager):
response = client . fetch ( kernel_url , * * kwargs )
except HTTPError :
pass
self . log . debug ( " Delete kernel response: %d %s " , response . code , response . reason )
self . remove_kernel ( kernel_id )
else :
self . log . debug ( " Delete kernel response: %d %s " , response . code , response . reason )
shutdown_kernels . append ( kernel_id ) # avoid changing dict size during iteration
client . close ( )
for kernel_id in shutdown_kernels :
self . remove_kernel ( kernel_id )
class GatewayKernelSpecManager ( KernelSpecManager ) :
def __init__ ( self , * * kwargs ) :
super ( GatewayKernelSpecManager , self ) . __init__ ( * * kwargs )
self . base_endpoint = url_path_join ( Gateway . instance ( ) . url , Gateway . instance ( ) . kernelspecs_endpoint )
self . base_endpoint = url_path_join ( Gateway Client . instance ( ) . url , Gateway Client . instance ( ) . kernelspecs_endpoint )
def _get_kernelspecs_endpoint_url ( self , kernel_name = None ) :
""" Builds a url for the kernels endpoint
@ -440,12 +481,39 @@ class GatewayKernelSpecManager(KernelSpecManager):
return self . base_endpoint
@gen.coroutine
def get_all_specs ( self ) :
fetched_kspecs = yield self . list_kernel_specs ( )
# get the default kernel name and compare to that of this server.
# If different log a warning and reset the default. However, the
# caller of this method will still return this server's value until
# the next fetch of kernelspecs - at which time they'll match.
km = self . parent . kernel_manager
remote_default_kernel_name = fetched_kspecs . get ( ' default ' )
if remote_default_kernel_name != km . default_kernel_name :
self . log . info ( " Default kernel name on Gateway server ( {gateway_default} ) differs from "
" Notebook server ( {notebook_default} ). Updating to Gateway server ' s value. " .
format ( gateway_default = remote_default_kernel_name ,
notebook_default = km . default_kernel_name ) )
km . default_kernel_name = remote_default_kernel_name
# gateway doesn't support resources (requires transfer for use by NB client)
# so add `resource_dir` to each kernelspec and value of 'not supported in gateway mode'
remote_kspecs = fetched_kspecs . get ( ' kernelspecs ' )
for kernel_name , kspec_info in remote_kspecs . items ( ) :
if not kspec_info . get ( ' resource_dir ' ) :
kspec_info [ ' resource_dir ' ] = ' not supported in gateway mode '
remote_kspecs [ kernel_name ] . update ( kspec_info )
raise gen . Return ( remote_kspecs )
@gen.coroutine
def list_kernel_specs ( self ) :
""" Get a list of kernel specs. """
kernel_spec_url = self . _get_kernelspecs_endpoint_url ( )
self . log . debug ( " Request list kernel specs at: %s " , kernel_spec_url )
response = yield fetch_gateway ( kernel_spec_url , method = ' GET ' )
response = yield gateway_request ( kernel_spec_url , method = ' GET ' )
kernel_specs = json_decode ( response . body )
raise gen . Return ( kernel_specs )
@ -461,189 +529,27 @@ class GatewayKernelSpecManager(KernelSpecManager):
kernel_spec_url = self . _get_kernelspecs_endpoint_url ( kernel_name = str ( kernel_name ) )
self . log . debug ( " Request kernel spec at: %s " % kernel_spec_url )
try :
response = yield fetch_ gateway( kernel_spec_url , method = ' GET ' )
response = yield gateway_request ( kernel_spec_url , method = ' GET ' )
except HTTPError as error :
if error . code == 404 :
self . log . warn ( " Kernel spec not found at: %s " % kernel_spec_url )
kernel_spec = None
# Convert not found to KeyError since that's what the Notebook handler expects
# message is not used, but might as well make it useful for troubleshooting
raise KeyError ( ' kernelspec {kernel_name} not found on Gateway server at: {gateway_url} ' .
format ( kernel_name = kernel_name , gateway_url = GatewayClient . instance ( ) . url ) )
else :
raise
else :
kernel_spec = json_decode ( response . body )
raise gen . Return ( kernel_spec )
# Convert to instance of Kernelspec
kspec_instance = self . kernel_spec_class ( resource_dir = u ' ' , * * kernel_spec [ ' spec ' ] )
raise gen . Return ( kspec_instance )
class GatewaySessionManager ( SessionManager ) :
kernel_manager = Instance ( ' notebook.gateway.managers.GatewayKernelManager ' )
@gen.coroutine
def create_session ( self , path = None , name = None , type = None ,
kernel_name = None , kernel_id = None ) :
""" Creates a session and returns its model.
Overrides base class method to turn into an async operation .
"""
session_id = self . new_session_id ( )
kernel = None
if kernel_id is not None :
# This is now an async operation
kernel = yield self . kernel_manager . get_kernel ( kernel_id )
if kernel is not None :
pass
else :
kernel_id = yield self . start_kernel_for_session (
session_id , path , name , type , kernel_name ,
)
result = yield self . save_session (
session_id , path = path , name = name , type = type , kernel_id = kernel_id ,
)
raise gen . Return ( result )
@gen.coroutine
def save_session ( self , session_id , path = None , name = None , type = None ,
kernel_id = None ) :
""" Saves the items for the session with the given session_id
Given a session_id ( and any other of the arguments ) , this method
creates a row in the sqlite session database that holds the information
for a session .
Parameters
- - - - - - - - - -
session_id : str
uuid for the session ; this method must be given a session_id
path : str
the path for the given notebook
kernel_id : str
a uuid for the kernel associated with this session
Returns
- - - - - - -
model : dict
a dictionary of the session model
"""
# This is now an async operation
session = yield super ( GatewaySessionManager , self ) . save_session (
session_id , path = path , name = name , type = type , kernel_id = kernel_id
)
raise gen . Return ( session )
@gen.coroutine
def get_session ( self , * * kwargs ) :
""" Returns the model for a particular session.
Takes a keyword argument and searches for the value in the session
database , then returns the rest of the session ' s info.
Overrides base class method to turn into an async operation .
Parameters
- - - - - - - - - -
* * kwargs : keyword argument
must be given one of the keywords and values from the session database
( i . e . session_id , path , kernel_id )
Returns
- - - - - - -
model : dict
returns a dictionary that includes all the information from the
session described by the kwarg .
"""
# This is now an async operation
session = yield super ( GatewaySessionManager , self ) . get_session ( * * kwargs )
raise gen . Return ( session )
@gen.coroutine
def update_session ( self , session_id , * * kwargs ) :
""" Updates the values in the session database.
Changes the values of the session with the given session_id
with the values from the keyword arguments .
Overrides base class method to turn into an async operation .
Parameters
- - - - - - - - - -
session_id : str
a uuid that identifies a session in the sqlite3 database
* * kwargs : str
the key must correspond to a column title in session database ,
and the value replaces the current value in the session
with session_id .
"""
# This is now an async operation
session = yield self . get_session ( session_id = session_id )
if not kwargs :
# no changes
return
sets = [ ]
for column in kwargs . keys ( ) :
if column not in self . _columns :
raise TypeError ( " No such column: %r " % column )
sets . append ( " %s =? " % column )
query = " UPDATE session SET %s WHERE session_id=? " % ( ' , ' . join ( sets ) )
self . cursor . execute ( query , list ( kwargs . values ( ) ) + [ session_id ] )
@gen.coroutine
def row_to_model ( self , row ) :
""" Takes sqlite database session row and turns it into a dictionary.
Overrides base class method to turn into an async operation .
"""
# Retrieve kernel for session, which is now an async operation
kernel = yield self . kernel_manager . get_kernel ( row [ ' kernel_id ' ] )
if kernel is None :
# The kernel was killed or died without deleting the session.
# We can't use delete_session here because that tries to find
# and shut down the kernel.
self . cursor . execute ( " DELETE FROM session WHERE session_id=? " ,
( row [ ' session_id ' ] , ) )
raise KeyError
model = {
' id ' : row [ ' session_id ' ] ,
' path ' : row [ ' path ' ] ,
' name ' : row [ ' name ' ] ,
' type ' : row [ ' type ' ] ,
' kernel ' : kernel
}
if row [ ' type ' ] == ' notebook ' : # Provide the deprecated API.
model [ ' notebook ' ] = { ' path ' : row [ ' path ' ] , ' name ' : row [ ' name ' ] }
raise gen . Return ( model )
@gen.coroutine
def list_sessions ( self ) :
""" Returns a list of dictionaries containing all the information from
the session database .
Overrides base class method to turn into an async operation .
"""
c = self . cursor . execute ( " SELECT * FROM session " )
result = [ ]
# We need to use fetchall() here, because row_to_model can delete rows,
# which messes up the cursor if we're iterating over rows.
for row in c . fetchall ( ) :
try :
# This is now an async operation
model = yield self . row_to_model ( row )
result . append ( model )
except KeyError :
pass
raise gen . Return ( result )
@gen.coroutine
def delete_session ( self , session_id ) :
""" Deletes the row in the session database with given session_id.
Overrides base class method to turn into an async operation .
"""
# This is now an async operation
session = yield self . get_session ( session_id = session_id )
yield gen . maybe_future ( self . kernel_manager . shutdown_kernel ( session [ ' kernel ' ] [ ' id ' ] ) )
self . cursor . execute ( " DELETE FROM session WHERE session_id=? " , ( session_id , ) )
def kernel_culled ( self , kernel_id ) :
""" Checks if the kernel is still considered alive and returns true if its not found. """
kernel = yield self . kernel_manager . get_kernel ( kernel_id )
raise gen . Return ( kernel is None )