Merge pull request #1827 from minrk/kernel-activity

add activity watching to kernels
pull/2083/head
Thomas Kluyver 9 years ago committed by GitHub
commit f5072cf434

@ -0,0 +1,42 @@
# encoding: utf-8
"""
Timezone utilities
Just UTC-awareness right now
"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from datetime import tzinfo, timedelta, datetime
# constant for zero offset
ZERO = timedelta(0)
class tzUTC(tzinfo):
"""tzinfo object for UTC (zero offset)"""
def utcoffset(self, d):
return ZERO
def dst(self, d):
return ZERO
UTC = tzUTC()
def utc_aware(unaware):
"""decorator for adding UTC tzinfo to datetime's utcfoo methods"""
def utc_method(*args, **kwargs):
dt = unaware(*args, **kwargs)
return dt.replace(tzinfo=UTC)
return utc_method
utcfromtimestamp = utc_aware(datetime.utcfromtimestamp)
utcnow = utc_aware(datetime.utcnow)
def isoformat(dt):
"""Return iso-formatted timestamp
Like .isoformat(), but uses Z for UTC instead of +00:00
"""
return dt.isoformat().replace('+00:00', 'Z')

@ -30,6 +30,7 @@ from ipython_genutils.path import filefind
from ipython_genutils.py3compat import string_types
import notebook
from notebook._tz import utcnow
from notebook.utils import is_hidden, url_path_join, url_is_absolute, url_escape
from notebook.services.security import csp_report_uri
@ -432,8 +433,18 @@ class APIHandler(IPythonHandler):
"default-src 'none'",
])
return csp
# set _track_activity = False on API handlers that shouldn't track activity
_track_activity = True
def update_api_activity(self):
"""Update last_activity of API requests"""
# record activity of authenticated requests
if self._track_activity and self.get_current_user():
self.settings['api_last_activity'] = utcnow()
def finish(self, *args, **kwargs):
self.update_api_activity()
self.set_header('Content-Type', 'application/json')
return super(APIHandler, self).finish(*args, **kwargs)

