Merge pull request #5938 from minrk/contents2

add contents webservice
Min RK 12 years ago
commit 19557d37ab

@ -1,4 +1,4 @@
"""Base Tornado handlers for the notebook."""
"""Base Tornado handlers for the notebook server."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
@ -27,7 +27,7 @@ except ImportError:
from IPython.config import Application
from IPython.utils.path import filefind
from IPython.utils.py3compat import string_types
from IPython.html.utils import is_hidden
from IPython.html.utils import is_hidden, url_path_join, url_escape
#-----------------------------------------------------------------------------
# Top-level handlers
@ -141,8 +141,8 @@ class IPythonHandler(AuthenticatedHandler):
return self.settings['kernel_manager']
@property
def notebook_manager(self):
return self.settings['notebook_manager']
def contents_manager(self):
return self.settings['contents_manager']
@property
def cluster_manager(self):
@ -156,10 +156,6 @@ class IPythonHandler(AuthenticatedHandler):
def kernel_spec_manager(self):
return self.settings['kernel_spec_manager']
@property
def project_dir(self):
return self.notebook_manager.notebook_dir
#---------------------------------------------------------------
# CORS
#---------------------------------------------------------------
@ -409,6 +405,37 @@ class TrailingSlashHandler(web.RequestHandler):
def get(self):
self.redirect(self.request.uri.rstrip('/'))
class FilesRedirectHandler(IPythonHandler):
"""Handler for redirecting relative URLs to the /files/ handler"""
def get(self, path=''):
cm = self.contents_manager
if cm.path_exists(path):
# it's a *directory*, redirect to /tree
url = url_path_join(self.base_url, 'tree', path)
else:
orig_path = path
# otherwise, redirect to /files
parts = path.split('/')
path = '/'.join(parts[:-1])
name = parts[-1]
if not cm.file_exists(name=name, path=path) and 'files' in parts:
# redirect without files/ iff it would 404
# this preserves pre-2.0-style 'files/' links
self.log.warn("Deprecated files/ URL: %s", orig_path)
parts.remove('files')
path = '/'.join(parts[:-1])
if not cm.file_exists(name=name, path=path):
raise web.HTTPError(404)
url = url_path_join(self.base_url, 'files', path, name)
url = url_escape(url)
self.log.debug("Redirecting %s to %s", self.request.path, url)
self.redirect(url)
#-----------------------------------------------------------------------------
# URL pattern fragments for re-use
#-----------------------------------------------------------------------------
@ -416,6 +443,8 @@ class TrailingSlashHandler(web.RequestHandler):
path_regex = r"(?P<path>(?:/.*)*)"
notebook_name_regex = r"(?P<name>[^/]+\.ipynb)"
notebook_path_regex = "%s/%s" % (path_regex, notebook_name_regex)
file_name_regex = r"(?P<name>[^/]+)"
file_path_regex = "%s/%s" % (path_regex, file_name_regex)
#-----------------------------------------------------------------------------
# URL to handler mappings

@ -1,10 +1,18 @@
"""Tornado handlers for nbconvert."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import io
import os
import zipfile
from tornado import web
from ..base.handlers import IPythonHandler, notebook_path_regex
from ..base.handlers import (
IPythonHandler, FilesRedirectHandler,
notebook_path_regex, path_regex,
)
from IPython.nbformat.current import to_notebook_json
from IPython.utils.py3compat import cast_bytes
@ -73,7 +81,7 @@ class NbconvertFileHandler(IPythonHandler):
exporter = get_exporter(format, config=self.config, log=self.log)
path = path.strip('/')
model = self.notebook_manager.get_notebook(name=name, path=path)
model = self.contents_manager.get_model(name=name, path=path)
self.set_header('Last-Modified', model['last_modified'])
@ -123,6 +131,7 @@ class NbconvertPostHandler(IPythonHandler):
self.finish(output)
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
@ -134,4 +143,5 @@ default_handlers = [
(r"/nbconvert/%s%s" % (_format_regex, notebook_path_regex),
NbconvertFileHandler),
(r"/nbconvert/%s" % _format_regex, NbconvertPostHandler),
(r"/nbconvert/html%s" % path_regex, FilesRedirectHandler),
]

@ -106,7 +106,7 @@ class APITest(NotebookTestBase):
@onlyif_cmds_exist('pandoc')
def test_from_post(self):
nbmodel_url = url_path_join(self.base_url(), 'api/notebooks/foo/testnb.ipynb')
nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
nbmodel = requests.get(nbmodel_url).json()
r = self.nbconvert_api.from_post(format='html', nbmodel=nbmodel)
@ -121,7 +121,7 @@ class APITest(NotebookTestBase):
@onlyif_cmds_exist('pandoc')
def test_from_post_zip(self):
nbmodel_url = url_path_join(self.base_url(), 'api/notebooks/foo/testnb.ipynb')
nbmodel_url = url_path_join(self.base_url(), 'api/contents/foo/testnb.ipynb')
nbmodel = requests.get(nbmodel_url).json()
r = self.nbconvert_api.from_post(format='latex', nbmodel=nbmodel)

