diff --git a/IPython/html/services/contents/filemanager.py b/IPython/html/services/contents/filemanager.py
index 047e6b0e9..74feafc67 100644
--- a/IPython/html/services/contents/filemanager.py
+++ b/IPython/html/services/contents/filemanager.py
@@ -111,9 +111,20 @@ class FileManagerMixin(object):
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
- except OSError as e:
+ except OSError:
self.log.debug("copystat on %s failed", dest, exc_info=True)
+ def _read_notebook(self, os_path, as_version=4):
+ """Read a notebook from an os path."""
+ with self.open(os_path, 'r', encoding='utf-8') as f:
+ try:
+ return nbformat.read(f, as_version=as_version)
+ except Exception as e:
+ raise web.HTTPError(
+ 400,
+ u"Unreadable Notebook: %s %r" % (os_path, e),
+ )
+
def _get_os_path(self, path):
"""Given an API path, return its file system path.
@@ -129,82 +140,6 @@ class FileManagerMixin(object):
"""
return to_os_path(path, self.root_dir)
- def is_hidden(self, path):
- """Does the API style path correspond to a hidden directory or file?
-
- Parameters
- ----------
- path : string
- The path to check. This is an API path (`/` separated,
- relative to root_dir).
-
- Returns
- -------
- hidden : bool
- Whether the path exists and is hidden.
- """
- path = path.strip('/')
- os_path = self._get_os_path(path=path)
- return is_hidden(os_path, self.root_dir)
-
- def file_exists(self, path):
- """Returns True if the file exists, else returns False.
-
- API-style wrapper for os.path.isfile
-
- Parameters
- ----------
- path : string
- The relative path to the file (with '/' as separator)
-
- Returns
- -------
- exists : bool
- Whether the file exists.
- """
- path = path.strip('/')
- os_path = self._get_os_path(path)
- return os.path.isfile(os_path)
-
- def dir_exists(self, path):
- """Does the API-style path refer to an extant directory?
-
- API-style wrapper for os.path.isdir
-
- Parameters
- ----------
- path : string
- The path to check. This is an API path (`/` separated,
- relative to root_dir).
-
- Returns
- -------
- exists : bool
- Whether the path is indeed a directory.
- """
- path = path.strip('/')
- os_path = self._get_os_path(path=path)
- return os.path.isdir(os_path)
-
- def exists(self, path):
- """Returns True if the path exists, else returns False.
-
- API-style wrapper for os.path.exists
-
- Parameters
- ----------
- path : string
- The API path to the file (with '/' as separator)
-
- Returns
- -------
- exists : bool
- Whether the target exists.
- """
- path = path.strip('/')
- os_path = self._get_os_path(path=path)
- return os.path.exists(os_path)
-
class FileCheckpointManager(FileManagerMixin, CheckpointManager):
"""
@@ -223,30 +158,44 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
""",
)
- @property
- def root_dir(self):
+ root_dir = Unicode(config=True)
+
+ def _root_dir_default(self):
try:
return self.parent.root_dir
except AttributeError:
return getcwd()
# public checkpoint API
- def create_checkpoint(self, path):
- """Create a checkpoint from the current state of a file"""
+ def create_checkpoint(self, nb, path):
+ """Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
- if not self.file_exists(path):
- raise web.HTTPError(404)
- src_path = self._get_os_path(path)
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
- cp_path = self.get_checkpoint_path(checkpoint_id, path)
+ os_checkpoint_path = self.get_checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
- self._copy(src_path, cp_path)
+ self._save_notebook(os_checkpoint_path, nb)
# return the checkpoint info
return self.get_checkpoint_model(checkpoint_id, path)
+ def get_checkpoint_content(self, checkpoint_id, path):
+ """Get the content of a checkpoint.
+
+ Returns an unvalidated model with the same structure as
+ the return value of ContentsManager.get
+ """
+ path = path.strip('/')
+ self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
+ os_checkpoint_path = self.get_checkpoint_path(checkpoint_id, path)
+ return self._read_notebook(os_checkpoint_path, as_version=4)
+
+ def _save_notebook(self, os_path, nb):
+ """Save a notebook file."""
+ with self.atomic_writing(os_path, encoding='utf-8') as f:
+ nbformat.write(nb, f, version=nbformat.NO_CONVERT)
+
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a checkpoint from old_path to new_path."""
old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path)
@@ -260,6 +209,17 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
with self.perm_to_403():
shutil.move(old_cp_path, new_cp_path)
+ def delete_checkpoint(self, checkpoint_id, path):
+ """delete a file's checkpoint"""
+ path = path.strip('/')
+ cp_path = self.get_checkpoint_path(checkpoint_id, path)
+ if not os.path.isfile(cp_path):
+ self.no_such_checkpoint(path, checkpoint_id)
+
+ self.log.debug("unlinking %s", cp_path)
+ with self.perm_to_403():
+ os.unlink(cp_path)
+
def list_checkpoints(self, path):
"""list the checkpoints for a given file
@@ -273,35 +233,6 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
else:
return [self.get_checkpoint_model(checkpoint_id, path)]
- def restore_checkpoint(self, checkpoint_id, path):
- """restore a file to a checkpointed state"""
- path = path.strip('/')
- self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
- nb_path = self._get_os_path(path)
- cp_path = self.get_checkpoint_path(checkpoint_id, path)
- if not os.path.isfile(cp_path):
- self.log.debug("checkpoint file does not exist: %s", cp_path)
- self.no_such_checkpoint(path, checkpoint_id)
-
- # ensure notebook is readable (never restore from unreadable notebook)
- if cp_path.endswith('.ipynb'):
- with self.open(cp_path, 'r', encoding='utf-8') as f:
- nbformat.read(f, as_version=4)
- self.log.debug("copying %s -> %s", cp_path, nb_path)
- with self.perm_to_403():
- self._copy(cp_path, nb_path)
-
- def delete_checkpoint(self, checkpoint_id, path):
- """delete a file's checkpoint"""
- path = path.strip('/')
- cp_path = self.get_checkpoint_path(checkpoint_id, path)
- if not os.path.isfile(cp_path):
- self.no_such_checkpoint(path, checkpoint_id)
-
- self.log.debug("unlinking %s", cp_path)
- with self.perm_to_403():
- os.unlink(cp_path)
-
# Checkpoint-related utilities
def get_checkpoint_path(self, checkpoint_id, path):
"""find the path to a checkpoint"""
@@ -413,6 +344,82 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
def _checkpoint_manager_class_default(self):
return FileCheckpointManager
+ def is_hidden(self, path):
+ """Does the API style path correspond to a hidden directory or file?
+
+ Parameters
+ ----------
+ path : string
+ The path to check. This is an API path (`/` separated,
+ relative to root_dir).
+
+ Returns
+ -------
+ hidden : bool
+ Whether the path exists and is hidden.
+ """
+ path = path.strip('/')
+ os_path = self._get_os_path(path=path)
+ return is_hidden(os_path, self.root_dir)
+
+ def file_exists(self, path):
+ """Returns True if the file exists, else returns False.
+
+ API-style wrapper for os.path.isfile
+
+ Parameters
+ ----------
+ path : string
+ The relative path to the file (with '/' as separator)
+
+ Returns
+ -------
+ exists : bool
+ Whether the file exists.
+ """
+ path = path.strip('/')
+ os_path = self._get_os_path(path)
+ return os.path.isfile(os_path)
+
+ def dir_exists(self, path):
+ """Does the API-style path refer to an extant directory?
+
+ API-style wrapper for os.path.isdir
+
+ Parameters
+ ----------
+ path : string
+ The path to check. This is an API path (`/` separated,
+ relative to root_dir).
+
+ Returns
+ -------
+ exists : bool
+ Whether the path is indeed a directory.
+ """
+ path = path.strip('/')
+ os_path = self._get_os_path(path=path)
+ return os.path.isdir(os_path)
+
+ def exists(self, path):
+ """Returns True if the path exists, else returns False.
+
+ API-style wrapper for os.path.exists
+
+ Parameters
+ ----------
+ path : string
+ The API path to the file (with '/' as separator)
+
+ Returns
+ -------
+ exists : bool
+ Whether the target exists.
+ """
+ path = path.strip('/')
+ os_path = self._get_os_path(path=path)
+ return os.path.exists(os_path)
+
def _base_model(self, path):
"""Build the common base of a contents model"""
os_path = self._get_os_path(path)
@@ -518,7 +525,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
return model
-
def _notebook_model(self, path, content=True):
"""Build a notebook model
@@ -529,11 +535,7 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
model['type'] = 'notebook'
if content:
os_path = self._get_os_path(path)
- with self.open(os_path, 'r', encoding='utf-8') as f:
- try:
- nb = nbformat.read(f, as_version=4)
- except Exception as e:
- raise web.HTTPError(400, u"Unreadable Notebook: %s %r" % (os_path, e))
+ nb = self._read_notebook(os_path, as_version=4)
self.mark_trusted_cells(nb, path)
model['content'] = nb
model['format'] = 'json'
@@ -582,13 +584,15 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
model = self._file_model(path, content=content, format=format)
return model
- def _save_notebook(self, os_path, model, path=''):
+ def _save_notebook(self, os_path, model, path):
"""save a notebook file"""
- # Save the notebook file
nb = nbformat.from_dict(model['content'])
-
self.check_and_sign(nb, path)
+ # One checkpoint should always exist for notebooks.
+ if not self.checkpoint_manager.list_checkpoints(path):
+ self.checkpoint_manager.create_checkpoint(nb, path)
+
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
@@ -632,11 +636,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
self.run_pre_save_hook(model=model, path=path)
- cp_mgr = self.checkpoint_manager
- # One checkpoint should always exist
- if self.file_exists(path) and not cp_mgr.list_checkpoints(path):
- cp_mgr.create_checkpoint(path)
-
os_path = self._get_os_path(path)
self.log.debug("Saving %s", os_path)
try:
diff --git a/IPython/html/services/contents/manager.py b/IPython/html/services/contents/manager.py
index 195cbf59b..10eeda56e 100644
--- a/IPython/html/services/contents/manager.py
+++ b/IPython/html/services/contents/manager.py
@@ -11,6 +11,7 @@ import re
from tornado.web import HTTPError
+from IPython import nbformat
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import sign, validate, ValidationError
from IPython.nbformat.v4 import new_notebook
@@ -34,33 +35,36 @@ class CheckpointManager(LoggingConfigurable):
Base class for managing checkpoints for a ContentsManager.
"""
- def create_checkpoint(self, path):
+ def create_checkpoint(self, nb, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint_id for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
+ def get_checkpoint_content(self, checkpoint_id, path):
+ """Get the content of a checkpoint.
+
+ Returns an unvalidated model with the same structure as
+ the return value of ContentsManager.get
+ """
+ raise NotImplementedError("must be implemented in a subclass")
+
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
- """Rename a checkpoint from old_path to new_path."""
+ """Rename a single checkpoint from old_path to new_path."""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, path):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
- def restore_checkpoint(self, checkpoint_id, path):
- """Restore a file from one of its checkpoints"""
- raise NotImplementedError("must be implemented in a subclass")
-
def list_checkpoints(self, path):
"""Return a list of checkpoints for a given file"""
raise NotImplementedError("must be implemented in a subclass")
def rename_all_checkpoints(self, old_path, new_path):
"""Rename all checkpoints for old_path to new_path."""
- old_checkpoints = self.list_checkpoints(old_path)
- for cp in old_checkpoints:
+ for cp in self.list_checkpoints(old_path):
self.rename_checkpoint(cp['id'], old_path, new_path)
def delete_all_checkpoints(self, path):
@@ -490,22 +494,34 @@ class ContentsManager(LoggingConfigurable):
return not any(fnmatch(name, glob) for glob in self.hide_globs)
# Part 3: Checkpoints API
- # By default, all methods are forwarded to our CheckpointManager instance.
def create_checkpoint(self, path):
- return self.checkpoint_manager.create_checkpoint(path)
+ """Create a checkpoint."""
- def rename_checkpoint(self, checkpoint_id, old_path, new_path):
- return self.checkpoint_manager.rename_checkpoint(
- checkpoint_id,
- old_path,
- new_path,
- )
+ nb = nbformat.from_dict(self.get(path, content=True)['content'])
+ self.check_and_sign(nb, path)
+ return self.checkpoint_manager.create_checkpoint(nb, path)
def list_checkpoints(self, path):
return self.checkpoint_manager.list_checkpoints(path)
def restore_checkpoint(self, checkpoint_id, path):
- return self.checkpoint_manager.restore_checkpoint(checkpoint_id, path)
+ """
+ Restore a checkpoint.
+ """
+ nb = self.checkpoint_manager.get_checkpoint_content(
+ checkpoint_id,
+ path,
+ )
+
+ self.mark_trusted_cells(nb, path)
+
+ model = {
+ 'content': nb,
+ 'type': 'notebook',
+ }
+
+ self.validate_notebook_model(model)
+ return self.save(model, path)
def delete_checkpoint(self, checkpoint_id, path):
return self.checkpoint_manager.delete_checkpoint(checkpoint_id, path)