@ -97,6 +97,7 @@ from ipython_genutils import py3compat
from jupyter_core.paths import jupyter_runtime_dir, jupyter_path
from notebook._sysinfo import get_sys_info
from ._tz import utcnow
from .utils import url_path_join, check_pid, url_escape
#-----------------------------------------------------------------------------
@ -198,6 +199,8 @@ class NotebookWebApplication(web.Application):
working on the notebook's Javascript and LESS""")
warnings.warn("The `ignore_minified_js` flag is deprecated and will be removed in Notebook 6.0", DeprecationWarning)
now = utcnow()
settings = dict(
# basics
log_function=log_request,
@ -236,7 +239,8 @@ class NotebookWebApplication(web.Application):
kernel_spec_manager=kernel_spec_manager,
config_manager=config_manager,
# IPython stuff
# Jupyter stuff
started=now,
jinja_template_vars=jupyter_app.jinja_template_vars,
nbextensions_path=jupyter_app.nbextensions_path,
websocket_url=jupyter_app.websocket_url,

@ -617,7 +617,36 @@ paths:
/status:
get:
summary: Get the current status / activity of the server
responses:
200:
description: The current status of the server
$ref: '#/definitions/APIStatus'
definitions:
APIStatus:
description: |
Notebook server API status.
Added in notebook 5.0.
properties:
started:
type: string
description: |
ISO8601 timestamp indicating when the notebook server started.
last_activity:
type: string
description: |
ISO8601 timestamp indicating the last activity on the server,
either on the REST API or kernel activity.
connections:
type: number
description: |
The total number of currently open connections to kernels.
kernels:
type: number
description: |
The total number of running kernels.
KernelSpec:
description: Kernel spec (contents of kernel.json)
properties:
@ -697,6 +726,23 @@ definitions:
name:
type: string
description: kernel spec name
last_activity:
type: string
description: |
ISO 8601 timestamp for the last-seen activity on this kernel.
Use this in combination with execution_state == 'idle' to identify
which kernels have been idle since a given time.
Timestamps will be UTC, indicated 'Z' suffix.
Added in notebook server 5.0.
connections:
type: number
description: |
The number of active connections to this kernel.
execution_state:
type: string
description: |
Current execution state of the kernel (typically 'idle' or 'busy', but may be other values, such as 'starting').
Added in notebook server 5.0.
Session:
description: A session
type: object

@ -3,8 +3,14 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
from tornado import web
from ...base.handlers import IPythonHandler
from itertools import chain
import json
from tornado import gen, web
from ...base.handlers import IPythonHandler, APIHandler, json_errors
from notebook._tz import utcfromtimestamp, isoformat
import os
class APISpecHandler(web.StaticFileHandler, IPythonHandler):
@ -18,6 +24,33 @@ class APISpecHandler(web.StaticFileHandler, IPythonHandler):
self.set_header('Content-Type', 'text/x-yaml')
return web.StaticFileHandler.get(self, 'api.yaml')
class APIStatusHandler(APIHandler):
_track_activity = False
@json_errors
@web.authenticated
@gen.coroutine
def get(self):
# if started was missing, use unix epoch
started = self.settings.get('started', utcfromtimestamp(0))
# if we've never seen API activity, use started date
api_last_activity = self.settings.get('api_last_activity', started)
started = isoformat(started)
api_last_activity = isoformat(api_last_activity)
kernels = yield gen.maybe_future(self.kernel_manager.list_kernels())
total_connections = sum(k['connections'] for k in kernels)
last_activity = max(chain([api_last_activity], [k['last_activity'] for k in kernels]))
model = {
'started': started,
'last_activity': last_activity,
'kernels': len(kernels),
'connections': total_connections,
}
self.finish(json.dumps(model, sort_keys=True))
default_handlers = [
(r"/api/spec.yaml", APISpecHandler)
(r"/api/spec.yaml", APISpecHandler),
(r"/api/status", APIStatusHandler),
]

@ -0,0 +1,32 @@
"""Test the basic /api endpoints"""
import requests
from notebook._tz import isoformat
from notebook.utils import url_path_join
from notebook.tests.launchnotebook import NotebookTestBase
class KernelAPITest(NotebookTestBase):
"""Test the kernels web service API"""
def _req(self, verb, path, **kwargs):
r = self.request(verb, url_path_join('api', path))
r.raise_for_status()
return r
def get(self, path, **kwargs):
return self._req('GET', path)
def test_get_spec(self):
r = self.get('spec.yaml')
assert r.text
def test_get_status(self):
r = self.get('status')
data = r.json()
assert data['connections'] == 0
assert data['kernels'] == 0
assert data['last_activity'].endswith('Z')
assert data['started'].endswith('Z')
assert data['started'] == isoformat(self.notebook.web_app.settings['started'])

@ -12,11 +12,12 @@ from .checkpoints import (
)
from .fileio import FileManagerMixin
from . import tz
from ipython_genutils.path import ensure_dir_exists
from ipython_genutils.py3compat import getcwd
from traitlets import Unicode
from notebook import _tz as tz
class FileCheckpoints(FileManagerMixin, Checkpoints):
"""

@ -22,7 +22,8 @@ from .manager import ContentsManager
from ipython_genutils.importstring import import_item
from traitlets import Any, Unicode, Bool, TraitError, observe, default, validate
from ipython_genutils.py3compat import getcwd, string_types
from . import tz
from notebook import _tz as tz
from notebook.utils import (
is_hidden, is_file_hidden,
to_api_path,

@ -1,46 +0,0 @@
# encoding: utf-8
"""
Timezone utilities
Just UTC-awareness right now
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from datetime import tzinfo, timedelta, datetime
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
# constant for zero offset
ZERO = timedelta(0)
class tzUTC(tzinfo):
"""tzinfo object for UTC (zero offset)"""
def utcoffset(self, d):
return ZERO
def dst(self, d):
return ZERO
UTC = tzUTC()
def utc_aware(unaware):
"""decorator for adding UTC tzinfo to datetime's utcfoo methods"""
def utc_method(*args, **kwargs):
dt = unaware(*args, **kwargs)
return dt.replace(tzinfo=UTC)
return utc_method
utcfromtimestamp = utc_aware(datetime.utcfromtimestamp)
utcnow = utc_aware(datetime.utcnow)

@ -31,7 +31,7 @@ class MainKernelHandler(APIHandler):
def get(self):
km = self.kernel_manager
kernels = yield gen.maybe_future(km.list_kernels())
self.finish(json.dumps(kernels))
self.finish(json.dumps(kernels, default=date_default))
@json_errors
@web.authenticated
@ -51,7 +51,7 @@ class MainKernelHandler(APIHandler):
location = url_path_join(self.base_url, 'api', 'kernels', url_escape(kernel_id))
self.set_header('Location', location)
self.set_status(201)
self.finish(json.dumps(model))
self.finish(json.dumps(model, default=date_default))
class KernelHandler(APIHandler):
@ -62,7 +62,7 @@ class KernelHandler(APIHandler):
km = self.kernel_manager
km._check_kernel_id(kernel_id)
model = km.kernel_model(kernel_id)
self.finish(json.dumps(model))
self.finish(json.dumps(model, default=date_default))
@json_errors
@web.authenticated
@ -93,7 +93,7 @@ class KernelActionHandler(APIHandler):
self.set_status(500)
else:
model = km.kernel_model(kernel_id)
self.write(json.dumps(model))
self.write(json.dumps(model, default=date_default))
self.finish()
@ -260,6 +260,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
def open(self, kernel_id):
super(ZMQChannelsHandler, self).open()
self.kernel_manager.notify_connect(kernel_id)
try:
self.create_stream()
except web.HTTPError as e:
@ -401,6 +402,7 @@ class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
self._open_sessions.pop(self.session_key)
km = self.kernel_manager
if self.kernel_id in km:
km.notify_disconnect(self.kernel_id)
km.remove_restart_callback(
self.kernel_id, self.on_kernel_restarted,
)

@ -14,9 +14,10 @@ from tornado.concurrent import Future
from tornado.ioloop import IOLoop
from jupyter_client.multikernelmanager import MultiKernelManager
from traitlets import List, Unicode, TraitError, default, validate
from traitlets import Dict, List, Unicode, TraitError, default, validate
from notebook.utils import to_os_path
from notebook._tz import utcnow, isoformat
from ipython_genutils.py3compat import getcwd
@ -30,6 +31,8 @@ class MappingKernelManager(MultiKernelManager):
kernel_argv = List(Unicode())
root_dir = Unicode(config=True)
_kernel_connections = Dict()
@default('root_dir')
def _default_root_dir(self):
@ -90,6 +93,8 @@ class MappingKernelManager(MultiKernelManager):
kernel_id = yield gen.maybe_future(
super(MappingKernelManager, self).start_kernel(**kwargs)
)
self._kernel_connections[kernel_id] = 0
self.start_watching_activity(kernel_id)
self.log.info("Kernel started: %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
# register callback for failed auto-restart
@ -102,10 +107,12 @@ class MappingKernelManager(MultiKernelManager):
self.log.info("Using existing kernel: %s" % kernel_id)
# py2-compat
raise gen.Return(kernel_id)
def shutdown_kernel(self, kernel_id, now=False):
"""Shutdown a kernel by kernel_id"""
self._check_kernel_id(kernel_id)
self._kernels[kernel_id]._activity_stream.close()
self._kernel_connections.pop(kernel_id, None)
return super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now)
def restart_kernel(self, kernel_id):
@ -149,12 +156,31 @@ class MappingKernelManager(MultiKernelManager):
timeout = loop.add_timeout(loop.time() + 30, on_timeout)
return future
def notify_connect(self, kernel_id):
"""Notice a new connection to a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] += 1
def notify_disconnect(self, kernel_id):
"""Notice a disconnection from a kernel"""
if kernel_id in self._kernel_connections:
self._kernel_connections[kernel_id] -= 1
def kernel_model(self, kernel_id):
"""Return a dictionary of kernel information described in the
JSON standard model."""
"""Return a JSON-safe dict representing a kernel
For use in representing kernels in the JSON APIs.
"""
self._check_kernel_id(kernel_id)
model = {"id":kernel_id,
"name": self._kernels[kernel_id].kernel_name}
kernel = self._kernels[kernel_id]
model = {
"id":kernel_id,
"name": kernel.kernel_name,
"last_activity": isoformat(kernel.last_activity),
"execution_state": kernel.execution_state,
"connections": self._kernel_connections[kernel_id],
}
return model
def list_kernels(self):
@ -171,3 +197,31 @@ class MappingKernelManager(MultiKernelManager):
"""Check a that a kernel_id exists and raise 404 if not."""
if kernel_id not in self:
raise web.HTTPError(404, u'Kernel does not exist: %s' % kernel_id)
# monitoring activity:
def start_watching_activity(self, kernel_id):
"""Start watching IOPub messages on a kernel for activity.
- update last_activity on every message
- record execution_state from status messages
"""
kernel = self._kernels[kernel_id]
# add busy/activity markers:
kernel.execution_state = 'starting'
kernel.last_activity = utcnow()
kernel._activity_stream = kernel.connect_iopub()
def record_activity(msg_list):
"""Record an IOPub message arriving from a kernel"""
kernel.last_activity = utcnow()
idents, fed_msg_list = kernel.session.feed_identities(msg_list)
msg = kernel.session.deserialize(fed_msg_list)
msg_type = msg['header']['msg_type']
self.log.debug("activity on %s: %s", kernel_id, msg_type)
if msg_type == 'status':
kernel.execution_state = msg['content']['execution_state']
kernel._activity_stream.on_recv(record_activity)

@ -1,7 +1,11 @@
"""Test the kernels service API."""
import json
import requests
import time
from tornado.httpclient import HTTPRequest
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect
from jupyter_client.kernelspec import NATIVE_KERNEL_NAME
@ -10,8 +14,10 @@ from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error
class KernelAPI(object):
"""Wrapper for kernel REST API requests"""
def __init__(self, request):
def __init__(self, request, base_url, headers):
self.request = request
self.base_url = base_url
self.headers = headers
def _req(self, verb, path, body=None):
response = self.request(verb,
@ -45,10 +51,23 @@ class KernelAPI(object):
def restart(self, id):
return self._req('POST', url_path_join(id, 'restart'))
def websocket(self, id):
loop = IOLoop()
req = HTTPRequest(
url_path_join(self.base_url.replace('http', 'ws', 1), 'api/kernels', id, 'channels'),
headers=self.headers,
)
f = websocket_connect(req, io_loop=loop)
return loop.run_sync(lambda : f)
class KernelAPITest(NotebookTestBase):
"""Test the kernels web service API"""
def setUp(self):
self.kern_api = KernelAPI(self.request)
self.kern_api = KernelAPI(self.request,
base_url=self.base_url(),
headers=self.auth_headers(),
)
def tearDown(self):
for k in self.kern_api.list().json():
@ -144,3 +163,22 @@ class KernelAPITest(NotebookTestBase):
bad_id = '111-111-111-111-111'
with assert_http_error(404, 'Kernel does not exist: ' + bad_id):
self.kern_api.shutdown(bad_id)
def test_connections(self):
kid = self.kern_api.start().json()['id']
model = self.kern_api.get(kid).json()
self.assertEqual(model['connections'], 0)
ws = self.kern_api.websocket(kid)
model = self.kern_api.get(kid).json()
self.assertEqual(model['connections'], 1)
ws.close()
# give it some time to close on the other side:
for i in range(10):
model = self.kern_api.get(kid).json()
if model['connections'] > 0:
time.sleep(0.1)
else:
break
model = self.kern_api.get(kid).json()
self.assertEqual(model['connections'], 0)

@ -11,6 +11,8 @@ from . import csp_report_uri
class CSPReportHandler(APIHandler):
'''Accepts a content security policy violation report'''
_track_activity = False
def skip_origin_check(self):
"""Don't check origin when reporting origin-check violations!"""
return True

@ -9,11 +9,15 @@ from tornado.ioloop import IOLoop
from ..sessionmanager import SessionManager
from notebook.services.kernels.kernelmanager import MappingKernelManager
from notebook.services.contents.manager import ContentsManager
from notebook._tz import utcnow, isoformat
class DummyKernel(object):
def __init__(self, kernel_name='python'):
self.kernel_name = kernel_name
dummy_date = utcnow()
dummy_date_s = isoformat(dummy_date)
class DummyMKM(MappingKernelManager):
"""MappingKernelManager interface that doesn't start kernels, for testing"""
def __init__(self, *args, **kwargs):
@ -25,7 +29,10 @@ class DummyMKM(MappingKernelManager):
def start_kernel(self, kernel_id=None, path=None, kernel_name='python', **kwargs):
kernel_id = kernel_id or self._new_id()
self._kernels[kernel_id] = DummyKernel(kernel_name=kernel_name)
k = self._kernels[kernel_id] = DummyKernel(kernel_name=kernel_name)
self._kernel_connections[kernel_id] = 0
k.last_activity = dummy_date
k.execution_state = 'idle'
return kernel_id
def shutdown_kernel(self, kernel_id, now=False):
@ -65,7 +72,13 @@ class TestSessionManager(TestCase):
'notebook': {'path': u'/path/to/test.ipynb', 'name': None},
'type': 'notebook',
'name': None,
'kernel': {'id':u'A', 'name': 'bar'}}
'kernel': {
'id': 'A',
'name': 'bar',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}}
self.assertEqual(model, expected)
def test_bad_get_session(self):
@ -102,19 +115,37 @@ class TestSessionManager(TestCase):
'type': 'notebook',
'notebook': {'path': u'/path/to/1/test1.ipynb', 'name': None},
'name': None,
'kernel':{'id':u'A', 'name':'python'}
'kernel': {
'id': 'A',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}, {
'id':sessions[1]['id'],
'path': u'/path/to/2/test2.py',
'type': 'file',
'name': None,
'kernel':{'id':u'B', 'name':'python'}
'kernel': {
'id': 'B',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}, {
'id':sessions[2]['id'],
'path': u'/path/to/3',
'type': 'console',
'name': 'foo',
'kernel':{'id':u'C', 'name':'python'}
'kernel': {
'id': 'C',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}
]
self.assertEqual(sessions, expected)
@ -136,8 +167,11 @@ class TestSessionManager(TestCase):
'name': None,
'notebook': {'path': u'/path/to/2/test2.ipynb', 'name': None},
'kernel': {
'id': u'B',
'id': 'B',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}
]
@ -154,7 +188,14 @@ class TestSessionManager(TestCase):
'type': 'notebook',
'name': None,
'notebook': {'path': u'/path/to/new_name.ipynb', 'name': None},
'kernel':{'id':u'A', 'name':'julia'}}
'kernel': {
'id': 'A',
'name':'julia',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}
self.assertEqual(model, expected)
def test_bad_update_session(self):
@ -179,13 +220,25 @@ class TestSessionManager(TestCase):
'type': 'notebook',
'name': None,
'notebook': {'path': u'/path/to/1/test1.ipynb', 'name': None},
'kernel': {'id':u'A', 'name':'python'}
'kernel': {
'id': 'A',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}, {
'id': sessions[2]['id'],
'type': 'console',
'path': u'/path/to/3',
'name': 'foo',
'kernel': {'id':u'C', 'name':'python'}
'kernel': {
'id': 'C',
'name':'python',
'connections': 0,
'last_activity': dummy_date_s,
'execution_state': 'idle',
}
}
]
self.assertEqual(new_sessions, expected)