@ -1,31 +1,17 @@
"""Tornado handlers for the live notebook view.
"""Tornado handlers for the live notebook view."""
Authors:
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import os
from tornado import web
HTTPError = web.HTTPError
from ..base.handlers import IPythonHandler, notebook_path_regex, path_regex
from ..utils import url_path_join, url_escape
#-----------------------------------------------------------------------------
# Handlers
#-----------------------------------------------------------------------------
from ..base.handlers import (
IPythonHandler, FilesRedirectHandler,
notebook_path_regex, path_regex,
)
from ..utils import url_escape
class NotebookHandler(IPythonHandler):
@ -35,17 +21,16 @@ class NotebookHandler(IPythonHandler):
"""get renders the notebook template if a name is given, or
redirects to the '/files/' handler if the name is not given."""
path = path.strip('/')
nbm = self.notebook_manager
cm = self.contents_manager
if name is None:
raise web.HTTPError(500, "This shouldn't be accessible: %s" % self.request.uri)
# a .ipynb filename was given
if not nbm.notebook_exists(name, path):
if not cm.file_exists(name, path):
raise web.HTTPError(404, u'Notebook does not exist: %s/%s' % (path, name))
name = url_escape(name)
path = url_escape(path)
self.write(self.render_template('notebook.html',
project=self.project_dir,
notebook_path=path,
notebook_name=name,
kill_kernel=False,
@ -53,30 +38,6 @@ class NotebookHandler(IPythonHandler):
)
)
class NotebookRedirectHandler(IPythonHandler):
def get(self, path=''):
nbm = self.notebook_manager
if nbm.path_exists(path):
# it's a *directory*, redirect to /tree
url = url_path_join(self.base_url, 'tree', path)
else:
# otherwise, redirect to /files
if '/files/' in path:
# redirect without files/ iff it would 404
# this preserves pre-2.0-style 'files/' links
# FIXME: this is hardcoded based on notebook_path,
# but so is the files handler itself,
# so it should work until both are cleaned up.
parts = path.split('/')
files_path = os.path.join(nbm.notebook_dir, *parts)
if not os.path.exists(files_path):
self.log.warn("Deprecated files/ URL: %s", path)
path = path.replace('/files/', '/', 1)
url = url_path_join(self.base_url, 'files', path)
url = url_escape(url)
self.log.debug("Redirecting %s to %s", self.request.path, url)
self.redirect(url)
#-----------------------------------------------------------------------------
# URL to handler mappings
@ -85,6 +46,6 @@ class NotebookRedirectHandler(IPythonHandler):
default_handlers = [
(r"/notebooks%s" % notebook_path_regex, NotebookHandler),
(r"/notebooks%s" % path_regex, NotebookRedirectHandler),
(r"/notebooks%s" % path_regex, FilesRedirectHandler),
]

@ -55,8 +55,8 @@ from IPython.html import DEFAULT_STATIC_FILES_PATH
from .base.handlers import Template404
from .log import log_request
from .services.kernels.kernelmanager import MappingKernelManager
from .services.notebooks.nbmanager import NotebookManager
from .services.notebooks.filenbmanager import FileNotebookManager
from .services.contents.manager import ContentsManager
from .services.contents.filemanager import FileContentsManager
from .services.clusters.clustermanager import ClusterManager
from .services.sessions.sessionmanager import SessionManager
@ -121,19 +121,19 @@ def load_handlers(name):
class NotebookWebApplication(web.Application):
def __init__(self, ipython_app, kernel_manager, notebook_manager,
def __init__(self, ipython_app, kernel_manager, contents_manager,
cluster_manager, session_manager, kernel_spec_manager, log,
base_url, settings_overrides, jinja_env_options):
settings = self.init_settings(
ipython_app, kernel_manager, notebook_manager, cluster_manager,
ipython_app, kernel_manager, contents_manager, cluster_manager,
session_manager, kernel_spec_manager, log, base_url,
settings_overrides, jinja_env_options)
handlers = self.init_handlers(settings)
super(NotebookWebApplication, self).__init__(handlers, **settings)
def init_settings(self, ipython_app, kernel_manager, notebook_manager,
def init_settings(self, ipython_app, kernel_manager, contents_manager,
cluster_manager, session_manager, kernel_spec_manager,
log, base_url, settings_overrides,
jinja_env_options=None):
@ -165,7 +165,7 @@ class NotebookWebApplication(web.Application):
# managers
kernel_manager=kernel_manager,
notebook_manager=notebook_manager,
contents_manager=contents_manager,
cluster_manager=cluster_manager,
session_manager=session_manager,
kernel_spec_manager=kernel_spec_manager,
@ -193,18 +193,20 @@ class NotebookWebApplication(web.Application):
handlers.extend(load_handlers('nbconvert.handlers'))
handlers.extend(load_handlers('kernelspecs.handlers'))
handlers.extend(load_handlers('services.kernels.handlers'))
handlers.extend(load_handlers('services.notebooks.handlers'))
handlers.extend(load_handlers('services.contents.handlers'))
handlers.extend(load_handlers('services.clusters.handlers'))
handlers.extend(load_handlers('services.sessions.handlers'))
handlers.extend(load_handlers('services.nbconvert.handlers'))
handlers.extend(load_handlers('services.kernelspecs.handlers'))
# FIXME: /files/ should be handled by the Contents service when it exists
nbm = settings['notebook_manager']
if hasattr(nbm, 'notebook_dir'):
handlers.extend([
(r"/files/(.*)", AuthenticatedFileHandler, {'path' : nbm.notebook_dir}),
cm = settings['contents_manager']
if hasattr(cm, 'root_dir'):
handlers.append(
(r"/files/(.*)", AuthenticatedFileHandler, {'path' : cm.root_dir}),
)
handlers.append(
(r"/nbextensions/(.*)", FileFindHandler, {'path' : settings['nbextensions_path']}),
])
)
# prepend base_url onto the patterns that we match
new_handlers = []
for handler in handlers:
@ -264,9 +266,9 @@ flags['no-mathjax']=(
)
# Add notebook manager flags
flags.update(boolean_flag('script', 'FileNotebookManager.save_script',
'Auto-save a .py script everytime the .ipynb notebook is saved',
'Do not auto-save .py scripts for every notebook'))
flags.update(boolean_flag('script', 'FileContentsManager.save_script',
'DEPRECATED, IGNORED',
'DEPRECATED, IGNORED'))
aliases = dict(base_aliases)
@ -302,7 +304,7 @@ class NotebookApp(BaseIPythonApplication):
classes = [
KernelManager, ProfileDir, Session, MappingKernelManager,
NotebookManager, FileNotebookManager, NotebookNotary,
ContentsManager, FileContentsManager, NotebookNotary,
]
flags = Dict(flags)
aliases = Dict(aliases)
@ -557,7 +559,7 @@ class NotebookApp(BaseIPythonApplication):
else:
self.log.info("Using MathJax: %s", new)
notebook_manager_class = DottedObjectName('IPython.html.services.notebooks.filenbmanager.FileNotebookManager',
contents_manager_class = DottedObjectName('IPython.html.services.contents.filemanager.FileContentsManager',
config=True,
help='The notebook manager class to use.'
)
@ -621,7 +623,7 @@ class NotebookApp(BaseIPythonApplication):
raise TraitError("No such notebook dir: %r" % new)
# setting App.notebook_dir implies setting notebook and kernel dirs as well
self.config.FileNotebookManager.notebook_dir = new
self.config.FileContentsManager.root_dir = new
self.config.MappingKernelManager.root_dir = new
@ -658,12 +660,12 @@ class NotebookApp(BaseIPythonApplication):
parent=self, log=self.log, kernel_argv=self.kernel_argv,
connection_dir = self.profile_dir.security_dir,
)
kls = import_item(self.notebook_manager_class)
self.notebook_manager = kls(parent=self, log=self.log)
kls = import_item(self.contents_manager_class)
self.contents_manager = kls(parent=self, log=self.log)
kls = import_item(self.session_manager_class)
self.session_manager = kls(parent=self, log=self.log,
kernel_manager=self.kernel_manager,
notebook_manager=self.notebook_manager)
contents_manager=self.contents_manager)
kls = import_item(self.cluster_manager_class)
self.cluster_manager = kls(parent=self, log=self.log)
self.cluster_manager.update_profiles()
@ -688,7 +690,7 @@ class NotebookApp(BaseIPythonApplication):
self.webapp_settings['allow_credentials'] = self.allow_credentials
self.web_app = NotebookWebApplication(
self, self.kernel_manager, self.notebook_manager,
self, self.kernel_manager, self.contents_manager,
self.cluster_manager, self.session_manager, self.kernel_spec_manager,
self.log, self.base_url, self.webapp_settings,
self.jinja_environment_options
@ -838,7 +840,7 @@ class NotebookApp(BaseIPythonApplication):
def notebook_info(self):
"Return the current working directory and the server url information"
info = self.notebook_manager.info_string() + "\n"
info = self.contents_manager.info_string() + "\n"
info += "%d active kernels \n" % len(self.kernel_manager._kernels)
return info + "The IPython Notebook is running at: %s" % self.display_url

@ -0,0 +1,531 @@
"""A contents manager that uses the local file system for storage."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
import io
import os
import glob
import shutil
from tornado import web
from .manager import ContentsManager
from IPython.nbformat import current
from IPython.utils.path import ensure_dir_exists
from IPython.utils.traitlets import Unicode, Bool, TraitError
from IPython.utils.py3compat import getcwd
from IPython.utils import tz
from IPython.html.utils import is_hidden, to_os_path, url_path_join
class FileContentsManager(ContentsManager):
root_dir = Unicode(getcwd(), config=True)
save_script = Bool(False, config=True, help='DEPRECATED, IGNORED')
def _save_script_changed(self):
self.log.warn("""
Automatically saving notebooks as scripts has been removed.
Use `ipython nbconvert --to python [notebook]` instead.
""")
def _root_dir_changed(self, name, old, new):
"""Do a bit of validation of the root_dir."""
if not os.path.isabs(new):
# If we receive a non-absolute path, make it absolute.
self.root_dir = os.path.abspath(new)
return
if not os.path.isdir(new):
raise TraitError("%r is not a directory" % new)
checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
help="""The directory name in which to keep file checkpoints
This is a path relative to the file's own directory.
By default, it is .ipynb_checkpoints
"""
)
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError as e:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, name=None, path=''):
"""Given a filename and API path, return its file system
path.
Parameters
----------
name : string
A filename
path : string
The relative API path to the named file.
Returns
-------
path : string
API path to be evaluated relative to root_dir.
"""
if name is not None:
path = url_path_join(path, name)
return to_os_path(path, self.root_dir)
def path_exists(self, path):
"""Does the API-style path refer to an extant directory?
API-style wrapper for os.path.isdir
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
exists : bool
Whether the path is indeed a directory.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.isdir(os_path)
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
exists : bool
Whether the path is hidden.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return is_hidden(os_path, self.root_dir)
def file_exists(self, name, path=''):
"""Returns True if the file exists, else returns False.
API-style wrapper for os.path.isfile
Parameters
----------
name : string
The name of the file you are checking.
path : string
The relative path to the file's directory (with '/' as separator)
Returns
-------
exists : bool
Whether the file exists.
"""
path = path.strip('/')
nbpath = self._get_os_path(name, path=path)
return os.path.isfile(nbpath)
def exists(self, name=None, path=''):
"""Returns True if the path [and name] exists, else returns False.
API-style wrapper for os.path.exists
Parameters
----------
name : string
The name of the file you are checking.
path : string
The relative path to the file's directory (with '/' as separator)
Returns
-------
exists : bool
Whether the target exists.
"""
path = path.strip('/')
os_path = self._get_os_path(name, path=path)
return os.path.exists(os_path)
def _base_model(self, name, path=''):
"""Build the common base of a contents model"""
os_path = self._get_os_path(name, path)
info = os.stat(os_path)
last_modified = tz.utcfromtimestamp(info.st_mtime)
created = tz.utcfromtimestamp(info.st_ctime)
# Create the base model.
model = {}
model['name'] = name
model['path'] = path
model['last_modified'] = last_modified
model['created'] = created
model['content'] = None
model['format'] = None
return model
def _dir_model(self, name, path='', content=True):
"""Build a model for a directory
if content is requested, will include a listing of the directory
"""
os_path = self._get_os_path(name, path)
four_o_four = u'directory does not exist: %r' % os_path
if not os.path.isdir(os_path):
raise web.HTTPError(404, four_o_four)
elif is_hidden(os_path, self.root_dir):
self.log.info("Refusing to serve hidden directory %r, via 404 Error",
os_path
)
raise web.HTTPError(404, four_o_four)
if name is None:
if '/' in path:
path, name = path.rsplit('/', 1)
else:
name = ''
model = self._base_model(name, path)
model['type'] = 'directory'
dir_path = u'{}/{}'.format(path, name)
if content:
model['content'] = contents = []
for os_path in glob.glob(self._get_os_path('*', dir_path)):
name = os.path.basename(os_path)
if self.should_list(name) and not is_hidden(os_path, self.root_dir):
contents.append(self.get_model(name=name, path=dir_path, content=False))
model['format'] = 'json'
return model
def _file_model(self, name, path='', content=True):
"""Build a model for a file
if content is requested, include the file contents.
UTF-8 text files will be unicode, binary files will be base64-encoded.
"""
model = self._base_model(name, path)
model['type'] = 'file'
if content:
os_path = self._get_os_path(name, path)
with io.open(os_path, 'rb') as f:
bcontent = f.read()
try:
model['content'] = bcontent.decode('utf8')
except UnicodeError as e:
model['content'] = base64.encodestring(bcontent).decode('ascii')
model['format'] = 'base64'
else:
model['format'] = 'text'
return model
def _notebook_model(self, name, path='', content=True):
"""Build a notebook model
if content is requested, the notebook content will be populated
as a JSON structure (not double-serialized)
"""
model = self._base_model(name, path)
model['type'] = 'notebook'
if content:
os_path = self._get_os_path(name, path)
with io.open(os_path, 'r', encoding='utf-8') as f:
try:
nb = current.read(f, u'json')
except Exception as e:
raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
self.mark_trusted_cells(nb, name, path)
model['content'] = nb
model['format'] = 'json'
return model
def get_model(self, name, path='', content=True):
""" Takes a path and name for an entity and returns its model
Parameters
----------
name : str
the name of the target
path : str
the API path that describes the relative path for the target
Returns
-------
model : dict
the contents model. If content=True, returns the contents
of the file or directory as well.
"""
path = path.strip('/')
if not self.exists(name=name, path=path):
raise web.HTTPError(404, u'No such file or directory: %s/%s' % (path, name))
os_path = self._get_os_path(name, path)
if os.path.isdir(os_path):
model = self._dir_model(name, path, content)
elif name.endswith('.ipynb'):
model = self._notebook_model(name, path, content)
else:
model = self._file_model(name, path, content)
return model
def _save_notebook(self, os_path, model, name='', path=''):
"""save a notebook file"""
# Save the notebook file
nb = current.to_notebook_json(model['content'])
self.check_and_sign(nb, name, path)
if 'name' in nb['metadata']:
nb['metadata']['name'] = u''
with io.open(os_path, 'w', encoding='utf-8') as f:
current.write(nb, f, u'json')
def _save_file(self, os_path, model, name='', path=''):
"""save a non-notebook file"""
fmt = model.get('format', None)
if fmt not in {'text', 'base64'}:
raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
try:
content = model['content']
if fmt == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
with io.open(os_path, 'wb') as f:
f.write(bcontent)
def _save_directory(self, os_path, model, name='', path=''):
"""create a directory"""
if is_hidden(os_path, self.root_dir):
raise web.HTTPError(400, u'Cannot create hidden directory %r' % os_path)
if not os.path.exists(os_path):
os.mkdir(os_path)
elif not os.path.isdir(os_path):
raise web.HTTPError(400, u'Not a directory: %s' % (os_path))
else:
self.log.debug("Directory %r already exists", os_path)
def save(self, model, name='', path=''):
"""Save the file model and return the model with no content."""
path = path.strip('/')
if 'type' not in model:
raise web.HTTPError(400, u'No file type provided')
if 'content' not in model and model['type'] != 'directory':
raise web.HTTPError(400, u'No file content provided')
# One checkpoint should always exist
if self.file_exists(name, path) and not self.list_checkpoints(name, path):
self.create_checkpoint(name, path)
new_path = model.get('path', path).strip('/')
new_name = model.get('name', name)
if path != new_path or name != new_name:
self.rename(name, path, new_name, new_path)
os_path = self._get_os_path(new_name, new_path)
self.log.debug("Saving %s", os_path)
try:
if model['type'] == 'notebook':
self._save_notebook(os_path, model, new_name, new_path)
elif model['type'] == 'file':
self._save_file(os_path, model, new_name, new_path)
elif model['type'] == 'directory':
self._save_directory(os_path, model, new_name, new_path)
else:
raise web.HTTPError(400, "Unhandled contents type: %s" % model['type'])
except web.HTTPError:
raise
except Exception as e:
raise web.HTTPError(400, u'Unexpected error while saving file: %s %s' % (os_path, e))
model = self.get_model(new_name, new_path, content=False)
return model
def update(self, model, name, path=''):
"""Update the file's path and/or name
For use in PATCH requests, to enable renaming a file without
re-uploading its contents. Only used for renaming at the moment.
"""
path = path.strip('/')
new_name = model.get('name', name)
new_path = model.get('path', path).strip('/')
if path != new_path or name != new_name:
self.rename(name, path, new_name, new_path)
model = self.get_model(new_name, new_path, content=False)
return model
def delete(self, name, path=''):
"""Delete file by name and path."""
path = path.strip('/')
os_path = self._get_os_path(name, path)
rm = os.unlink
if os.path.isdir(os_path):
listing = os.listdir(os_path)
# don't delete non-empty directories (checkpoints dir doesn't count)
if listing and listing != [self.checkpoint_dir]:
raise web.HTTPError(400, u'Directory %s not empty' % os_path)
elif not os.path.isfile(os_path):
raise web.HTTPError(404, u'File does not exist: %s' % os_path)
# clear checkpoints
for checkpoint in self.list_checkpoints(name, path):
checkpoint_id = checkpoint['id']
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if os.path.isfile(cp_path):
self.log.debug("Unlinking checkpoint %s", cp_path)
os.unlink(cp_path)
if os.path.isdir(os_path):
self.log.debug("Removing directory %s", os_path)
shutil.rmtree(os_path)
else:
self.log.debug("Unlinking file %s", os_path)
rm(os_path)
def rename(self, old_name, old_path, new_name, new_path):
"""Rename a file."""
old_path = old_path.strip('/')
new_path = new_path.strip('/')
if new_name == old_name and new_path == old_path:
return
new_os_path = self._get_os_path(new_name, new_path)
old_os_path = self._get_os_path(old_name, old_path)
# Should we proceed with the move?
if os.path.isfile(new_os_path):
raise web.HTTPError(409, u'File with name already exists: %s' % new_os_path)
# Move the file
try:
shutil.move(old_os_path, new_os_path)
except Exception as e:
raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_os_path, e))
# Move the checkpoints
old_checkpoints = self.list_checkpoints(old_name, old_path)
for cp in old_checkpoints:
checkpoint_id = cp['id']
old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
if os.path.isfile(old_cp_path):
self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
shutil.move(old_cp_path, new_cp_path)
# Checkpoint-related utilities
def get_checkpoint_path(self, checkpoint_id, name, path=''):
"""find the path to a checkpoint"""
path = path.strip('/')
basename, ext = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=ext,
)
os_path = self._get_os_path(path=path)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def get_checkpoint_model(self, checkpoint_id, name, path=''):
"""construct the info dict for a given checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
stats = os.stat(cp_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id = checkpoint_id,
last_modified = last_modified,
)
return info
# public checkpoint API
def create_checkpoint(self, name, path=''):
"""Create a checkpoint from the current state of a file"""
path = path.strip('/')
src_path = self._get_os_path(name, path)
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
self.log.debug("creating checkpoint for %s", name)
self._copy(src_path, cp_path)
# return the checkpoint info
return self.get_checkpoint_model(checkpoint_id, name, path)
def list_checkpoints(self, name, path=''):
"""list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.exists(os_path):
return []
else:
return [self.get_checkpoint_model(checkpoint_id, name, path)]
def restore_checkpoint(self, checkpoint_id, name, path=''):
"""restore a file to a checkpointed state"""
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", name, checkpoint_id)
nb_path = self._get_os_path(name, path)
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.isfile(cp_path):
self.log.debug("checkpoint file does not exist: %s", cp_path)
raise web.HTTPError(404,
u'checkpoint does not exist: %s-%s' % (name, checkpoint_id)
)
# ensure notebook is readable (never restore from an unreadable notebook)
if cp_path.endswith('.ipynb'):
with io.open(cp_path, 'r', encoding='utf-8') as f:
current.read(f, u'json')
self._copy(cp_path, nb_path)
self.log.debug("copying %s -> %s", cp_path, nb_path)
def delete_checkpoint(self, checkpoint_id, name, path=''):
"""delete a file's checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.isfile(cp_path):
raise web.HTTPError(404,
u'Checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
)
self.log.debug("unlinking %s", cp_path)
os.unlink(cp_path)
def info_string(self):
return "Serving notebooks from local directory: %s" % self.root_dir
def get_kernel_path(self, name, path='', model=None):
"""Return the initial working dir a kernel associated with a given notebook"""
return os.path.join(self.root_dir, path)

