Merge pull request #7324 from quantopian/separate-checkpoint-manager

DEV: Refactor checkpoint logic out of FileContentsManager into a separate class.
Min RK 12 years ago
commit 3cf6eacc86

@ -0,0 +1,127 @@
"""
Classes for managing Checkpoints.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from tornado.web import HTTPError
from IPython.config.configurable import LoggingConfigurable
class Checkpoints(LoggingConfigurable):
"""
Base class for managing checkpoints for a ContentsManager.
Subclasses are required to implement:
create_checkpoint(self, contents_mgr, path)
restore_checkpoint(self, contents_mgr, checkpoint_id, path)
rename_checkpoint(self, checkpoint_id, old_path, new_path)
delete_checkpoint(self, checkpoint_id, path)
list_checkpoints(self, path)
"""
def create_checkpoint(self, contents_mgr, path):
"""Create a checkpoint."""
raise NotImplementedError("must be implemented in a subclass")
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint"""
raise NotImplementedError("must be implemented in a subclass")
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a single checkpoint from old_path to new_path."""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, path):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, path):
"""Return a list of checkpoints for a given file"""
raise NotImplementedError("must be implemented in a subclass")
def rename_all_checkpoints(self, old_path, new_path):
"""Rename all checkpoints for old_path to new_path."""
for cp in self.list_checkpoints(old_path):
self.rename_checkpoint(cp['id'], old_path, new_path)
def delete_all_checkpoints(self, path):
"""Delete all checkpoints for the given path."""
for checkpoint in self.list_checkpoints(path):
self.delete_checkpoint(checkpoint['id'], path)
class GenericCheckpointsMixin(object):
"""
Helper for creating Checkpoints subclasses that can be used with any
ContentsManager.
Provides a ContentsManager-agnostic implementation of `create_checkpoint`
and `restore_checkpoint` in terms of the following operations:
- create_file_checkpoint(self, content, format, path)
- create_notebook_checkpoint(self, nb, path)
- get_file_checkpoint(self, checkpoint_id, path)
- get_notebook_checkpoint(self, checkpoint_id, path)
To create a generic CheckpointManager, add this mixin to a class that
implement the above three methods plus the remaining Checkpoints API
methods:
- delete_checkpoint(self, checkpoint_id, path)
- list_checkpoints(self, path)
- rename_checkpoint(self, checkpoint_id, old_path, new_path)
"""
def create_checkpoint(self, contents_mgr, path):
model = contents_mgr.get(path, content=True)
type = model['type']
if type == 'notebook':
return self.create_notebook_checkpoint(
model['content'],
path,
)
elif type == 'file':
return self.create_file_checkpoint(
model['content'],
model['format'],
path,
)
else:
raise HTTPError(500, u'Unexpected type %s' % type)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
type = contents_mgr.get(path, content=False)['type']
if type == 'notebook':
model = self.get_notebook_checkpoint(checkpoint_id, path)
elif type == 'file':
model = self.get_file_checkpoint(checkpoint_id, path)
else:
raise HTTPError(500, u'Unexpected type %s' % type)
contents_mgr.save(model, path)
# Required Methods
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def get_checkpoint(self, checkpoint_id, path, type):
"""Get the content of a checkpoint.
Returns an unvalidated model with the same structure as
the return value of ContentsManager.get
"""
raise NotImplementedError("must be implemented in a subclass")

@ -0,0 +1,200 @@
"""
File-based Checkpoints implementations.
"""
import os
import shutil
from tornado.web import HTTPError
from .checkpoints import (
Checkpoints,
GenericCheckpointsMixin,
)
from .fileio import FileManagerMixin
from IPython.utils import tz
from IPython.utils.path import ensure_dir_exists
from IPython.utils.py3compat import getcwd
from IPython.utils.traitlets import Unicode
class FileCheckpoints(FileManagerMixin, Checkpoints):
"""
A Checkpoints that caches checkpoints for files in adjacent
directories.
Only works with FileContentsManager. Use GenericFileCheckpoints if
you want file-based checkpoints with another ContentsManager.
"""
checkpoint_dir = Unicode(
'.ipynb_checkpoints',
config=True,
help="""The directory name in which to keep file checkpoints
This is a path relative to the file's own directory.
By default, it is .ipynb_checkpoints
""",
)
root_dir = Unicode(config=True)
def _root_dir_default(self):
try:
return self.parent.root_dir
except AttributeError:
return getcwd()
# ContentsManager-dependent checkpoint API
def create_checkpoint(self, contents_mgr, path):
"""Create a checkpoint."""
checkpoint_id = u'checkpoint'
src_path = contents_mgr._get_os_path(path)
dest_path = self.checkpoint_path(checkpoint_id, path)
self._copy(src_path, dest_path)
return self.checkpoint_model(checkpoint_id, dest_path)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
src_path = self.checkpoint_path(checkpoint_id, path)
dest_path = contents_mgr._get_os_path(path)
self._copy(src_path, dest_path)
# ContentsManager-independent checkpoint API
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a checkpoint from old_path to new_path."""
old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
if os.path.isfile(old_cp_path):
self.log.debug(
"Renaming checkpoint %s -> %s",
old_cp_path,
new_cp_path,
)
with self.perm_to_403():
shutil.move(old_cp_path, new_cp_path)
def delete_checkpoint(self, checkpoint_id, path):
"""delete a file's checkpoint"""
path = path.strip('/')
cp_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(cp_path):
self.no_such_checkpoint(path, checkpoint_id)
self.log.debug("unlinking %s", cp_path)
with self.perm_to_403():
os.unlink(cp_path)
def list_checkpoints(self, path):
"""list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_path):
return []
else:
return [self.checkpoint_model(checkpoint_id, os_path)]
# Checkpoint-related utilities
def checkpoint_path(self, checkpoint_id, path):
"""find the path to a checkpoint"""
path = path.strip('/')
parent, name = ('/' + path).rsplit('/', 1)
parent = parent.strip('/')
basename, ext = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=ext,
)
os_path = self._get_os_path(path=parent)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
with self.perm_to_403():
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def checkpoint_model(self, checkpoint_id, os_path):
"""construct the info dict for a given checkpoint"""
stats = os.stat(os_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id=checkpoint_id,
last_modified=last_modified,
)
return info
# Error Handling
def no_such_checkpoint(self, path, checkpoint_id):
raise HTTPError(
404,
u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
)
class GenericFileCheckpoints(GenericCheckpointsMixin, FileCheckpoints):
"""
Local filesystem Checkpoints that works with any conforming
ContentsManager.
"""
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_file(os_checkpoint_path, content, format=format)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_notebook(os_checkpoint_path, nb)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def get_notebook_checkpoint(self, checkpoint_id, path):
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_checkpoint_path):
self.no_such_checkpoint(path, checkpoint_id)
return {
'type': 'notebook',
'content': self._read_notebook(
os_checkpoint_path,
as_version=4,
),
}
def get_file_checkpoint(self, checkpoint_id, path):
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_checkpoint_path):
self.no_such_checkpoint(path, checkpoint_id)
content, format = self._read_file(os_checkpoint_path, format=None)
return {
'type': 'file',
'content': content,
'format': format,
}

@ -0,0 +1,166 @@
"""
Utilities for file-based Contents/Checkpoints managers.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
import errno
import io
import os
import shutil
from tornado.web import HTTPError
from IPython.html.utils import (
to_api_path,
to_os_path,
)
from IPython import nbformat
from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import str_to_unicode
class FileManagerMixin(object):
"""
Mixin for ContentsAPI classes that interact with the filesystem.
Provides facilities for reading, writing, and copying both notebooks and
generic files.
Shared by FileContentsManager and FileCheckpoints.
Note
----
Classes using this mixin must provide the following attributes:
root_dir : unicode
A directory against against which API-style paths are to be resolved.
log : logging.Logger
"""
@contextmanager
def open(self, os_path, *args, **kwargs):
"""wrapper around io.open that turns permission errors into 403"""
with self.perm_to_403(os_path):
with io.open(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def atomic_writing(self, os_path, *args, **kwargs):
"""wrapper around atomic_writing that turns permission errors to 403"""
with self.perm_to_403(os_path):
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def perm_to_403(self, os_path=''):
"""context manager for turning permission errors into 403."""
try:
yield
except OSError as e:
if e.errno in {errno.EPERM, errno.EACCES}:
# make 403 error message without root prefix
# this may not work perfectly on unicode paths on Python 2,
# but nobody should be doing that anyway.
if not os_path:
os_path = str_to_unicode(e.filename or 'unknown file')
path = to_api_path(os_path, root=self.root_dir)
raise HTTPError(403, u'Permission denied: %s' % path)
else:
raise
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, path):
"""Given an API path, return its file system path.
Parameters
----------
path : string
The relative API path to the named file.
Returns
-------
path : string
Native, absolute OS path to for a file.
"""
return to_os_path(path, self.root_dir)
def _read_notebook(self, os_path, as_version=4):
"""Read a notebook from an os path."""
with self.open(os_path, 'r', encoding='utf-8') as f:
try:
return nbformat.read(f, as_version=as_version)
except Exception as e:
raise HTTPError(
400,
u"Unreadable Notebook: %s %r" % (os_path, e),
)
def _save_notebook(self, os_path, nb):
"""Save a notebook to an os_path."""
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
def _read_file(self, os_path, format):
"""Read a non-notebook file.
os_path: The path to be read.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
if not os.path.isfile(os_path):
raise HTTPError(400, "Cannot read non-file %s" % os_path)
with self.open(os_path, 'rb') as f:
bcontent = f.read()
if format is None or format == 'text':
# Try to interpret as unicode if format is unknown or if unicode
# was explicitly requested.
try:
return bcontent.decode('utf8'), 'text'
except UnicodeError:
if format == 'text':
raise HTTPError(
400,
"%s is not UTF-8 encoded" % os_path,
reason='bad format',
)
return base64.encodestring(bcontent).decode('ascii'), 'base64'
def _save_file(self, os_path, content, format):
"""Save content of a generic file."""
if format not in {'text', 'base64'}:
raise HTTPError(
400,
"Must specify format of file contents as 'text' or 'base64'",
)
try:
if format == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise HTTPError(
400, u'Encoding error saving %s: %s' % (os_path, e)
)
with self.atomic_writing(os_path, text=False) as f:
f.write(bcontent)

@ -3,28 +3,31 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
import errno
import io
import os
import shutil
from contextlib import contextmanager
import mimetypes
from tornado import web
from .filecheckpoints import FileCheckpoints
from .fileio import FileManagerMixin
from .manager import ContentsManager
from IPython import nbformat
from IPython.utils.io import atomic_writing
from IPython.utils.importstring import import_item
from IPython.utils.path import ensure_dir_exists
from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
from IPython.utils.py3compat import getcwd, str_to_unicode, string_types
from IPython.utils.py3compat import getcwd, string_types
from IPython.utils import tz
from IPython.html.utils import is_hidden, to_os_path, to_api_path
from IPython.html.utils import (
is_hidden,
to_api_path,
)
_script_exporter = None
def _post_save_script(model, os_path, contents_manager, **kwargs):
"""convert notebooks to Python script after save with nbconvert
@ -48,7 +51,8 @@ def _post_save_script(model, os_path, contents_manager, **kwargs):
with io.open(script_fname, 'w', encoding='utf-8') as f:
f.write(script)
class FileContentsManager(ContentsManager):
class FileContentsManager(FileManagerMixin, ContentsManager):
root_dir = Unicode(config=True)
@ -57,38 +61,7 @@ class FileContentsManager(ContentsManager):
return self.parent.notebook_dir
except AttributeError:
return getcwd()
@contextmanager
def perm_to_403(self, os_path=''):
"""context manager for turning permission errors into 403"""
try:
yield
except OSError as e:
if e.errno in {errno.EPERM, errno.EACCES}:
# make 403 error message without root prefix
# this may not work perfectly on unicode paths on Python 2,
# but nobody should be doing that anyway.
if not os_path:
os_path = str_to_unicode(e.filename or 'unknown file')
path = to_api_path(os_path, self.root_dir)
raise web.HTTPError(403, u'Permission denied: %s' % path)
else:
raise
@contextmanager
def open(self, os_path, *args, **kwargs):
"""wrapper around io.open that turns permission errors into 403"""
with self.perm_to_403(os_path):
with io.open(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def atomic_writing(self, os_path, *args, **kwargs):
"""wrapper around atomic_writing that turns permission errors into 403"""
with self.perm_to_403(os_path):
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook')
def _save_script_changed(self):
self.log.warn("""
@ -148,60 +121,8 @@ class FileContentsManager(ContentsManager):
if not os.path.isdir(new):
raise TraitError("%r is not a directory" % new)
checkpoint_dir = Unicode('.ipynb_checkpoints', config=True,
help="""The directory name in which to keep file checkpoints
This is a path relative to the file's own directory.
By default, it is .ipynb_checkpoints
"""
)
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError as e:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, path):
"""Given an API path, return its file system path.
Parameters
----------
path : string
The relative API path to the named file.
Returns
-------
path : string
Native, absolute OS path to for a file.
"""
return to_os_path(path, self.root_dir)
def dir_exists(self, path):
"""Does the API-style path refer to an extant directory?
API-style wrapper for os.path.isdir
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
exists : bool
Whether the path is indeed a directory.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.isdir(os_path)
def _checkpoints_class_default(self):
return FileCheckpoints
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?
@ -240,6 +161,26 @@ class FileContentsManager(ContentsManager):
os_path = self._get_os_path(path)
return os.path.isfile(os_path)
def dir_exists(self, path):
"""Does the API-style path refer to an extant directory?
API-style wrapper for os.path.isdir
Parameters
----------
path : string
The path to check. This is an API path (`/` separated,
relative to root_dir).
Returns
-------
exists : bool
Whether the path is indeed a directory.
"""
path = path.strip('/')
os_path = self._get_os_path(path=path)
return os.path.isdir(os_path)
def exists(self, path):
"""Returns True if the path exists, else returns False.
@ -338,33 +279,20 @@ class FileContentsManager(ContentsManager):
os_path = self._get_os_path(path)
if content:
if not os.path.isfile(os_path):
# could be FIFO
raise web.HTTPError(400, "Cannot get content of non-file %s" % os_path)
with self.open(os_path, 'rb') as f:
bcontent = f.read()
if format != 'base64':
try:
model['content'] = bcontent.decode('utf8')
except UnicodeError as e:
if format == 'text':
raise web.HTTPError(400, "%s is not UTF-8 encoded" % path, reason='bad format')
else:
model['format'] = 'text'
default_mime = 'text/plain'
if model['content'] is None:
model['content'] = base64.encodestring(bcontent).decode('ascii')
model['format'] = 'base64'
if model['format'] == 'base64':
default_mime = 'application/octet-stream'
model['mimetype'] = mimetypes.guess_type(os_path)[0] or default_mime
content, format = self._read_file(os_path, format)
default_mime = {
'text': 'text/plain',
'base64': 'application/octet-stream'
}[format]
model.update(
content=content,
format=format,
mimetype=mimetypes.guess_type(os_path)[0] or default_mime,
)
return model
def _notebook_model(self, path, content=True):
"""Build a notebook model
@ -375,11 +303,7 @@ class FileContentsManager(ContentsManager):
model['type'] = 'notebook'
if content:
os_path = self._get_os_path(path)
with self.open(os_path, 'r', encoding='utf-8') as f:
try:
nb = nbformat.read(f, as_version=4)
except Exception as e:
raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
nb = self._read_notebook(os_path, as_version=4)
self.mark_trusted_cells(nb, path)
model['content'] = nb
model['format'] = 'json'
@ -428,33 +352,6 @@ class FileContentsManager(ContentsManager):
model = self._file_model(path, content=content, format=format)
return model
def _save_notebook(self, os_path, model, path=''):
"""save a notebook file"""
# Save the notebook file
nb = nbformat.from_dict(model['content'])
self.check_and_sign(nb, path)
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
def _save_file(self, os_path, model, path=''):
"""save a non-notebook file"""
fmt = model.get('format', None)
if fmt not in {'text', 'base64'}:
raise web.HTTPError(400, "Must specify format of file contents as 'text' or 'base64'")
try:
content = model['content']
if fmt == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
with self.atomic_writing(os_path, text=False) as f:
f.write(bcontent)
def _save_directory(self, os_path, model, path=''):
"""create a directory"""
if is_hidden(os_path, self.root_dir):
@ -478,17 +375,19 @@ class FileContentsManager(ContentsManager):
self.run_pre_save_hook(model=model, path=path)
# One checkpoint should always exist
if self.file_exists(path) and not self.list_checkpoints(path):
self.create_checkpoint(path)
os_path = self._get_os_path(path)
self.log.debug("Saving %s", os_path)
try:
if model['type'] == 'notebook':
self._save_notebook(os_path, model, path)
nb = nbformat.from_dict(model['content'])
self.check_and_sign(nb, path)
self._save_notebook(os_path, nb)
# One checkpoint should always exist for notebooks.
if not self.checkpoints.list_checkpoints(path):
self.create_checkpoint(path)
elif model['type'] == 'file':
self._save_file(os_path, model, path)
# Missing format will be handled internally by _save_file.
self._save_file(os_path, model['content'], model.get('format'))
elif model['type'] == 'directory':
self._save_directory(os_path, model, path)
else:
@ -512,28 +411,23 @@ class FileContentsManager(ContentsManager):
return model
def delete(self, path):
def delete_file(self, path):
"""Delete file at path."""
path = path.strip('/')
os_path = self._get_os_path(path)
rm = os.unlink
if os.path.isdir(os_path):
listing = os.listdir(os_path)
# don't delete non-empty directories (checkpoints dir doesn't count)
if listing and listing != [self.checkpoint_dir]:
raise web.HTTPError(400, u'Directory %s not empty' % os_path)
# Don't delete non-empty directories.
# A directory containing only leftover checkpoints is
# considered empty.
cp_dir = getattr(self.checkpoints, 'checkpoint_dir', None)
for entry in listing:
if entry != cp_dir:
raise web.HTTPError(400, u'Directory %s not empty' % os_path)
elif not os.path.isfile(os_path):
raise web.HTTPError(404, u'File does not exist: %s' % os_path)
# clear checkpoints
for checkpoint in self.list_checkpoints(path):
checkpoint_id = checkpoint['id']
cp_path = self.get_checkpoint_path(checkpoint_id, path)
if os.path.isfile(cp_path):
self.log.debug("Unlinking checkpoint %s", cp_path)
with self.perm_to_403():
rm(cp_path)
if os.path.isdir(os_path):
self.log.debug("Removing directory %s", os_path)
with self.perm_to_403():
@ -543,7 +437,7 @@ class FileContentsManager(ContentsManager):
with self.perm_to_403():
rm(os_path)
def rename(self, old_path, new_path):
def rename_file(self, old_path, new_path):
"""Rename a file."""
old_path = old_path.strip('/')
new_path = new_path.strip('/')
@ -566,111 +460,6 @@ class FileContentsManager(ContentsManager):
except Exception as e:
raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e))
# Move the checkpoints
old_checkpoints = self.list_checkpoints(old_path)
for cp in old_checkpoints:
checkpoint_id = cp['id']
old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path)
if os.path.isfile(old_cp_path):
self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path)
with self.perm_to_403():
shutil.move(old_cp_path, new_cp_path)
# Checkpoint-related utilities
def get_checkpoint_path(self, checkpoint_id, path):
"""find the path to a checkpoint"""
path = path.strip('/')
parent, name = ('/' + path).rsplit('/', 1)
parent = parent.strip('/')
basename, ext = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=ext,
)
os_path = self._get_os_path(path=parent)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
with self.perm_to_403():
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def get_checkpoint_model(self, checkpoint_id, path):
"""construct the info dict for a given checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, path)
stats = os.stat(cp_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id = checkpoint_id,
last_modified = last_modified,
)
return info
# public checkpoint API
def create_checkpoint(self, path):
"""Create a checkpoint from the current state of a file"""
path = path.strip('/')
if not self.file_exists(path):
raise web.HTTPError(404)
src_path = self._get_os_path(path)
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
cp_path = self.get_checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._copy(src_path, cp_path)
# return the checkpoint info
return self.get_checkpoint_model(checkpoint_id, path)
def list_checkpoints(self, path):
"""list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.get_checkpoint_path(checkpoint_id, path)
if not os.path.exists(os_path):
return []
else:
return [self.get_checkpoint_model(checkpoint_id, path)]
def restore_checkpoint(self, checkpoint_id, path):
"""restore a file to a checkpointed state"""
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
nb_path = self._get_os_path(path)
cp_path = self.get_checkpoint_path(checkpoint_id, path)
if not os.path.isfile(cp_path):
self.log.debug("checkpoint file does not exist: %s", cp_path)
raise web.HTTPError(404,
u'checkpoint does not exist: %s@%s' % (path, checkpoint_id)
)
# ensure notebook is readable (never restore from an unreadable notebook)
if cp_path.endswith('.ipynb'):
with self.open(cp_path, 'r', encoding='utf-8') as f:
nbformat.read(f, as_version=4)
self.log.debug("copying %s -> %s", cp_path, nb_path)
with self.perm_to_403():
self._copy(cp_path, nb_path)
def delete_checkpoint(self, checkpoint_id, path):
"""delete a file's checkpoint"""
path = path.strip('/')
cp_path = self.get_checkpoint_path(checkpoint_id, path)
if not os.path.isfile(cp_path):
raise web.HTTPError(404,
u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
)
self.log.debug("unlinking %s", cp_path)
os.unlink(cp_path)
def info_string(self):
return "Serving notebooks from local directory: %s" % self.root_dir