@ -225,6 +225,8 @@ class SessionAPITest(NotebookTestBase):
r = self.request('GET', 'api/kernels')
r.raise_for_status()
kernel_list = r.json()
after['kernel'].pop('last_activity')
[ k.pop('last_activity') for k in kernel_list ]
self.assertEqual(kernel_list, [after['kernel']])
def test_modify_kernel_id(self):
@ -248,4 +250,7 @@ class SessionAPITest(NotebookTestBase):
r = self.request('GET', 'api/kernels')
r.raise_for_status()
kernel_list = r.json()
kernel.pop('last_activity')
[ k.pop('last_activity') for k in kernel_list ]
self.assertEqual(kernel_list, [kernel])

@ -72,16 +72,22 @@ class NotebookTestBase(TestCase):
raise TimeoutError("Undead notebook server")
@classmethod
def request(self, verb, path, **kwargs):
def auth_headers(cls):
headers = {}
if cls.token:
headers['Authorization'] = 'token %s' % cls.token
return headers
@classmethod
def request(cls, verb, path, **kwargs):
"""Send a request to my server
with authentication and everything.
"""
headers = kwargs.setdefault('headers', {})
# kwargs.setdefault('allow_redirects', False)
headers.setdefault('Authorization', 'token %s' % self.token)
headers.update(cls.auth_headers())
response = requests.request(verb,
url_path_join(self.base_url(), path),
url_path_join(cls.base_url(), path),
**kwargs)
return response

Loading…
Cancel
Save