@ -0,0 +1,286 @@
"""Tornado handlers for the contents web service."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import json
from tornado import web
from IPython.html.utils import url_path_join, url_escape
from IPython.utils.jsonutil import date_default
from IPython.html.base.handlers import (IPythonHandler, json_errors,
file_path_regex, path_regex,
file_name_regex)
def sort_key(model):
"""key function for case-insensitive sort by name and type"""
iname = model['name'].lower()
type_key = {
'directory' : '0',
'notebook' : '1',
'file' : '2',
}.get(model['type'], '9')
return u'%s%s' % (type_key, iname)
class ContentsHandler(IPythonHandler):
SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
def location_url(self, name, path):
"""Return the full URL location of a file.
Parameters
----------
name : unicode
The base name of the file, such as "foo.ipynb".
path : unicode
The API path of the file, such as "foo/bar".
"""
return url_escape(url_path_join(
self.base_url, 'api', 'contents', path, name
))
def _finish_model(self, model, location=True):
"""Finish a JSON request with a model, setting relevant headers, etc."""
if location:
location = self.location_url(model['name'], model['path'])
self.set_header('Location', location)
self.set_header('Last-Modified', model['last_modified'])
self.finish(json.dumps(model, default=date_default))
@web.authenticated
@json_errors
def get(self, path='', name=None):
"""Return a model for a file or directory.
A directory model contains a list of models (without content)
of the files and directories it contains.
"""
path = path or ''
model = self.contents_manager.get_model(name=name, path=path)
if model['type'] == 'directory':
# group listing by type, then by name (case-insensitive)
# FIXME: sorting should be done in the frontends
model['content'].sort(key=sort_key)
self._finish_model(model, location=False)
@web.authenticated
@json_errors
def patch(self, path='', name=None):
"""PATCH renames a notebook without re-uploading content."""
cm = self.contents_manager
if name is None:
raise web.HTTPError(400, u'Filename missing')
model = self.get_json_body()
if model is None:
raise web.HTTPError(400, u'JSON body missing')
model = cm.update(model, name, path)
self._finish_model(model)
def _copy(self, copy_from, path, copy_to=None):
"""Copy a file, optionally specifying the new name.
"""
self.log.info(u"Copying {copy_from} to {path}/{copy_to}".format(
copy_from=copy_from,
path=path,
copy_to=copy_to or '',
))
model = self.contents_manager.copy(copy_from, copy_to, path)
self.set_status(201)
self._finish_model(model)
def _upload(self, model, path, name=None):
"""Handle upload of a new file
If name specified, create it in path/name,
otherwise create a new untitled file in path.
"""
self.log.info(u"Uploading file to %s/%s", path, name or '')
if name:
model['name'] = name
model = self.contents_manager.create_file(model, path)
self.set_status(201)
self._finish_model(model)
def _create_empty_file(self, path, name=None, ext='.ipynb'):
"""Create an empty file in path
If name specified, create it in path/name.
"""
self.log.info(u"Creating new file in %s/%s", path, name or '')
model = {}
if name:
model['name'] = name
model = self.contents_manager.create_file(model, path=path, ext=ext)
self.set_status(201)
self._finish_model(model)
def _save(self, model, path, name):
"""Save an existing file."""
self.log.info(u"Saving file at %s/%s", path, name)
model = self.contents_manager.save(model, name, path)
if model['path'] != path.strip('/') or model['name'] != name:
# a rename happened, set Location header
location = True
else:
location = False
self._finish_model(model, location)
@web.authenticated
@json_errors
def post(self, path='', name=None):
"""Create a new file or directory in the specified path.
POST creates new files or directories. The server always decides on the name.
POST /api/contents/path
New untitled notebook in path. If content specified, upload a
notebook, otherwise start empty.
POST /api/contents/path
with body {"copy_from" : "OtherNotebook.ipynb"}
New copy of OtherNotebook in path
"""
if name is not None:
path = u'{}/{}'.format(path, name)
cm = self.contents_manager
if cm.file_exists(path):
raise web.HTTPError(400, "Cannot POST to existing files, use PUT instead.")
if not cm.path_exists(path):
raise web.HTTPError(404, "No such directory: %s" % path)
model = self.get_json_body()
if model is not None:
copy_from = model.get('copy_from')
ext = model.get('ext', '.ipynb')
if model.get('content') is not None:
if copy_from:
raise web.HTTPError(400, "Can't upload and copy at the same time.")
self._upload(model, path)
elif copy_from:
self._copy(copy_from, path)
else:
self._create_empty_file(path, ext=ext)
else:
self._create_empty_file(path)
@web.authenticated
@json_errors
def put(self, path='', name=None):
"""Saves the file in the location specified by name and path.
PUT is very similar to POST, but the requester specifies the name,
whereas with POST, the server picks the name.
PUT /api/contents/path/Name.ipynb
Save notebook at ``path/Name.ipynb``. Notebook structure is specified
in `content` key of JSON request body. If content is not specified,
create a new empty notebook.
PUT /api/contents/path/Name.ipynb
with JSON body::
{
"copy_from" : "[path/to/]OtherNotebook.ipynb"
}
Copy OtherNotebook to Name
"""
if name is None:
raise web.HTTPError(400, "name must be specified with PUT.")
model = self.get_json_body()
if model:
copy_from = model.get('copy_from')
if copy_from:
if model.get('content'):
raise web.HTTPError(400, "Can't upload and copy at the same time.")
self._copy(copy_from, path, name)
elif self.contents_manager.file_exists(name, path):
self._save(model, path, name)
else:
self._upload(model, path, name)
else:
self._create_empty_file(path, name)
@web.authenticated
@json_errors
def delete(self, path='', name=None):
"""delete a file in the given path"""
cm = self.contents_manager
self.log.warn('delete %s:%s', path, name)
cm.delete(name, path)
self.set_status(204)
self.finish()
class CheckpointsHandler(IPythonHandler):
SUPPORTED_METHODS = ('GET', 'POST')
@web.authenticated
@json_errors
def get(self, path='', name=None):
"""get lists checkpoints for a file"""
cm = self.contents_manager
checkpoints = cm.list_checkpoints(name, path)
data = json.dumps(checkpoints, default=date_default)
self.finish(data)
@web.authenticated
@json_errors
def post(self, path='', name=None):
"""post creates a new checkpoint"""
cm = self.contents_manager
checkpoint = cm.create_checkpoint(name, path)
data = json.dumps(checkpoint, default=date_default)
location = url_path_join(self.base_url, 'api/contents',
path, name, 'checkpoints', checkpoint['id'])
self.set_header('Location', url_escape(location))
self.set_status(201)
self.finish(data)
class ModifyCheckpointsHandler(IPythonHandler):
SUPPORTED_METHODS = ('POST', 'DELETE')
@web.authenticated
@json_errors
def post(self, path, name, checkpoint_id):
"""post restores a file from a checkpoint"""
cm = self.contents_manager
cm.restore_checkpoint(checkpoint_id, name, path)
self.set_status(204)
self.finish()
@web.authenticated
@json_errors
def delete(self, path, name, checkpoint_id):
"""delete clears a checkpoint for a given file"""
cm = self.contents_manager
cm.delete_checkpoint(checkpoint_id, name, path)
self.set_status(204)
self.finish()
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
_checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
default_handlers = [
(r"/api/contents%s/checkpoints" % file_path_regex, CheckpointsHandler),
(r"/api/contents%s/checkpoints/%s" % (file_path_regex, _checkpoint_id_regex),
ModifyCheckpointsHandler),
(r"/api/contents%s" % file_path_regex, ContentsHandler),
(r"/api/contents%s" % path_regex, ContentsHandler),
]

@ -0,0 +1,333 @@
"""A base class for contents managers."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from fnmatch import fnmatch
import itertools
import os
from tornado.web import HTTPError
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import current, sign
from IPython.utils.traitlets import Instance, Unicode, List
class ContentsManager(LoggingConfigurable):
"""Base class for serving files and directories.
This serves any text or binary file,
as well as directories,
with special handling for JSON notebook documents.
Most APIs take a path argument,
which is always an API-style unicode path,
and always refers to a directory.
- unicode, not url-escaped
- '/'-separated
- leading and trailing '/' will be stripped
- if unspecified, path defaults to '',
indicating the root path.
name is also unicode, and refers to a specfic target:
- unicode, not url-escaped
- must not contain '/'
- It refers to an individual filename
- It may refer to a directory name,
in the case of listing or creating directories.
"""
notary = Instance(sign.NotebookNotary)
def _notary_default(self):
return sign.NotebookNotary(parent=self)
hide_globs = List(Unicode, [
u'__pycache__', '*.pyc', '*.pyo',
'.DS_Store', '*.so', '*.dylib', '*~',
], config=True, help="""
Glob patterns to hide in file and directory listings.
""")
untitled_notebook = Unicode("Untitled", config=True,
help="The base name used when creating untitled notebooks."
)
untitled_file = Unicode("untitled", config=True,
help="The base name used when creating untitled files."
)
untitled_directory = Unicode("Untitled Folder", config=True,
help="The base name used when creating untitled directories."
)
# ContentsManager API part 1: methods that must be
# implemented in subclasses.
def path_exists(self, path):
"""Does the API-style path (directory) actually exist?
Like os.path.isdir
Override this method in subclasses.
Parameters
----------
path : string
The path to check
Returns
-------
exists : bool
Whether the path does indeed exist.
"""
raise NotImplementedError
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root dir).
Returns
-------
hidden : bool
Whether the path is hidden.
"""
raise NotImplementedError
def file_exists(self, name, path=''):
"""Does a file exist at the given name and path?
Like os.path.isfile
Override this method in subclasses.
Parameters
----------
name : string
The name of the file you are checking.
path : string
The relative path to the file's directory (with '/' as separator)
Returns
-------
exists : bool
Whether the file exists.
"""
raise NotImplementedError('must be implemented in a subclass')
def exists(self, name, path=''):
"""Does a file or directory exist at the given name and path?
Like os.path.exists
Parameters
----------
name : string
The name of the file you are checking.
path : string
The relative path to the file's directory (with '/' as separator)
Returns
-------
exists : bool
Whether the target exists.
"""
return self.file_exists(name, path) or self.path_exists("%s/%s" % (path, name))
def get_model(self, name, path='', content=True):
"""Get the model of a file or directory with or without content."""
raise NotImplementedError('must be implemented in a subclass')
def save(self, model, name, path=''):
"""Save the file or directory and return the model with no content."""
raise NotImplementedError('must be implemented in a subclass')
def update(self, model, name, path=''):
"""Update the file or directory and return the model with no content.
For use in PATCH requests, to enable renaming a file without
re-uploading its contents. Only used for renaming at the moment.
"""
raise NotImplementedError('must be implemented in a subclass')
def delete(self, name, path=''):
"""Delete file or directory by name and path."""
raise NotImplementedError('must be implemented in a subclass')
def create_checkpoint(self, name, path=''):
"""Create a checkpoint of the current state of a file
Returns a checkpoint_id for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, name, path=''):
"""Return a list of checkpoints for a given file"""
return []
def restore_checkpoint(self, checkpoint_id, name, path=''):
"""Restore a file from one of its checkpoints"""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, name, path=''):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
# ContentsManager API part 2: methods that have useable default
# implementations, but can be overridden in subclasses.
def info_string(self):
return "Serving contents"
def get_kernel_path(self, name, path='', model=None):
""" Return the path to start kernel in """
return path
def increment_filename(self, filename, path=''):
"""Increment a filename until it is unique.
Parameters
----------
filename : unicode
The name of a file, including extension
path : unicode
The API path of the target's directory
Returns
-------
name : unicode
A filename that is unique, based on the input filename.
"""
path = path.strip('/')
basename, ext = os.path.splitext(filename)
for i in itertools.count():
name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
ext=ext)
if not self.file_exists(name, path):
break
return name
def create_file(self, model=None, path='', ext='.ipynb'):
"""Create a new file or directory and return its model with no content."""
path = path.strip('/')
if model is None:
model = {}
if 'content' not in model and model.get('type', None) != 'directory':
if ext == '.ipynb':
metadata = current.new_metadata(name=u'')
model['content'] = current.new_notebook(metadata=metadata)
model['type'] = 'notebook'
model['format'] = 'json'
else:
model['content'] = ''
model['type'] = 'file'
model['format'] = 'text'
if 'name' not in model:
if model['type'] == 'directory':
untitled = self.untitled_directory
elif model['type'] == 'notebook':
untitled = self.untitled_notebook
elif model['type'] == 'file':
untitled = self.untitled_file
else:
raise HTTPError(400, "Unexpected model type: %r" % model['type'])
model['name'] = self.increment_filename(untitled + ext, path)
model['path'] = path
model = self.save(model, model['name'], model['path'])
return model
def copy(self, from_name, to_name=None, path=''):
"""Copy an existing file and return its new model.
If to_name not specified, increment `from_name-Copy#.ext`.
copy_from can be a full path to a file,
or just a base name. If a base name, `path` is used.
"""
path = path.strip('/')
if '/' in from_name:
from_path, from_name = from_name.rsplit('/', 1)
else:
from_path = path
model = self.get_model(from_name, from_path)
if model['type'] == 'directory':
raise HTTPError(400, "Can't copy directories")
if not to_name:
base, ext = os.path.splitext(from_name)
copy_name = u'{0}-Copy{1}'.format(base, ext)
to_name = self.increment_filename(copy_name, path)
model['name'] = to_name
model['path'] = path
model = self.save(model, to_name, path)
return model
def log_info(self):
self.log.info(self.info_string())
def trust_notebook(self, name, path=''):
"""Explicitly trust a notebook
Parameters
----------
name : string
The filename of the notebook
path : string
The notebook's directory
"""
model = self.get_model(name, path)
nb = model['content']
self.log.warn("Trusting notebook %s/%s", path, name)
self.notary.mark_cells(nb, True)
self.save(model, name, path)
def check_and_sign(self, nb, name='', path=''):
"""Check for trusted cells, and sign the notebook.
Called as a part of saving notebooks.
Parameters
----------
nb : dict
The notebook object (in nbformat.current format)
name : string
The filename of the notebook (for logging)
path : string
The notebook's directory (for logging)
"""
if self.notary.check_cells(nb):
self.notary.sign(nb)
else:
self.log.warn("Saving untrusted notebook %s/%s", path, name)
def mark_trusted_cells(self, nb, name='', path=''):
"""Mark cells as trusted if the notebook signature matches.
Called as a part of loading notebooks.
Parameters
----------
nb : dict
The notebook object (in nbformat.current format)
name : string
The filename of the notebook (for logging)
path : string
The notebook's directory (for logging)
"""
trusted = self.notary.check_signature(nb)
if not trusted:
self.log.warn("Notebook %s/%s is not trusted", path, name)
self.notary.mark_cells(nb, trusted)
def should_list(self, name):
"""Should this file/directory name be displayed in a listing?"""
return not any(fnmatch(name, glob) for glob in self.hide_globs)