@ -11,15 +11,25 @@ import re
from tornado.web import HTTPError
from .checkpoints import Checkpoints
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import sign, validate, ValidationError
from IPython.nbformat.v4 import new_notebook
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Instance, Unicode, List, Any, TraitError
from IPython.utils.traitlets import (
Any,
Dict,
Instance,
List,
TraitError,
Type,
Unicode,
)
from IPython.utils.py3compat import string_types
copy_pat = re.compile(r'\-Copy\d*\.')
class ContentsManager(LoggingConfigurable):
"""Base class for serving files and directories.
@ -97,6 +107,19 @@ class ContentsManager(LoggingConfigurable):
except Exception:
self.log.error("Pre-save hook failed on %s", path, exc_info=True)
checkpoints_class = Type(Checkpoints, config=True)
checkpoints = Instance(Checkpoints, config=True)
checkpoints_kwargs = Dict(allow_none=False, config=True)
def _checkpoints_default(self):
return self.checkpoints_class(**self.checkpoints_kwargs)
def _checkpoints_kwargs_default(self):
return dict(
parent=self,
log=self.log,
)
# ContentsManager API part 1: methods that must be
# implemented in subclasses.
@ -186,32 +209,27 @@ class ContentsManager(LoggingConfigurable):
"""
raise NotImplementedError('must be implemented in a subclass')
def delete(self, path):
def delete_file(self, path):
"""Delete file or directory by path."""
raise NotImplementedError('must be implemented in a subclass')
def create_checkpoint(self, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint_id for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, path):
"""Return a list of checkpoints for a given file"""
return []
def restore_checkpoint(self, checkpoint_id, path):
"""Restore a file from one of its checkpoints"""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, path):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
def rename_file(self, old_path, new_path):
"""Rename a file."""
raise NotImplementedError('must be implemented in a subclass')
# ContentsManager API part 2: methods that have useable default
# implementations, but can be overridden in subclasses.
def delete(self, path):
"""Delete a file/directory and any associated checkpoints."""
self.delete_file(path)
self.checkpoints.delete_all_checkpoints(path)
def rename(self, old_path, new_path):
"""Rename a file and any checkpoints associated with that file."""
self.rename_file(old_path, new_path)
self.checkpoints.rename_all_checkpoints(old_path, new_path)
def update(self, model, path):
"""Update the file's path
@ -431,3 +449,20 @@ class ContentsManager(LoggingConfigurable):
def should_list(self, name):
"""Should this file/directory name be displayed in a listing?"""
return not any(fnmatch(name, glob) for glob in self.hide_globs)
# Part 3: Checkpoints API
def create_checkpoint(self, path):
"""Create a checkpoint."""
return self.checkpoints.create_checkpoint(self, path)
def restore_checkpoint(self, checkpoint_id, path):
"""
Restore a checkpoint.
"""
self.checkpoints.restore_checkpoint(self, checkpoint_id, path)
def list_checkpoints(self, path):
return self.checkpoints.list_checkpoints(path)
def delete_checkpoint(self, checkpoint_id, path):
return self.checkpoints.delete_checkpoint(checkpoint_id, path)

@ -2,6 +2,7 @@
"""Test the contents webservice API."""
import base64
from contextlib import contextmanager
import io
import json
import os
@ -12,6 +13,9 @@ pjoin = os.path.join
import requests
from ..filecheckpoints import GenericFileCheckpoints
from IPython.config import Config
from IPython.html.utils import url_path_join, url_escape, to_os_path
from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
from IPython.nbformat import read, write, from_dict
@ -21,6 +25,7 @@ from IPython.nbformat.v4 import (
from IPython.nbformat import v2
from IPython.utils import py3compat
from IPython.utils.data import uniq_stable
from IPython.utils.tempdir import TemporaryDirectory
def notebooks_only(dir_model):
@ -502,7 +507,6 @@ class APITest(NotebookTestBase):
self.assertEqual(newnb.cells[0].source,
u'Created by test ³')
def test_checkpoints(self):
resp = self.api.read('foo/a.ipynb')
r = self.api.new_checkpoint('foo/a.ipynb')
@ -540,3 +544,93 @@ class APITest(NotebookTestBase):
self.assertEqual(r.status_code, 204)
cps = self.api.get_checkpoints('foo/a.ipynb').json()
self.assertEqual(cps, [])
def test_file_checkpoints(self):
"""
Test checkpointing of non-notebook files.
"""
filename = 'foo/a.txt'
resp = self.api.read(filename)
orig_content = json.loads(resp.text)['content']
# Create a checkpoint.
r = self.api.new_checkpoint(filename)
self.assertEqual(r.status_code, 201)
cp1 = r.json()
self.assertEqual(set(cp1), {'id', 'last_modified'})
self.assertEqual(r.headers['Location'].split('/')[-1], cp1['id'])
# Modify the file and save.
new_content = orig_content + '\nsecond line'
model = {
'content': new_content,
'type': 'file',
'format': 'text',
}
resp = self.api.save(filename, body=json.dumps(model))
# List checkpoints
cps = self.api.get_checkpoints(filename).json()
self.assertEqual(cps, [cp1])
content = self.api.read(filename).json()['content']
self.assertEqual(content, new_content)
# Restore cp1
r = self.api.restore_checkpoint(filename, cp1['id'])
self.assertEqual(r.status_code, 204)
restored_content = self.api.read(filename).json()['content']
self.assertEqual(restored_content, orig_content)
# Delete cp1
r = self.api.delete_checkpoint(filename, cp1['id'])
self.assertEqual(r.status_code, 204)
cps = self.api.get_checkpoints(filename).json()
self.assertEqual(cps, [])
@contextmanager
def patch_cp_root(self, dirname):
"""
Temporarily patch the root dir of our checkpoint manager.
"""
cpm = self.notebook.contents_manager.checkpoints
old_dirname = cpm.root_dir
cpm.root_dir = dirname
try:
yield
finally:
cpm.root_dir = old_dirname
def test_checkpoints_separate_root(self):
"""
Test that FileCheckpoints functions correctly even when it's
using a different root dir from FileContentsManager. This also keeps
the implementation honest for use with ContentsManagers that don't map
models to the filesystem
Override this method to a no-op when testing other managers.
"""
with TemporaryDirectory() as td:
with self.patch_cp_root(td):
self.test_checkpoints()
with TemporaryDirectory() as td:
with self.patch_cp_root(td):
self.test_file_checkpoints()
class GenericFileCheckpointsAPITest(APITest):
"""
Run the tests from APITest with GenericFileCheckpoints.
"""
config = Config()
config.FileContentsManager.checkpoints_class = GenericFileCheckpoints
def test_config_did_something(self):
self.assertIsInstance(
self.notebook.contents_manager.checkpoints,
GenericFileCheckpoints,
)

@ -84,11 +84,16 @@ class TestFileContentsManager(TestCase):
root = td
os.mkdir(os.path.join(td, subd))
fm = FileContentsManager(root_dir=root)
cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb')
cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd)
cpm = fm.checkpoints
cp_dir = cpm.checkpoint_path(
'cp', 'test.ipynb'
)
cp_subdir = cpm.checkpoint_path(
'cp', '/%s/test.ipynb' % subd
)
self.assertNotEqual(cp_dir, cp_subdir)
self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name))
self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name))
self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name))
self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name))
@dec.skip_win32
def test_bad_symlink(self):

Loading…
Cancel
Save