@ -1,6 +1,7 @@
# coding: utf-8
"""Test the notebooks webservice API."""
"""Test the contents webservice API."""
import base64
import io
import json
import os
@ -21,23 +22,21 @@ from IPython.utils import py3compat
from IPython.utils.data import uniq_stable
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service.
def notebooks_only(nb_list):
return [nb for nb in nb_list if nb['type']=='notebook']
def notebooks_only(dir_model):
return [nb for nb in dir_model['content'] if nb['type']=='notebook']
def dirs_only(nb_list):
return [x for x in nb_list if x['type']=='directory']
def dirs_only(dir_model):
return [x for x in dir_model['content'] if x['type']=='directory']
class NBAPI(object):
"""Wrapper for notebook API calls."""
class API(object):
"""Wrapper for contents API calls."""
def __init__(self, base_url):
self.base_url = base_url
def _req(self, verb, path, body=None):
response = requests.request(verb,
url_path_join(self.base_url, 'api/notebooks', path),
url_path_join(self.base_url, 'api/contents', path),
data=body,
)
response.raise_for_status()
@ -49,8 +48,11 @@ class NBAPI(object):
def read(self, name, path='/'):
return self._req('GET', url_path_join(path, name))
def create_untitled(self, path='/'):
return self._req('POST', path)
def create_untitled(self, path='/', ext=None):
body = None
if ext:
body = json.dumps({'ext': ext})
return self._req('POST', path, body)
def upload_untitled(self, body, path='/'):
return self._req('POST', path, body)
@ -65,6 +67,9 @@ class NBAPI(object):
def upload(self, name, body, path='/'):
return self._req('PUT', url_path_join(path, name), body)
def mkdir(self, name, path='/'):
return self._req('PUT', url_path_join(path, name), json.dumps({'type': 'directory'}))
def copy(self, copy_from, copy_to, path='/'):
body = json.dumps({'copy_from':copy_from})
return self._req('PUT', url_path_join(path, copy_to), body)
@ -112,8 +117,20 @@ class APITest(NotebookTestBase):
del dirs[0] # remove ''
top_level_dirs = {normalize('NFC', d.split('/')[0]) for d in dirs}
@staticmethod
def _blob_for_name(name):
return name.encode('utf-8') + b'\xFF'
@staticmethod
def _txt_for_name(name):
return u'%s text file' % name
def setUp(self):
nbdir = self.notebook_dir.name
self.blob = os.urandom(100)
self.b64_blob = base64.encodestring(self.blob).decode('ascii')
for d in (self.dirs + self.hidden_dirs):
d.replace('/', os.sep)
@ -122,12 +139,22 @@ class APITest(NotebookTestBase):
for d, name in self.dirs_nbs:
d = d.replace('/', os.sep)
# create a notebook
with io.open(pjoin(nbdir, d, '%s.ipynb' % name), 'w',
encoding='utf-8') as f:
nb = new_notebook(name=name)
write(nb, f, format='ipynb')
self.nb_api = NBAPI(self.base_url())
# create a text file
with io.open(pjoin(nbdir, d, '%s.txt' % name), 'w',
encoding='utf-8') as f:
f.write(self._txt_for_name(name))
# create a binary file
with io.open(pjoin(nbdir, d, '%s.blob' % name), 'wb') as f:
f.write(self._blob_for_name(name))
self.api = API(self.base_url())
def tearDown(self):
nbdir = self.notebook_dir.name
@ -139,175 +166,287 @@ class APITest(NotebookTestBase):
os.unlink(pjoin(nbdir, 'inroot.ipynb'))
def test_list_notebooks(self):
nbs = notebooks_only(self.nb_api.list().json())
nbs = notebooks_only(self.api.list().json())
self.assertEqual(len(nbs), 1)
self.assertEqual(nbs[0]['name'], 'inroot.ipynb')
nbs = notebooks_only(self.nb_api.list('/Directory with spaces in/').json())
nbs = notebooks_only(self.api.list('/Directory with spaces in/').json())
self.assertEqual(len(nbs), 1)
self.assertEqual(nbs[0]['name'], 'inspace.ipynb')
nbs = notebooks_only(self.nb_api.list(u'/unicodé/').json())
nbs = notebooks_only(self.api.list(u'/unicodé/').json())
self.assertEqual(len(nbs), 1)
self.assertEqual(nbs[0]['name'], 'innonascii.ipynb')
self.assertEqual(nbs[0]['path'], u'unicodé')
nbs = notebooks_only(self.nb_api.list('/foo/bar/').json())
nbs = notebooks_only(self.api.list('/foo/bar/').json())
self.assertEqual(len(nbs), 1)
self.assertEqual(nbs[0]['name'], 'baz.ipynb')
self.assertEqual(nbs[0]['path'], 'foo/bar')
nbs = notebooks_only(self.nb_api.list('foo').json())
nbs = notebooks_only(self.api.list('foo').json())
self.assertEqual(len(nbs), 4)
nbnames = { normalize('NFC', n['name']) for n in nbs }
expected = [ u'a.ipynb', u'b.ipynb', u'name with spaces.ipynb', u'unicodé.ipynb']
expected = { normalize('NFC', name) for name in expected }
self.assertEqual(nbnames, expected)
nbs = notebooks_only(self.nb_api.list('ordering').json())
nbs = notebooks_only(self.api.list('ordering').json())
nbnames = [n['name'] for n in nbs]
expected = ['A.ipynb', 'b.ipynb', 'C.ipynb']
self.assertEqual(nbnames, expected)
def test_list_dirs(self):
dirs = dirs_only(self.nb_api.list().json())
dirs = dirs_only(self.api.list().json())
dir_names = {normalize('NFC', d['name']) for d in dirs}
self.assertEqual(dir_names, self.top_level_dirs) # Excluding hidden dirs
def test_list_nonexistant_dir(self):
with assert_http_error(404):
self.nb_api.list('nonexistant')
self.api.list('nonexistant')
def test_get_contents(self):
def test_get_nb_contents(self):
for d, name in self.dirs_nbs:
nb = self.nb_api.read('%s.ipynb' % name, d+'/').json()
nb = self.api.read('%s.ipynb' % name, d+'/').json()
self.assertEqual(nb['name'], u'%s.ipynb' % name)
self.assertEqual(nb['type'], 'notebook')
self.assertIn('content', nb)
self.assertEqual(nb['format'], 'json')
self.assertIn('content', nb)
self.assertIn('metadata', nb['content'])
self.assertIsInstance(nb['content']['metadata'], dict)
def test_get_contents_no_such_file(self):
# Name that doesn't exist - should be a 404
with assert_http_error(404):
self.api.read('q.ipynb', 'foo')
def test_get_text_file_contents(self):
for d, name in self.dirs_nbs:
model = self.api.read(u'%s.txt' % name, d+'/').json()
self.assertEqual(model['name'], u'%s.txt' % name)
self.assertIn('content', model)
self.assertEqual(model['format'], 'text')
self.assertEqual(model['type'], 'file')
self.assertEqual(model['content'], self._txt_for_name(name))
# Name that doesn't exist - should be a 404
with assert_http_error(404):
self.api.read('q.txt', 'foo')
def test_get_binary_file_contents(self):
for d, name in self.dirs_nbs:
model = self.api.read(u'%s.blob' % name, d+'/').json()
self.assertEqual(model['name'], u'%s.blob' % name)
self.assertIn('content', model)
self.assertEqual(model['format'], 'base64')
self.assertEqual(model['type'], 'file')
b64_data = base64.encodestring(self._blob_for_name(name)).decode('ascii')
self.assertEqual(model['content'], b64_data)
# Name that doesn't exist - should be a 404
with assert_http_error(404):
self.nb_api.read('q.ipynb', 'foo')
self.api.read('q.txt', 'foo')
def _check_nb_created(self, resp, name, path):
def _check_created(self, resp, name, path, type='notebook'):
self.assertEqual(resp.status_code, 201)
location_header = py3compat.str_to_unicode(resp.headers['Location'])
self.assertEqual(location_header, url_escape(url_path_join(u'/api/notebooks', path, name)))
self.assertEqual(resp.json()['name'], name)
assert os.path.isfile(pjoin(
self.assertEqual(location_header, url_escape(url_path_join(u'/api/contents', path, name)))
rjson = resp.json()
self.assertEqual(rjson['name'], name)
self.assertEqual(rjson['path'], path)
self.assertEqual(rjson['type'], type)
isright = os.path.isdir if type == 'directory' else os.path.isfile
assert isright(pjoin(
self.notebook_dir.name,
path.replace('/', os.sep),
name,
))
def test_create_untitled(self):
resp = self.nb_api.create_untitled(path=u'å b')
self._check_nb_created(resp, 'Untitled0.ipynb', u'å b')
resp = self.api.create_untitled(path=u'å b')
self._check_created(resp, 'Untitled0.ipynb', u'å b')
# Second time
resp = self.nb_api.create_untitled(path=u'å b')
self._check_nb_created(resp, 'Untitled1.ipynb', u'å b')
resp = self.api.create_untitled(path=u'å b')
self._check_created(resp, 'Untitled1.ipynb', u'å b')
# And two directories down
resp = self.nb_api.create_untitled(path='foo/bar')
self._check_nb_created(resp, 'Untitled0.ipynb', 'foo/bar')
resp = self.api.create_untitled(path='foo/bar')
self._check_created(resp, 'Untitled0.ipynb', 'foo/bar')
def test_create_untitled_txt(self):
resp = self.api.create_untitled(path='foo/bar', ext='.txt')
self._check_created(resp, 'untitled0.txt', 'foo/bar', type='file')
resp = self.api.read(path='foo/bar', name='untitled0.txt')
model = resp.json()
self.assertEqual(model['type'], 'file')
self.assertEqual(model['format'], 'text')
self.assertEqual(model['content'], '')
def test_upload_untitled(self):
nb = new_notebook(name='Upload test')
nbmodel = {'content': nb}
resp = self.nb_api.upload_untitled(path=u'å b',
nbmodel = {'content': nb, 'type': 'notebook'}
resp = self.api.upload_untitled(path=u'å b',
body=json.dumps(nbmodel))
self._check_nb_created(resp, 'Untitled0.ipynb', u'å b')
self._check_created(resp, 'Untitled0.ipynb', u'å b')
def test_upload(self):
nb = new_notebook(name=u'ignored')
nbmodel = {'content': nb}
resp = self.nb_api.upload(u'Upload tést.ipynb', path=u'å b',
nbmodel = {'content': nb, 'type': 'notebook'}
resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
body=json.dumps(nbmodel))
self._check_nb_created(resp, u'Upload tést.ipynb', u'å b')
self._check_created(resp, u'Upload tést.ipynb', u'å b')
def test_mkdir(self):
resp = self.api.mkdir(u'New ∂ir', path=u'å b')
self._check_created(resp, u'New ∂ir', u'å b', type='directory')
def test_mkdir_hidden_400(self):
with assert_http_error(400):
resp = self.api.mkdir(u'.hidden', path=u'å b')
def test_upload_txt(self):
body = u'ünicode téxt'
model = {
'content' : body,
'format' : 'text',
'type' : 'file',
}
resp = self.api.upload(u'Upload tést.txt', path=u'å b',
body=json.dumps(model))
# check roundtrip
resp = self.api.read(path=u'å b', name=u'Upload tést.txt')
model = resp.json()
self.assertEqual(model['type'], 'file')
self.assertEqual(model['format'], 'text')
self.assertEqual(model['content'], body)
def test_upload_b64(self):
body = b'\xFFblob'
b64body = base64.encodestring(body).decode('ascii')
model = {
'content' : b64body,
'format' : 'base64',
'type' : 'file',
}
resp = self.api.upload(u'Upload tést.blob', path=u'å b',
body=json.dumps(model))
# check roundtrip
resp = self.api.read(path=u'å b', name=u'Upload tést.blob')
model = resp.json()
self.assertEqual(model['type'], 'file')
self.assertEqual(model['format'], 'base64')
decoded = base64.decodestring(model['content'].encode('ascii'))
self.assertEqual(decoded, body)
def test_upload_v2(self):
nb = v2.new_notebook()
ws = v2.new_worksheet()
nb.worksheets.append(ws)
ws.cells.append(v2.new_code_cell(input='print("hi")'))
nbmodel = {'content': nb}
resp = self.nb_api.upload(u'Upload tést.ipynb', path=u'å b',
nbmodel = {'content': nb, 'type': 'notebook'}
resp = self.api.upload(u'Upload tést.ipynb', path=u'å b',
body=json.dumps(nbmodel))
self._check_nb_created(resp, u'Upload tést.ipynb', u'å b')
resp = self.nb_api.read(u'Upload tést.ipynb', u'å b')
self._check_created(resp, u'Upload tést.ipynb', u'å b')
resp = self.api.read(u'Upload tést.ipynb', u'å b')
data = resp.json()
self.assertEqual(data['content']['nbformat'], current.nbformat)
self.assertEqual(data['content']['orig_nbformat'], 2)
def test_copy_untitled(self):
resp = self.nb_api.copy_untitled(u'ç d.ipynb', path=u'å b')
self._check_nb_created(resp, u'ç d-Copy0.ipynb', u'å b')
resp = self.api.copy_untitled(u'ç d.ipynb', path=u'å b')
self._check_created(resp, u'ç d-Copy0.ipynb', u'å b')
def test_copy(self):
resp = self.nb_api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
self._check_nb_created(resp, u'cøpy.ipynb', u'å b')
resp = self.api.copy(u'ç d.ipynb', u'cøpy.ipynb', path=u'å b')
self._check_created(resp, u'cøpy.ipynb', u'å b')
def test_copy_path(self):
resp = self.api.copy(u'foo/a.ipynb', u'cøpyfoo.ipynb', path=u'å b')
self._check_created(resp, u'cøpyfoo.ipynb', u'å b')
def test_copy_dir_400(self):
# can't copy directories
with assert_http_error(400):
resp = self.api.copy(u'å b', u'å c')
def test_delete(self):
for d, name in self.dirs_nbs:
resp = self.nb_api.delete('%s.ipynb' % name, d)
resp = self.api.delete('%s.ipynb' % name, d)
self.assertEqual(resp.status_code, 204)
for d in self.dirs + ['/']:
nbs = notebooks_only(self.nb_api.list(d).json())
nbs = notebooks_only(self.api.list(d).json())
self.assertEqual(len(nbs), 0)
def test_delete_dirs(self):
# depth-first delete everything, so we don't try to delete empty directories
for name in sorted(self.dirs + ['/'], key=len, reverse=True):
listing = self.api.list(name).json()['content']
for model in listing:
self.api.delete(model['name'], model['path'])
listing = self.api.list('/').json()['content']
self.assertEqual(listing, [])
def test_delete_non_empty_dir(self):
"""delete non-empty dir raises 400"""
with assert_http_error(400):
self.api.delete(u'å b')
def test_rename(self):
resp = self.nb_api.rename('a.ipynb', 'foo', 'z.ipynb')
resp = self.api.rename('a.ipynb', 'foo', 'z.ipynb')
self.assertEqual(resp.headers['Location'].split('/')[-1], 'z.ipynb')
self.assertEqual(resp.json()['name'], 'z.ipynb')
assert os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'z.ipynb'))
nbs = notebooks_only(self.nb_api.list('foo').json())
nbs = notebooks_only(self.api.list('foo').json())
nbnames = set(n['name'] for n in nbs)
self.assertIn('z.ipynb', nbnames)
self.assertNotIn('a.ipynb', nbnames)
def test_rename_existing(self):
with assert_http_error(409):
self.nb_api.rename('a.ipynb', 'foo', 'b.ipynb')
self.api.rename('a.ipynb', 'foo', 'b.ipynb')
def test_save(self):
resp = self.nb_api.read('a.ipynb', 'foo')
resp = self.api.read('a.ipynb', 'foo')
nbcontent = json.loads(resp.text)['content']
nb = to_notebook_json(nbcontent)
ws = new_worksheet()
nb.worksheets = [ws]
ws.cells.append(new_heading_cell(u'Created by test ³'))
nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
nbfile = pjoin(self.notebook_dir.name, 'foo', 'a.ipynb')
with io.open(nbfile, 'r', encoding='utf-8') as f:
newnb = read(f, format='ipynb')
self.assertEqual(newnb.worksheets[0].cells[0].source,
u'Created by test ³')
nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
newnb = to_notebook_json(nbcontent)
self.assertEqual(newnb.worksheets[0].cells[0].source,
u'Created by test ³')
# Save and rename
nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb}
resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
nbmodel= {'name': 'a2.ipynb', 'path':'foo/bar', 'content': nb, 'type': 'notebook'}
resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
saved = resp.json()
self.assertEqual(saved['name'], 'a2.ipynb')
self.assertEqual(saved['path'], 'foo/bar')
assert os.path.isfile(pjoin(self.notebook_dir.name,'foo','bar','a2.ipynb'))
assert not os.path.isfile(pjoin(self.notebook_dir.name, 'foo', 'a.ipynb'))
with assert_http_error(404):
self.nb_api.read('a.ipynb', 'foo')
self.api.read('a.ipynb', 'foo')
def test_checkpoints(self):
resp = self.nb_api.read('a.ipynb', 'foo')
r = self.nb_api.new_checkpoint('a.ipynb', 'foo')
resp = self.api.read('a.ipynb', 'foo')
r = self.api.new_checkpoint('a.ipynb', 'foo')
self.assertEqual(r.status_code, 201)
cp1 = r.json()
self.assertEqual(set(cp1), {'id', 'last_modified'})
@ -321,27 +460,26 @@ class APITest(NotebookTestBase):
hcell = new_heading_cell('Created by test')
ws.cells.append(hcell)
# Save
nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb}
resp = self.nb_api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
nbmodel= {'name': 'a.ipynb', 'path':'foo', 'content': nb, 'type': 'notebook'}
resp = self.api.save('a.ipynb', path='foo', body=json.dumps(nbmodel))
# List checkpoints
cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
self.assertEqual(cps, [cp1])
nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
nb = to_notebook_json(nbcontent)
self.assertEqual(nb.worksheets[0].cells[0].source, 'Created by test')
# Restore cp1
r = self.nb_api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
r = self.api.restore_checkpoint('a.ipynb', 'foo', cp1['id'])
self.assertEqual(r.status_code, 204)
nbcontent = self.nb_api.read('a.ipynb', 'foo').json()['content']
nbcontent = self.api.read('a.ipynb', 'foo').json()['content']
nb = to_notebook_json(nbcontent)
self.assertEqual(nb.worksheets, [])
# Delete cp1
r = self.nb_api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
r = self.api.delete_checkpoint('a.ipynb', 'foo', cp1['id'])
self.assertEqual(r.status_code, 204)
cps = self.nb_api.get_checkpoints('a.ipynb', 'foo').json()
cps = self.api.get_checkpoints('a.ipynb', 'foo').json()
self.assertEqual(cps, [])

@ -15,74 +15,74 @@ from IPython.utils.tempdir import TemporaryDirectory
from IPython.utils.traitlets import TraitError
from IPython.html.utils import url_path_join
from ..filenbmanager import FileNotebookManager
from ..nbmanager import NotebookManager
from ..filemanager import FileContentsManager
from ..manager import ContentsManager
class TestFileNotebookManager(TestCase):
class TestFileContentsManager(TestCase):
def test_nb_dir(self):
def test_root_dir(self):
with TemporaryDirectory() as td:
fm = FileNotebookManager(notebook_dir=td)
self.assertEqual(fm.notebook_dir, td)
fm = FileContentsManager(root_dir=td)
self.assertEqual(fm.root_dir, td)
def test_missing_nb_dir(self):
def test_missing_root_dir(self):
with TemporaryDirectory() as td:
nbdir = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
self.assertRaises(TraitError, FileNotebookManager, notebook_dir=nbdir)
root = os.path.join(td, 'notebook', 'dir', 'is', 'missing')
self.assertRaises(TraitError, FileContentsManager, root_dir=root)
def test_invalid_nb_dir(self):
def test_invalid_root_dir(self):
with NamedTemporaryFile() as tf:
self.assertRaises(TraitError, FileNotebookManager, notebook_dir=tf.name)
self.assertRaises(TraitError, FileContentsManager, root_dir=tf.name)
def test_get_os_path(self):
# full filesystem path should be returned with correct operating system
# separators.
with TemporaryDirectory() as td:
nbdir = td
fm = FileNotebookManager(notebook_dir=nbdir)
root = td
fm = FileContentsManager(root_dir=root)
path = fm._get_os_path('test.ipynb', '/path/to/notebook/')
rel_path_list = '/path/to/notebook/test.ipynb'.split('/')
fs_path = os.path.join(fm.notebook_dir, *rel_path_list)
fs_path = os.path.join(fm.root_dir, *rel_path_list)
self.assertEqual(path, fs_path)
fm = FileNotebookManager(notebook_dir=nbdir)
fm = FileContentsManager(root_dir=root)
path = fm._get_os_path('test.ipynb')
fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
fs_path = os.path.join(fm.root_dir, 'test.ipynb')
self.assertEqual(path, fs_path)
fm = FileNotebookManager(notebook_dir=nbdir)
fm = FileContentsManager(root_dir=root)
path = fm._get_os_path('test.ipynb', '////')
fs_path = os.path.join(fm.notebook_dir, 'test.ipynb')
fs_path = os.path.join(fm.root_dir, 'test.ipynb')
self.assertEqual(path, fs_path)
def test_checkpoint_subdir(self):
subd = u'sub ∂ir'
cp_name = 'test-cp.ipynb'
with TemporaryDirectory() as td:
nbdir = td
root = td
os.mkdir(os.path.join(td, subd))
fm = FileNotebookManager(notebook_dir=nbdir)
fm = FileContentsManager(root_dir=root)
cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb', '/')
cp_subdir = fm.get_checkpoint_path('cp', 'test.ipynb', '/%s/' % subd)
self.assertNotEqual(cp_dir, cp_subdir)
self.assertEqual(cp_dir, os.path.join(nbdir, fm.checkpoint_dir, cp_name))
self.assertEqual(cp_subdir, os.path.join(nbdir, subd, fm.checkpoint_dir, cp_name))
self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
class TestContentsManager(TestCase):
class TestNotebookManager(TestCase):
def setUp(self):
self._temp_dir = TemporaryDirectory()
self.td = self._temp_dir.name
self.notebook_manager = FileNotebookManager(
notebook_dir=self.td,
self.contents_manager = FileContentsManager(
root_dir=self.td,
log=logging.getLogger()
)
def tearDown(self):
self._temp_dir.cleanup()
def make_dir(self, abs_path, rel_path):
"""make subdirectory, rel_path is the relative path
to that directory from the location where the server started"""
@ -91,31 +91,31 @@ class TestNotebookManager(TestCase):
os.makedirs(os_path)
except OSError:
print("Directory already exists: %r" % os_path)
def add_code_cell(self, nb):
output = current.new_output("display_data", output_javascript="alert('hi');")
cell = current.new_code_cell("print('hi')", outputs=[output])
if not nb.worksheets:
nb.worksheets.append(current.new_worksheet())
nb.worksheets[0].cells.append(cell)
def new_notebook(self):
nbm = self.notebook_manager
model = nbm.create_notebook()
cm = self.contents_manager
model = cm.create_file()
name = model['name']
path = model['path']
full_model = nbm.get_notebook(name, path)
full_model = cm.get_model(name, path)
nb = full_model['content']
self.add_code_cell(nb)
nbm.save_notebook(full_model, name, path)
cm.save(full_model, name, path)
return nb, name, path
def test_create_notebook(self):
nm = self.notebook_manager
def test_create_file(self):
cm = self.contents_manager
# Test in root directory
model = nm.create_notebook()
model = cm.create_file()
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
@ -124,23 +124,23 @@ class TestNotebookManager(TestCase):
# Test in sub-directory
sub_dir = '/foo/'
self.make_dir(nm.notebook_dir, 'foo')
model = nm.create_notebook(None, sub_dir)
self.make_dir(cm.root_dir, 'foo')
model = cm.create_file(None, sub_dir)
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
self.assertEqual(model['name'], 'Untitled0.ipynb')
self.assertEqual(model['path'], sub_dir.strip('/'))
def test_get_notebook(self):
nm = self.notebook_manager
def test_get(self):
cm = self.contents_manager
# Create a notebook
model = nm.create_notebook()
model = cm.create_file()
name = model['name']
path = model['path']
# Check that we 'get' on the notebook we just created
model2 = nm.get_notebook(name, path)
model2 = cm.get_model(name, path)
assert isinstance(model2, dict)
self.assertIn('name', model2)
self.assertIn('path', model2)
@ -149,66 +149,66 @@ class TestNotebookManager(TestCase):
# Test in sub-directory
sub_dir = '/foo/'
self.make_dir(nm.notebook_dir, 'foo')
model = nm.create_notebook(None, sub_dir)
model2 = nm.get_notebook(name, sub_dir)
self.make_dir(cm.root_dir, 'foo')
model = cm.create_file(None, sub_dir)
model2 = cm.get_model(name, sub_dir)
assert isinstance(model2, dict)
self.assertIn('name', model2)
self.assertIn('path', model2)
self.assertIn('content', model2)
self.assertEqual(model2['name'], 'Untitled0.ipynb')
self.assertEqual(model2['path'], sub_dir.strip('/'))
def test_update_notebook(self):
nm = self.notebook_manager
def test_update(self):
cm = self.contents_manager
# Create a notebook
model = nm.create_notebook()
model = cm.create_file()
name = model['name']
path = model['path']
# Change the name in the model for rename
model['name'] = 'test.ipynb'
model = nm.update_notebook(model, name, path)
model = cm.update(model, name, path)
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
self.assertEqual(model['name'], 'test.ipynb')
# Make sure the old name is gone
self.assertRaises(HTTPError, nm.get_notebook, name, path)
self.assertRaises(HTTPError, cm.get_model, name, path)
# Test in sub-directory
# Create a directory and notebook in that directory
sub_dir = '/foo/'
self.make_dir(nm.notebook_dir, 'foo')
model = nm.create_notebook(None, sub_dir)
self.make_dir(cm.root_dir, 'foo')
model = cm.create_file(None, sub_dir)
name = model['name']
path = model['path']
# Change the name in the model for rename
model['name'] = 'test_in_sub.ipynb'
model = nm.update_notebook(model, name, path)
model = cm.update(model, name, path)
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
self.assertEqual(model['name'], 'test_in_sub.ipynb')
self.assertEqual(model['path'], sub_dir.strip('/'))
# Make sure the old name is gone
self.assertRaises(HTTPError, nm.get_notebook, name, path)
self.assertRaises(HTTPError, cm.get_model, name, path)
def test_save_notebook(self):
nm = self.notebook_manager
def test_save(self):
cm = self.contents_manager
# Create a notebook
model = nm.create_notebook()
model = cm.create_file()
name = model['name']
path = model['path']
# Get the model with 'content'
full_model = nm.get_notebook(name, path)
full_model = cm.get_model(name, path)
# Save the notebook
model = nm.save_notebook(full_model, name, path)
model = cm.save(full_model, name, path)
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
@ -218,103 +218,84 @@ class TestNotebookManager(TestCase):
# Test in sub-directory
# Create a directory and notebook in that directory
sub_dir = '/foo/'
self.make_dir(nm.notebook_dir, 'foo')
model = nm.create_notebook(None, sub_dir)
self.make_dir(cm.root_dir, 'foo')
model = cm.create_file(None, sub_dir)
name = model['name']
path = model['path']
model = nm.get_notebook(name, path)
model = cm.get_model(name, path)
# Change the name in the model for rename
model = nm.save_notebook(model, name, path)
model = cm.save(model, name, path)
assert isinstance(model, dict)
self.assertIn('name', model)
self.assertIn('path', model)
self.assertEqual(model['name'], 'Untitled0.ipynb')
self.assertEqual(model['path'], sub_dir.strip('/'))
def test_save_notebook_with_script(self):
nm = self.notebook_manager
# Create a notebook
model = nm.create_notebook()
nm.save_script = True
model = nm.create_notebook()
name = model['name']
path = model['path']
# Get the model with 'content'
full_model = nm.get_notebook(name, path)
# Save the notebook
model = nm.save_notebook(full_model, name, path)
# Check that the script was created
py_path = os.path.join(nm.notebook_dir, os.path.splitext(name)[0]+'.py')
assert os.path.exists(py_path), py_path
def test_delete_notebook(self):
nm = self.notebook_manager
def test_delete(self):
cm = self.contents_manager
# Create a notebook
nb, name, path = self.new_notebook()
# Delete the notebook
nm.delete_notebook(name, path)
cm.delete(name, path)
# Check that a 'get' on the deleted notebook raises and error
self.assertRaises(HTTPError, nm.get_notebook, name, path)
def test_copy_notebook(self):
nm = self.notebook_manager
self.assertRaises(HTTPError, cm.get_model, name, path)
def test_copy(self):
cm = self.contents_manager
path = u'å b'
name = u'nb √.ipynb'
os.mkdir(os.path.join(nm.notebook_dir, path))
orig = nm.create_notebook({'name' : name}, path=path)
os.mkdir(os.path.join(cm.root_dir, path))
orig = cm.create_file({'name' : name}, path=path)
# copy with unspecified name
copy = nm.copy_notebook(name, path=path)
copy = cm.copy(name, path=path)
self.assertEqual(copy['name'], orig['name'].replace('.ipynb', '-Copy0.ipynb'))
# copy with specified name
copy2 = nm.copy_notebook(name, u'copy 2.ipynb', path=path)
copy2 = cm.copy(name, u'copy 2.ipynb', path=path)
self.assertEqual(copy2['name'], u'copy 2.ipynb')
def test_trust_notebook(self):
nbm = self.notebook_manager
cm = self.contents_manager
nb, name, path = self.new_notebook()
untrusted = nbm.get_notebook(name, path)['content']
assert not nbm.notary.check_cells(untrusted)
untrusted = cm.get_model(name, path)['content']
assert not cm.notary.check_cells(untrusted)
# print(untrusted)
nbm.trust_notebook(name, path)
trusted = nbm.get_notebook(name, path)['content']
cm.trust_notebook(name, path)
trusted = cm.get_model(name, path)['content']
# print(trusted)
assert nbm.notary.check_cells(trusted)
assert cm.notary.check_cells(trusted)
def test_mark_trusted_cells(self):
nbm = self.notebook_manager
cm = self.contents_manager
nb, name, path = self.new_notebook()
nbm.mark_trusted_cells(nb, name, path)
cm.mark_trusted_cells(nb, name, path)
for cell in nb.worksheets[0].cells:
if cell.cell_type == 'code':
assert not cell.trusted
nbm.trust_notebook(name, path)
nb = nbm.get_notebook(name, path)['content']
cm.trust_notebook(name, path)
nb = cm.get_model(name, path)['content']
for cell in nb.worksheets[0].cells:
if cell.cell_type == 'code':
assert cell.trusted
def test_check_and_sign(self):
nbm = self.notebook_manager
cm = self.contents_manager
nb, name, path = self.new_notebook()
nbm.mark_trusted_cells(nb, name, path)
nbm.check_and_sign(nb, name, path)
assert not nbm.notary.check_signature(nb)
nbm.trust_notebook(name, path)
nb = nbm.get_notebook(name, path)['content']
nbm.mark_trusted_cells(nb, name, path)
nbm.check_and_sign(nb, name, path)
assert nbm.notary.check_signature(nb)
cm.mark_trusted_cells(nb, name, path)
cm.check_and_sign(nb, name, path)
assert not cm.notary.check_signature(nb)
cm.trust_notebook(name, path)
nb = cm.get_model(name, path)['content']
cm.mark_trusted_cells(nb, name, path)
cm.check_and_sign(nb, name, path)
assert cm.notary.check_signature(nb)

@ -1,470 +0,0 @@
"""A notebook manager that uses the local file system for storage."""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import io
import os
import glob
import shutil
from tornado import web
from .nbmanager import NotebookManager
from IPython.nbformat import current
from IPython.utils.path import ensure_dir_exists
from IPython.utils.traitlets import Unicode, Bool, TraitError
from IPython.utils.py3compat import getcwd
from IPython.utils import tz
from IPython.html.utils import is_hidden, to_os_path
def sort_key(item):
"""Case-insensitive sorting."""
return item['name'].lower()
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class FileNotebookManager(NotebookManager):
save_script = Bool(False, config=True,
help="""Automatically create a Python script when saving the notebook.
For easier use of import, %run and %load across notebooks, a
<notebook-name>.py script will be created next to any
<notebook-name>.ipynb on each save. This can also be set with the
short `--script` flag.
"""
)
notebook_dir = Unicode(getcwd(), config=True)
def _notebook_dir_changed(self, name, old, new):
"""Do a bit of validation of the notebook dir."""
if not os.path.isabs(new):
# If we receive a non-absolute path, make it absolute.
self.notebook_dir = os.path.abspath(new)
return
if not os.path.exists(new) or not os.path.isdir(new):
raise TraitError("notebook dir %r is not a directory" % new)
checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
help="""The directory name in which to keep notebook checkpoints
This is a path relative to the notebook's own directory.
By default, it is .ipynb_checkpoints
"""
)
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError as e:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def get_notebook_names(self, path=''):
"""List all notebook names in the notebook dir and path."""
path = path.strip('/')
if not os.path.isdir(self._get_os_path(path=path)):
raise web.HTTPError(404, 'Directory not found: ' + path)
names = glob.glob(self._get_os_path('*'+self.filename_ext, path))
names = [os.path.basename(name)
for name in names]
return names
def path_exists(self, path):
"""Does the API-style path (directory) actually exist?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to base notebook-dir).
Returns
-------
exists : bool
Whether the path is indeed a directory.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.isdir(os_path)
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to base notebook-dir).
Returns
-------
exists : bool
Whether the path is hidden.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return is_hidden(os_path, self.notebook_dir)
def _get_os_path(self, name=None, path=''):
"""Given a notebook name and a URL path, return its file system
path.
Parameters
----------
name : string
The name of a notebook file with the .ipynb extension
path : string
The relative URL path (with '/' as separator) to the named
notebook.
Returns
-------
path : string
A file system path that combines notebook_dir (location where
server started), the relative path, and the filename with the
current operating system's url.
"""
if name is not None:
path = path + '/' + name
return to_os_path(path, self.notebook_dir)
def notebook_exists(self, name, path=''):
"""Returns a True if the notebook exists. Else, returns False.
Parameters
----------
name : string
The name of the notebook you are checking.
path : string
The relative path to the notebook (with '/' as separator)
Returns
-------
bool
"""
path = path.strip('/')
nbpath = self._get_os_path(name, path=path)
return os.path.isfile(nbpath)
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service.
def list_dirs(self, path):
"""List the directories for a given API style path."""
path = path.strip('/')
os_path = self._get_os_path('', path)
if not os.path.isdir(os_path):
raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
elif is_hidden(os_path, self.notebook_dir):
self.log.info("Refusing to serve hidden directory, via 404 Error")
raise web.HTTPError(404, u'directory does not exist: %r' % os_path)
dir_names = os.listdir(os_path)
dirs = []
for name in dir_names:
os_path = self._get_os_path(name, path)
if os.path.isdir(os_path) and not is_hidden(os_path, self.notebook_dir)\
and self.should_list(name):
try:
model = self.get_dir_model(name, path)
except IOError:
pass
dirs.append(model)
dirs = sorted(dirs, key=sort_key)
return dirs
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service.
def get_dir_model(self, name, path=''):
"""Get the directory model given a directory name and its API style path"""
path = path.strip('/')
os_path = self._get_os_path(name, path)
if not os.path.isdir(os_path):
raise IOError('directory does not exist: %r' % os_path)
info = os.stat(os_path)
last_modified = tz.utcfromtimestamp(info.st_mtime)
created = tz.utcfromtimestamp(info.st_ctime)
# Create the notebook model.
model ={}
model['name'] = name
model['path'] = path
model['last_modified'] = last_modified
model['created'] = created
model['type'] = 'directory'
return model
def list_notebooks(self, path):
"""Returns a list of dictionaries that are the standard model
for all notebooks in the relative 'path'.
Parameters
----------
path : str
the URL path that describes the relative path for the
listed notebooks
Returns
-------
notebooks : list of dicts
a list of the notebook models without 'content'
"""
path = path.strip('/')
notebook_names = self.get_notebook_names(path)
notebooks = [self.get_notebook(name, path, content=False)
for name in notebook_names if self.should_list(name)]
notebooks = sorted(notebooks, key=sort_key)
return notebooks
def get_notebook(self, name, path='', content=True):
""" Takes a path and name for a notebook and returns its model
Parameters
----------
name : str
the name of the notebook
path : str
the URL path that describes the relative path for
the notebook
Returns
-------
model : dict
the notebook model. If contents=True, returns the 'contents'
dict in the model as well.
"""
path = path.strip('/')
if not self.notebook_exists(name=name, path=path):
raise web.HTTPError(404, u'Notebook does not exist: %s' % name)
os_path = self._get_os_path(name, path)
info = os.stat(os_path)
last_modified = tz.utcfromtimestamp(info.st_mtime)
created = tz.utcfromtimestamp(info.st_ctime)
# Create the notebook model.
model ={}
model['name'] = name
model['path'] = path
model['last_modified'] = last_modified
model['created'] = created
model['type'] = 'notebook'
if content:
with io.open(os_path, 'r', encoding='utf-8') as f:
try:
nb = current.read(f, u'json')
except Exception as e:
raise web.HTTPError(400, u"Unreadable Notebook: %s %s" % (os_path, e))
self.mark_trusted_cells(nb, name, path)
model['content'] = nb
return model
def save_notebook(self, model, name='', path=''):
"""Save the notebook model and return the model with no content."""
path = path.strip('/')
if 'content' not in model:
raise web.HTTPError(400, u'No notebook JSON data provided')
# One checkpoint should always exist
if self.notebook_exists(name, path) and not self.list_checkpoints(name, path):
self.create_checkpoint(name, path)
new_path = model.get('path', path).strip('/')
new_name = model.get('name', name)
if path != new_path or name != new_name:
self.rename_notebook(name, path, new_name, new_path)
# Save the notebook file
os_path = self._get_os_path(new_name, new_path)
nb = current.to_notebook_json(model['content'])
self.check_and_sign(nb, new_name, new_path)
if 'name' in nb['metadata']:
nb['metadata']['name'] = u''
try:
self.log.debug("Autosaving notebook %s", os_path)
with io.open(os_path, 'w', encoding='utf-8') as f:
current.write(nb, f, u'json')
except Exception as e:
raise web.HTTPError(400, u'Unexpected error while autosaving notebook: %s %s' % (os_path, e))
# Save .py script as well
if self.save_script:
py_path = os.path.splitext(os_path)[0] + '.py'
self.log.debug("Writing script %s", py_path)
try:
with io.open(py_path, 'w', encoding='utf-8') as f:
current.write(nb, f, u'py')
except Exception as e:
raise web.HTTPError(400, u'Unexpected error while saving notebook as script: %s %s' % (py_path, e))
model = self.get_notebook(new_name, new_path, content=False)
return model
def update_notebook(self, model, name, path=''):
"""Update the notebook's path and/or name"""
path = path.strip('/')
new_name = model.get('name', name)
new_path = model.get('path', path).strip('/')
if path != new_path or name != new_name:
self.rename_notebook(name, path, new_name, new_path)
model = self.get_notebook(new_name, new_path, content=False)
return model
def delete_notebook(self, name, path=''):
"""Delete notebook by name and path."""
path = path.strip('/')
os_path = self._get_os_path(name, path)
if not os.path.isfile(os_path):
raise web.HTTPError(404, u'Notebook does not exist: %s' % os_path)
# clear checkpoints
for checkpoint in self.list_checkpoints(name, path):
checkpoint_id = checkpoint['id']
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if os.path.isfile(cp_path):
self.log.debug("Unlinking checkpoint %s", cp_path)
os.unlink(cp_path)
self.log.debug("Unlinking notebook %s", os_path)
os.unlink(os_path)
def rename_notebook(self, old_name, old_path, new_name, new_path):
"""Rename a notebook."""
old_path = old_path.strip('/')
new_path = new_path.strip('/')
if new_name == old_name and new_path == old_path:
return
new_os_path = self._get_os_path(new_name, new_path)
old_os_path = self._get_os_path(old_name, old_path)
# Should we proceed with the move?
if os.path.isfile(new_os_path):
raise web.HTTPError(409, u'Notebook with name already exists: %s' % new_os_path)
if self.save_script:
old_py_path = os.path.splitext(old_os_path)[0] + '.py'
new_py_path = os.path.splitext(new_os_path)[0] + '.py'
if os.path.isfile(new_py_path):
raise web.HTTPError(409, u'Python script with name already exists: %s' % new_py_path)
# Move the notebook file
try:
shutil.move(old_os_path, new_os_path)
except Exception as e:
raise web.HTTPError(500, u'Unknown error renaming notebook: %s %s' % (old_os_path, e))
# Move the checkpoints
old_checkpoints = self.list_checkpoints(old_name, old_path)
for cp in old_checkpoints:
checkpoint_id = cp['id']
old_cp_path = self.get_checkpoint_path(checkpoint_id, old_name, old_path)
new_cp_path = self.get_checkpoint_path(checkpoint_id, new_name, new_path)
if os.path.isfile(old_cp_path):
self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
shutil.move(old_cp_path, new_cp_path)
# Move the .py script
if self.save_script:
shutil.move(old_py_path, new_py_path)
# Checkpoint-related utilities
def get_checkpoint_path(self, checkpoint_id, name, path=''):
"""find the path to a checkpoint"""
path = path.strip('/')
basename, _ = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=self.filename_ext,
)
os_path = self._get_os_path(path=path)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def get_checkpoint_model(self, checkpoint_id, name, path=''):
"""construct the info dict for a given checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
stats = os.stat(cp_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id = checkpoint_id,
last_modified = last_modified,
)
return info
# public checkpoint API
def create_checkpoint(self, name, path=''):
"""Create a checkpoint from the current state of a notebook"""
path = path.strip('/')
nb_path = self._get_os_path(name, path)
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
self.log.debug("creating checkpoint for notebook %s", name)
self._copy(nb_path, cp_path)
# return the checkpoint info
return self.get_checkpoint_model(checkpoint_id, name, path)
def list_checkpoints(self, name, path=''):
"""list the checkpoints for a given notebook
This notebook manager currently only supports one checkpoint per notebook.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.exists(os_path):
return []
else:
return [self.get_checkpoint_model(checkpoint_id, name, path)]
def restore_checkpoint(self, checkpoint_id, name, path=''):
"""restore a notebook to a checkpointed state"""
path = path.strip('/')
self.log.info("restoring Notebook %s from checkpoint %s", name, checkpoint_id)
nb_path = self._get_os_path(name, path)
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.isfile(cp_path):
self.log.debug("checkpoint file does not exist: %s", cp_path)
raise web.HTTPError(404,
u'Notebook checkpoint does not exist: %s-%s' % (name, checkpoint_id)
)
# ensure notebook is readable (never restore from an unreadable notebook)
with io.open(cp_path, 'r', encoding='utf-8') as f:
current.read(f, u'json')
self._copy(cp_path, nb_path)
self.log.debug("copying %s -> %s", cp_path, nb_path)
def delete_checkpoint(self, checkpoint_id, name, path=''):
"""delete a notebook's checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, name, path)
if not os.path.isfile(cp_path):
raise web.HTTPError(404,
u'Notebook checkpoint does not exist: %s%s-%s' % (path, name, checkpoint_id)
)
self.log.debug("unlinking %s", cp_path)
os.unlink(cp_path)
def info_string(self):
return "Serving notebooks from local directory: %s" % self.notebook_dir
def get_kernel_path(self, name, path='', model=None):
""" Return the path to start kernel in """
return os.path.join(self.notebook_dir, path)

@ -1,288 +0,0 @@
"""Tornado handlers for the notebooks web service.
Authors:
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import json
from tornado import web
from IPython.html.utils import url_path_join, url_escape
from IPython.utils.jsonutil import date_default
from IPython.html.base.handlers import (IPythonHandler, json_errors,
notebook_path_regex, path_regex,
notebook_name_regex)
#-----------------------------------------------------------------------------
# Notebook web service handlers
#-----------------------------------------------------------------------------
class NotebookHandler(IPythonHandler):
SUPPORTED_METHODS = (u'GET', u'PUT', u'PATCH', u'POST', u'DELETE')
def notebook_location(self, name, path=''):
"""Return the full URL location of a notebook based.
Parameters
----------
name : unicode
The base name of the notebook, such as "foo.ipynb".
path : unicode
The URL path of the notebook.
"""
return url_escape(url_path_join(
self.base_url, 'api', 'notebooks', path, name
))
def _finish_model(self, model, location=True):
"""Finish a JSON request with a model, setting relevant headers, etc."""
if location:
location = self.notebook_location(model['name'], model['path'])
self.set_header('Location', location)
self.set_header('Last-Modified', model['last_modified'])
self.finish(json.dumps(model, default=date_default))
@web.authenticated
@json_errors
def get(self, path='', name=None):
"""Return a Notebook or list of notebooks.
* GET with path and no notebook name lists notebooks in a directory
* GET with path and notebook name returns notebook JSON
"""
nbm = self.notebook_manager
# Check to see if a notebook name was given
if name is None:
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service. This should only handle notebooks
# and not directories.
dirs = nbm.list_dirs(path)
notebooks = []
index = []
for nb in nbm.list_notebooks(path):
if nb['name'].lower() == 'index.ipynb':
index.append(nb)
else:
notebooks.append(nb)
notebooks = index + dirs + notebooks
self.finish(json.dumps(notebooks, default=date_default))
return
# get and return notebook representation
model = nbm.get_notebook(name, path)
self._finish_model(model, location=False)
@web.authenticated
@json_errors
def patch(self, path='', name=None):
"""PATCH renames a notebook without re-uploading content."""
nbm = self.notebook_manager
if name is None:
raise web.HTTPError(400, u'Notebook name missing')
model = self.get_json_body()
if model is None:
raise web.HTTPError(400, u'JSON body missing')
model = nbm.update_notebook(model, name, path)
self._finish_model(model)
def _copy_notebook(self, copy_from, path, copy_to=None):
"""Copy a notebook in path, optionally specifying the new name.
Only support copying within the same directory.
"""
self.log.info(u"Copying notebook from %s/%s to %s/%s",
path, copy_from,
path, copy_to or '',
)
model = self.notebook_manager.copy_notebook(copy_from, copy_to, path)
self.set_status(201)
self._finish_model(model)
def _upload_notebook(self, model, path, name=None):
"""Upload a notebook
If name specified, create it in path/name.
"""
self.log.info(u"Uploading notebook to %s/%s", path, name or '')
if name:
model['name'] = name
model = self.notebook_manager.create_notebook(model, path)
self.set_status(201)
self._finish_model(model)
def _create_empty_notebook(self, path, name=None):
"""Create an empty notebook in path
If name specified, create it in path/name.
"""
self.log.info(u"Creating new notebook in %s/%s", path, name or '')
model = {}
if name:
model['name'] = name
model = self.notebook_manager.create_notebook(model, path=path)
self.set_status(201)
self._finish_model(model)
def _save_notebook(self, model, path, name):
"""Save an existing notebook."""
self.log.info(u"Saving notebook at %s/%s", path, name)
model = self.notebook_manager.save_notebook(model, name, path)
if model['path'] != path.strip('/') or model['name'] != name:
# a rename happened, set Location header
location = True
else:
location = False
self._finish_model(model, location)
@web.authenticated
@json_errors
def post(self, path='', name=None):
"""Create a new notebook in the specified path.
POST creates new notebooks. The server always decides on the notebook name.
POST /api/notebooks/path
New untitled notebook in path. If content specified, upload a
notebook, otherwise start empty.
POST /api/notebooks/path?copy=OtherNotebook.ipynb
New copy of OtherNotebook in path
"""
if name is not None:
raise web.HTTPError(400, "Only POST to directories. Use PUT for full names.")
model = self.get_json_body()
if model is not None:
copy_from = model.get('copy_from')
if copy_from:
if model.get('content'):
raise web.HTTPError(400, "Can't upload and copy at the same time.")
self._copy_notebook(copy_from, path)
else:
self._upload_notebook(model, path)
else:
self._create_empty_notebook(path)
@web.authenticated
@json_errors
def put(self, path='', name=None):
"""Saves the notebook in the location specified by name and path.
PUT is very similar to POST, but the requester specifies the name,
whereas with POST, the server picks the name.
PUT /api/notebooks/path/Name.ipynb
Save notebook at ``path/Name.ipynb``. Notebook structure is specified
in `content` key of JSON request body. If content is not specified,
create a new empty notebook.
PUT /api/notebooks/path/Name.ipynb?copy=OtherNotebook.ipynb
Copy OtherNotebook to Name
"""
if name is None:
raise web.HTTPError(400, "Only PUT to full names. Use POST for directories.")
model = self.get_json_body()
if model:
copy_from = model.get('copy_from')
if copy_from:
if model.get('content'):
raise web.HTTPError(400, "Can't upload and copy at the same time.")
self._copy_notebook(copy_from, path, name)
elif self.notebook_manager.notebook_exists(name, path):
self._save_notebook(model, path, name)
else:
self._upload_notebook(model, path, name)
else:
self._create_empty_notebook(path, name)
@web.authenticated
@json_errors
def delete(self, path='', name=None):
"""delete the notebook in the given notebook path"""
nbm = self.notebook_manager
nbm.delete_notebook(name, path)
self.set_status(204)
self.finish()
class NotebookCheckpointsHandler(IPythonHandler):
SUPPORTED_METHODS = ('GET', 'POST')
@web.authenticated
@json_errors
def get(self, path='', name=None):
"""get lists checkpoints for a notebook"""
nbm = self.notebook_manager
checkpoints = nbm.list_checkpoints(name, path)
data = json.dumps(checkpoints, default=date_default)
self.finish(data)
@web.authenticated
@json_errors
def post(self, path='', name=None):
"""post creates a new checkpoint"""
nbm = self.notebook_manager
checkpoint = nbm.create_checkpoint(name, path)
data = json.dumps(checkpoint, default=date_default)
location = url_path_join(self.base_url, 'api/notebooks',
path, name, 'checkpoints', checkpoint['id'])
self.set_header('Location', url_escape(location))
self.set_status(201)
self.finish(data)
class ModifyNotebookCheckpointsHandler(IPythonHandler):
SUPPORTED_METHODS = ('POST', 'DELETE')
@web.authenticated
@json_errors
def post(self, path, name, checkpoint_id):
"""post restores a notebook from a checkpoint"""
nbm = self.notebook_manager
nbm.restore_checkpoint(checkpoint_id, name, path)
self.set_status(204)
self.finish()
@web.authenticated
@json_errors
def delete(self, path, name, checkpoint_id):
"""delete clears a checkpoint for a given notebook"""
nbm = self.notebook_manager
nbm.delete_checkpoint(checkpoint_id, name, path)
self.set_status(204)
self.finish()
#-----------------------------------------------------------------------------
# URL to handler mappings
#-----------------------------------------------------------------------------
_checkpoint_id_regex = r"(?P<checkpoint_id>[\w-]+)"
default_handlers = [
(r"/api/notebooks%s/checkpoints" % notebook_path_regex, NotebookCheckpointsHandler),
(r"/api/notebooks%s/checkpoints/%s" % (notebook_path_regex, _checkpoint_id_regex),
ModifyNotebookCheckpointsHandler),
(r"/api/notebooks%s" % notebook_path_regex, NotebookHandler),
(r"/api/notebooks%s" % path_regex, NotebookHandler),
]

@ -1,287 +0,0 @@
"""A base class notebook manager.
Authors:
* Brian Granger
* Zach Sailer
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from fnmatch import fnmatch
import itertools
import os
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import current, sign
from IPython.utils.traitlets import Instance, Unicode, List
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class NotebookManager(LoggingConfigurable):
filename_ext = Unicode(u'.ipynb')
notary = Instance(sign.NotebookNotary)
def _notary_default(self):
return sign.NotebookNotary(parent=self)
hide_globs = List(Unicode, [u'__pycache__'], config=True, help="""
Glob patterns to hide in file and directory listings.
""")
# NotebookManager API part 1: methods that must be
# implemented in subclasses.
def path_exists(self, path):
"""Does the API-style path (directory) actually exist?
Override this method in subclasses.
Parameters
----------
path : string
The path to check
Returns
-------
exists : bool
Whether the path does indeed exist.
"""
raise NotImplementedError
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to base notebook-dir).
Returns
-------
exists : bool
Whether the path is hidden.
"""
raise NotImplementedError
def notebook_exists(self, name, path=''):
"""Returns a True if the notebook exists. Else, returns False.
Parameters
----------
name : string
The name of the notebook you are checking.
path : string
The relative path to the notebook (with '/' as separator)
Returns
-------
bool
"""
raise NotImplementedError('must be implemented in a subclass')
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service.
def list_dirs(self, path):
"""List the directory models for a given API style path."""
raise NotImplementedError('must be implemented in a subclass')
# TODO: Remove this after we create the contents web service and directories are
# no longer listed by the notebook web service.
def get_dir_model(self, name, path=''):
"""Get the directory model given a directory name and its API style path.
The keys in the model should be:
* name
* path
* last_modified
* created
* type='directory'
"""
raise NotImplementedError('must be implemented in a subclass')
def list_notebooks(self, path=''):
"""Return a list of notebook dicts without content.
This returns a list of dicts, each of the form::
dict(notebook_id=notebook,name=name)
This list of dicts should be sorted by name::
data = sorted(data, key=lambda item: item['name'])
"""
raise NotImplementedError('must be implemented in a subclass')
def get_notebook(self, name, path='', content=True):
"""Get the notebook model with or without content."""
raise NotImplementedError('must be implemented in a subclass')
def save_notebook(self, model, name, path=''):
"""Save the notebook and return the model with no content."""
raise NotImplementedError('must be implemented in a subclass')
def update_notebook(self, model, name, path=''):
"""Update the notebook and return the model with no content."""
raise NotImplementedError('must be implemented in a subclass')
def delete_notebook(self, name, path=''):
"""Delete notebook by name and path."""
raise NotImplementedError('must be implemented in a subclass')
def create_checkpoint(self, name, path=''):
"""Create a checkpoint of the current state of a notebook
Returns a checkpoint_id for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, name, path=''):
"""Return a list of checkpoints for a given notebook"""
return []
def restore_checkpoint(self, checkpoint_id, name, path=''):
"""Restore a notebook from one of its checkpoints"""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, name, path=''):
"""delete a checkpoint for a notebook"""
raise NotImplementedError("must be implemented in a subclass")
def info_string(self):
return "Serving notebooks"
# NotebookManager API part 2: methods that have useable default
# implementations, but can be overridden in subclasses.
def get_kernel_path(self, name, path='', model=None):
""" Return the path to start kernel in """
return path
def increment_filename(self, basename, path=''):
"""Increment a notebook filename without the .ipynb to make it unique.
Parameters
----------
basename : unicode
The name of a notebook without the ``.ipynb`` file extension.
path : unicode
The URL path of the notebooks directory
Returns
-------
name : unicode
A notebook name (with the .ipynb extension) that starts
with basename and does not refer to any existing notebook.
"""
path = path.strip('/')
for i in itertools.count():
name = u'{basename}{i}{ext}'.format(basename=basename, i=i,
ext=self.filename_ext)
if not self.notebook_exists(name, path):
break
return name
def create_notebook(self, model=None, path=''):
"""Create a new notebook and return its model with no content."""
path = path.strip('/')
if model is None:
model = {}
if 'content' not in model:
metadata = current.new_metadata(name=u'')
model['content'] = current.new_notebook(metadata=metadata)
if 'name' not in model:
model['name'] = self.increment_filename('Untitled', path)
model['path'] = path
model = self.save_notebook(model, model['name'], model['path'])
return model
def copy_notebook(self, from_name, to_name=None, path=''):
"""Copy an existing notebook and return its new model.
If to_name not specified, increment `from_name-Copy#.ipynb`.
"""
path = path.strip('/')
model = self.get_notebook(from_name, path)
if not to_name:
base = os.path.splitext(from_name)[0] + '-Copy'
to_name = self.increment_filename(base, path)
model['name'] = to_name
model = self.save_notebook(model, to_name, path)
return model
def log_info(self):
self.log.info(self.info_string())
def trust_notebook(self, name, path=''):
"""Explicitly trust a notebook
Parameters
----------
name : string
The filename of the notebook
path : string
The notebook's directory
"""
model = self.get_notebook(name, path)
nb = model['content']
self.log.warn("Trusting notebook %s/%s", path, name)
self.notary.mark_cells(nb, True)
self.save_notebook(model, name, path)
def check_and_sign(self, nb, name, path=''):
"""Check for trusted cells, and sign the notebook.
Called as a part of saving notebooks.
Parameters
----------
nb : dict
The notebook structure
name : string
The filename of the notebook
path : string
The notebook's directory
"""
if self.notary.check_cells(nb):
self.notary.sign(nb)
else:
self.log.warn("Saving untrusted notebook %s/%s", path, name)
def mark_trusted_cells(self, nb, name, path=''):
"""Mark cells as trusted if the notebook signature matches.
Called as a part of loading notebooks.
Parameters
----------
nb : dict
The notebook structure
name : string
The filename of the notebook
path : string
The notebook's directory
"""
trusted = self.notary.check_signature(nb)
if not trusted:
self.log.warn("Notebook %s/%s is not trusted", path, name)
self.notary.mark_cells(nb, trusted)
def should_list(self, name):
"""Should this file/directory name be displayed in a listing?"""
return not any(fnmatch(name, glob) for glob in self.hide_globs)

@ -1,20 +1,7 @@
"""Tornado handlers for the sessions web service.
"""Tornado handlers for the sessions web service."""
Authors:
* Zach Sailer
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import json
@ -24,10 +11,6 @@ from ...base.handlers import IPythonHandler, json_errors
from IPython.utils.jsonutil import date_default
from IPython.html.utils import url_path_join, url_escape
#-----------------------------------------------------------------------------
# Session web service handlers
#-----------------------------------------------------------------------------
class SessionRootHandler(IPythonHandler):
@ -45,6 +28,8 @@ class SessionRootHandler(IPythonHandler):
# Creates a new session
#(unless a session already exists for the named nb)
sm = self.session_manager
cm = self.contents_manager
km = self.kernel_manager
model = self.get_json_body()
if model is None:

@ -32,7 +32,7 @@ from IPython.utils.traitlets import Instance
class SessionManager(LoggingConfigurable):
kernel_manager = Instance('IPython.html.services.kernels.kernelmanager.MappingKernelManager')
notebook_manager = Instance('IPython.html.services.notebooks.nbmanager.NotebookManager', args=())
contents_manager = Instance('IPython.html.services.contents.manager.ContentsManager', args=())
# Session database initialized below
_cursor = None
@ -77,7 +77,7 @@ class SessionManager(LoggingConfigurable):
"""Creates a session and returns its model"""
session_id = self.new_session_id()
# allow nbm to specify kernels cwd
kernel_path = self.notebook_manager.get_kernel_path(name=name, path=path)
kernel_path = self.contents_manager.get_kernel_path(name=name, path=path)
kernel_id = self.kernel_manager.start_kernel(path=kernel_path,
kernel_name=kernel_name)
return self.save_session(session_id, name=name, path=path,

@ -1885,6 +1885,8 @@ define([
var model = {};
model.name = this.notebook_name;
model.path = this.notebook_path;
model.type = 'notebook';
model.format = 'json';
model.content = this.toJSON();
model.content.nbformat = this.nbformat;
model.content.nbformat_minor = this.nbformat_minor;
@ -1908,7 +1910,7 @@ define([
this.events.trigger('notebook_saving.Notebook');
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name
);
@ -2041,7 +2043,7 @@ define([
};
var url = utils.url_join_encode(
base_url,
'api/notebooks',
'api/contents',
path
);
$.ajax(url,settings);
@ -2070,7 +2072,7 @@ define([
};
var url = utils.url_join_encode(
base_url,
'api/notebooks',
'api/contents',
path
);
$.ajax(url,settings);
@ -2095,7 +2097,7 @@ define([
this.events.trigger('rename_notebook.Notebook', data);
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name
);
@ -2113,7 +2115,7 @@ define([
};
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name
);
@ -2182,7 +2184,7 @@ define([
this.events.trigger('notebook_loading.Notebook');
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name
);
@ -2345,7 +2347,7 @@ define([
Notebook.prototype.list_checkpoints = function () {
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name,
'checkpoints'
@ -2396,7 +2398,7 @@ define([
Notebook.prototype.create_checkpoint = function () {
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name,
'checkpoints'
@ -2485,7 +2487,7 @@ define([
this.events.trigger('notebook_restoring.Notebook', checkpoint);
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name,
'checkpoints',
@ -2533,7 +2535,7 @@ define([
this.events.trigger('notebook_restoring.Notebook', checkpoint);
var url = utils.url_join_encode(
this.base_url,
'api/notebooks',
'api/contents',
this.notebook_path,
this.notebook_name,
'checkpoints',

@ -7955,6 +7955,22 @@ input.engine_num_input {
.notebook_icon:before.pull-right {
margin-left: .3em;
}
.file_icon:before {
display: inline-block;
font-family: FontAwesome;
font-style: normal;
font-weight: normal;
line-height: 1;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
content: "\f016";
}
.file_icon:before.pull-left {
margin-right: .3em;
}
.file_icon:before.pull-right {
margin-left: .3em;
}
/*!
*
* IPython notebook

@ -19,7 +19,7 @@ define([
// base_url: string
// notebook_path: string
notebooklist.NotebookList.call(this, selector, $.extend({
element_name: 'running'},
element_name: 'running'},
options));
};
@ -28,13 +28,20 @@ define([
KernelList.prototype.sessions_loaded = function (d) {
this.sessions = d;
this.clear_list();
var item;
for (var path in d) {
item = this.new_notebook_item(-1);
this.add_link('', path, item);
this.add_shutdown_button(item, this.sessions[path]);
var item, path_name;
for (path_name in d) {
if (!d.hasOwnProperty(path_name)) {
// nothing is safe in javascript
continue;
}
item = this.new_item(-1);
this.add_link({
name: path_name,
path: '',
type: 'notebook',
}, item);
this.add_shutdown_button(item, this.sessions[path_name]);
}
$('#running_list_header').toggle($.isEmptyObject(d));
};

@ -75,30 +75,27 @@ define([
}
for (var i = 0; i < files.length; i++) {
var f = files[i];
var reader = new FileReader();
reader.readAsText(f);
var name_and_ext = utils.splitext(f.name);
var file_ext = name_and_ext[1];
var reader = new FileReader();
if (file_ext === '.ipynb') {
var item = that.new_notebook_item(0);
item.addClass('new-file');
that.add_name_input(f.name, item);
// Store the notebook item in the reader so we can use it later
// to know which item it belongs to.
$(reader).data('item', item);
reader.onload = function (event) {
var nbitem = $(event.target).data('item');
that.add_notebook_data(event.target.result, nbitem);
that.add_upload_button(nbitem);
};
reader.readAsText(f);
} else {
var dialog_body = 'Uploaded notebooks must be .ipynb files';
dialog.modal({
title : 'Invalid file type',
body : dialog_body,
buttons : {'OK' : {'class' : 'btn-primary'}}
});
// read non-notebook files as binary
reader.readAsArrayBuffer(f);
}
var item = that.new_item(0);
item.addClass('new-file');
that.add_name_input(f.name, item);
// Store the list item in the reader so we can use it later
// to know which item it belongs to.
$(reader).data('item', item);
reader.onload = function (event) {
var item = $(event.target).data('item');
that.add_file_data(event.target.result, item);
that.add_upload_button(item);
};
}
// Replace the file input form wth a clone of itself. This is required to
// reset the form. Otherwise, if you upload a file, delete it and try to
@ -148,7 +145,7 @@ define([
var url = utils.url_join_encode(
this.base_url,
'api',
'notebooks',
'contents',
this.notebook_path
);
$.ajax(url, settings);
@ -161,10 +158,12 @@ define([
message = param.msg;
}
var item = null;
var len = data.length;
var model = null;
var list = data.content;
var len = list.length;
this.clear_list();
if (len === 0) {
item = this.new_notebook_item(0);
item = this.new_item(0);
var span12 = item.children().first();
span12.empty();
span12.append($('<div style="margin:auto;text-align:center;color:grey"/>').text(message));
@ -172,31 +171,24 @@ define([
var path = this.notebook_path;
var offset = 0;
if (path !== '') {
item = this.new_notebook_item(0);
this.add_dir(path, '..', item);
item = this.new_item(0);
model = {
type: 'directory',
name: '..',
path: path,
};
this.add_link(model, item);
offset = 1;
}
for (var i=0; i<len; i++) {
if (data[i].type === 'directory') {
var name = data[i].name;
item = this.new_notebook_item(i+offset);
this.add_dir(path, name, item);
} else {
var name = data[i].name;
item = this.new_notebook_item(i+offset);
this.add_link(path, name, item);
name = utils.url_path_join(path, name);
if(this.sessions[name] === undefined){
this.add_delete_button(item);
} else {
this.add_shutdown_button(item,this.sessions[name]);
}
}
model = list[i];
item = this.new_item(i+offset);
this.add_link(model, item);
}
};
NotebookList.prototype.new_notebook_item = function (index) {
NotebookList.prototype.new_item = function (index) {
var item = $('<div/>').addClass("list_item").addClass("row");
// item.addClass('list_item ui-widget ui-widget-content ui-helper-clearfix');
// item.css('border-top-style','none');
@ -219,55 +211,70 @@ define([
};
NotebookList.prototype.add_dir = function (path, name, item) {
NotebookList.icons = {
directory: 'folder_icon',
notebook: 'notebook_icon',
file: 'file_icon',
};
NotebookList.uri_prefixes = {
directory: 'tree',
notebook: 'notebooks',
file: 'files',
};
NotebookList.prototype.add_link = function (model, item) {
var path = model.path,
name = model.name;
item.data('name', name);
item.data('path', path);
item.find(".item_name").text(name);
item.find(".item_icon").addClass('folder_icon').addClass('icon-fixed-width');
item.find("a.item_link")
var icon = NotebookList.icons[model.type];
var uri_prefix = NotebookList.uri_prefixes[model.type];
item.find(".item_icon").addClass(icon).addClass('icon-fixed-width');
var link = item.find("a.item_link")
.attr('href',
utils.url_join_encode(
this.base_url,
"tree",
uri_prefix,
path,
name
)
);
// directory nav doesn't open new tabs
// files, notebooks do
if (model.type !== "directory") {
link.attr('target','_blank');
}
var path_name = utils.url_path_join(path, name);
if (model.type == 'file') {
this.add_delete_button(item);
} else if (model.type == 'notebook') {
if(this.sessions[path_name] === undefined){
this.add_delete_button(item);
} else {
this.add_shutdown_button(item, this.sessions[path_name]);
}
}
};
NotebookList.prototype.add_link = function (path, nbname, item) {
item.data('nbname', nbname);
item.data('path', path);
item.find(".item_name").text(nbname);
item.find(".item_icon").addClass('notebook_icon').addClass('icon-fixed-width');
item.find("a.item_link")
.attr('href',
utils.url_join_encode(
this.base_url,
"notebooks",
path,
nbname
)
).attr('target','_blank');
};
NotebookList.prototype.add_name_input = function (nbname, item) {
item.data('nbname', nbname);
NotebookList.prototype.add_name_input = function (name, item) {
item.data('name', name);
item.find(".item_icon").addClass('notebook_icon').addClass('icon-fixed-width');
item.find(".item_name").empty().append(
$('<input/>')
.addClass("nbname_input")
.attr('value', utils.splitext(nbname)[0])
.addClass("filename_input")
.attr('value', name)
.attr('size', '30')
.attr('type', 'text')
);
};
NotebookList.prototype.add_notebook_data = function (data, item) {
item.data('nbdata', data);
NotebookList.prototype.add_file_data = function (data, item) {
item.data('filedata', data);
};
@ -304,13 +311,13 @@ define([
click(function (e) {
// $(this) is the button that was clicked.
var that = $(this);
// We use the nbname and notebook_id from the parent notebook_item element's
// data because the outer scopes values change as we iterate through the loop.
// We use the filename from the parent list_item element's
// data because the outer scope's values change as we iterate through the loop.
var parent_item = that.parents('div.list_item');
var nbname = parent_item.data('nbname');
var message = 'Are you sure you want to permanently delete the notebook: ' + nbname + '?';
var name = parent_item.data('name');
var message = 'Are you sure you want to permanently delete the file: ' + name + '?';
dialog.modal({
title : "Delete notebook",
title : "Delete file",
body : message,
buttons : {
Delete : {
@ -328,9 +335,9 @@ define([
};
var url = utils.url_join_encode(
notebooklist.base_url,
'api/notebooks',
'api/contents',
notebooklist.notebook_path,
nbname
name
);
$.ajax(url, settings);
}
@ -344,30 +351,69 @@ define([
};
NotebookList.prototype.add_upload_button = function (item) {
NotebookList.prototype.add_upload_button = function (item, type) {
var that = this;
var upload_button = $('<button/>').text("Upload")
.addClass('btn btn-primary btn-xs upload_button')
.click(function (e) {
var nbname = item.find('.item_name > input').val();
if (nbname.slice(nbname.length-6, nbname.length) != ".ipynb") {
nbname = nbname + ".ipynb";
}
var path = that.notebook_path;
var nbdata = item.data('nbdata');
var content_type = 'application/json';
var filename = item.find('.item_name > input').val();
var filedata = item.data('filedata');
var format = 'text';
if (filedata instanceof ArrayBuffer) {
// base64-encode binary file data
var bytes = '';
var buf = new Uint8Array(filedata);
var nbytes = buf.byteLength;
for (var i=0; i<nbytes; i++) {
bytes += String.fromCharCode(buf[i]);
}
filedata = btoa(bytes);
format = 'base64';
}
var model = {
content : JSON.parse(nbdata),
path: path,
name: filename
};
var name_and_ext = utils.splitext(filename);
var file_ext = name_and_ext[1];
var content_type;
if (file_ext === '.ipynb') {
model.type = 'notebook';
model.format = 'json';
try {
model.content = JSON.parse(filedata);
} catch (e) {
dialog.modal({
title : 'Cannot upload invalid Notebook',
body : "The error was: " + e,
buttons : {'OK' : {
'class' : 'btn-primary',
click: function () {
item.remove();
}
}}
});
}
content_type = 'application/json';
} else {
model.type = 'file';
model.format = format;
model.content = filedata;
content_type = 'application/octet-stream';
}
var filedata = item.data('filedata');
var settings = {
processData : false,
cache : false,
type : 'PUT',
dataType : 'json',
data : JSON.stringify(model),
headers : {'Content-Type': content_type},
success : function (data, status, xhr) {
that.add_link(path, nbname, item);
item.removeClass('new-file');
that.add_link(model, item);
that.add_delete_button(item);
},
error : utils.log_ajax_error,
@ -375,9 +421,9 @@ define([
var url = utils.url_join_encode(
that.base_url,
'api/notebooks',
'api/contents',
that.notebook_path,
nbname
filename
);
$.ajax(url, settings);
return false;
@ -385,7 +431,6 @@ define([
var cancel_button = $('<button/>').text("Cancel")
.addClass("btn btn-default btn-xs")
.click(function (e) {
console.log('cancel click');
item.remove();
return false;
});
@ -419,7 +464,7 @@ define([
};
var url = utils.url_join_encode(
base_url,
'api/notebooks',
'api/contents',
path
);
$.ajax(url, settings);
@ -441,7 +486,8 @@ define([
});
};
// Backwards compatability.
// Backwards compatability.
IPython.NotebookList = NotebookList;
return {'NotebookList': NotebookList};

@ -147,3 +147,7 @@ input.engine_num_input {
.notebook_icon:before {
.icon(@fa-var-book)
}
.file_icon:before {
.icon(@fa-var-file-o)
}

@ -10,7 +10,6 @@
{% block params %}
data-project="{{project}}"
data-base-url="{{base_url}}"
data-notebook-path="{{notebook_path}}"

@ -33,7 +33,7 @@ class NotebookTestBase(TestCase):
@classmethod
def wait_until_alive(cls):
"""Wait for the server to be alive"""
url = 'http://localhost:%i/api/notebooks' % cls.port
url = 'http://localhost:%i/api/contents' % cls.port
for _ in range(int(MAX_WAITTIME/POLL_INTERVAL)):
try:
requests.get(url)

@ -1,28 +1,12 @@
"""Tornado handlers for the tree view.
"""Tornado handlers for the tree view."""
Authors:
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from tornado import web
from ..base.handlers import IPythonHandler, notebook_path_regex, path_regex
from ..utils import url_path_join, url_escape
#-----------------------------------------------------------------------------
# Handlers
#-----------------------------------------------------------------------------
class TreeHandler(IPythonHandler):
"""Render the tree view, listing notebooks, clusters, etc."""
@ -51,7 +35,7 @@ class TreeHandler(IPythonHandler):
@web.authenticated
def get(self, path='', name=None):
path = path.strip('/')
nbm = self.notebook_manager
cm = self.contents_manager
if name is not None:
# is a notebook, redirect to notebook handler
url = url_escape(url_path_join(
@ -60,16 +44,15 @@ class TreeHandler(IPythonHandler):
self.log.debug("Redirecting %s to %s", self.request.path, url)
self.redirect(url)
else:
if not nbm.path_exists(path=path):
if not cm.path_exists(path=path):
# Directory is hidden or does not exist.
raise web.HTTPError(404)
elif nbm.is_hidden(path):
elif cm.is_hidden(path):
self.log.info("Refusing to serve hidden directory, via 404 Error")
raise web.HTTPError(404)
breadcrumbs = self.generate_breadcrumbs(path)
page_title = self.generate_page_title(path)
self.write(self.render_template('tree.html',
project=self.project_dir,
page_title=page_title,
notebook_path=path,
breadcrumbs=breadcrumbs

@ -113,6 +113,9 @@ def is_hidden(abs_path, abs_root=''):
# check UF_HIDDEN on any location up to root
path = abs_path
while path and path.startswith(abs_root) and path != abs_root:
if not os.path.exists(path):
path = os.path.dirname(path)
continue
try:
# may fail on Windows junctions
st = os.stat(path)

Loading…
Cancel
Save