diff --git a/IPython/html/services/contents/filemanager.py b/IPython/html/services/contents/filemanager.py index 9a1ab9f27..047e6b0e9 100644 --- a/IPython/html/services/contents/filemanager.py +++ b/IPython/html/services/contents/filemanager.py @@ -4,24 +4,31 @@ # Distributed under the terms of the Modified BSD License. import base64 +from contextlib import contextmanager import errno import io import os import shutil -from contextlib import contextmanager import mimetypes from tornado import web -from .manager import ContentsManager +from .manager import ( + CheckpointManager, + ContentsManager, +) from IPython import nbformat from IPython.utils.io import atomic_writing from IPython.utils.importstring import import_item from IPython.utils.path import ensure_dir_exists from IPython.utils.traitlets import Any, Unicode, Bool, TraitError -from IPython.utils.py3compat import getcwd, str_to_unicode, string_types +from IPython.utils.py3compat import getcwd, string_types, str_to_unicode from IPython.utils import tz -from IPython.html.utils import is_hidden, to_os_path, to_api_path +from IPython.html.utils import ( + is_hidden, + to_api_path, + to_os_path, +) _script_exporter = None @@ -48,19 +55,40 @@ def _post_save_script(model, os_path, contents_manager, **kwargs): with io.open(script_fname, 'w', encoding='utf-8') as f: f.write(script) -class FileContentsManager(ContentsManager): - root_dir = Unicode(config=True) +class FileManagerMixin(object): + """ + Mixin for ContentsAPI classes that interact with the filesystem. + + Shared by both FileContentsManager and FileCheckpointManager. + + Note + ---- + Classes using this mixin must provide the following attributes: + + root_dir : unicode + A directory against against which API-style paths are to be resolved. + + log : logging.Logger + """ + + @contextmanager + def open(self, os_path, *args, **kwargs): + """wrapper around io.open that turns permission errors into 403""" + with self.perm_to_403(os_path): + with io.open(os_path, *args, **kwargs) as f: + yield f + + @contextmanager + def atomic_writing(self, os_path, *args, **kwargs): + """wrapper around atomic_writing that turns permission errors into 403""" + with self.perm_to_403(os_path): + with atomic_writing(os_path, *args, **kwargs) as f: + yield f - def _root_dir_default(self): - try: - return self.parent.notebook_dir - except AttributeError: - return getcwd() - @contextmanager def perm_to_403(self, os_path=''): - """context manager for turning permission errors into 403""" + """context manager for turning permission errors into 403.""" try: yield except OSError as e: @@ -70,92 +98,10 @@ class FileContentsManager(ContentsManager): # but nobody should be doing that anyway. if not os_path: os_path = str_to_unicode(e.filename or 'unknown file') - path = to_api_path(os_path, self.root_dir) + path = to_api_path(os_path, root=self.root_dir) raise web.HTTPError(403, u'Permission denied: %s' % path) else: raise - - @contextmanager - def open(self, os_path, *args, **kwargs): - """wrapper around io.open that turns permission errors into 403""" - with self.perm_to_403(os_path): - with io.open(os_path, *args, **kwargs) as f: - yield f - - @contextmanager - def atomic_writing(self, os_path, *args, **kwargs): - """wrapper around atomic_writing that turns permission errors into 403""" - with self.perm_to_403(os_path): - with atomic_writing(os_path, *args, **kwargs) as f: - yield f - - save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook') - def _save_script_changed(self): - self.log.warn(""" - `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks: - - ContentsManager.pre_save_hook - FileContentsManager.post_save_hook - - A post-save hook has been registered that calls: - - ipython nbconvert --to script [notebook] - - which behaves similarly to `--script`. - """) - - self.post_save_hook = _post_save_script - - post_save_hook = Any(None, config=True, - help="""Python callable or importstring thereof - - to be called on the path of a file just saved. - - This can be used to process the file on disk, - such as converting the notebook to a script or HTML via nbconvert. - - It will be called as (all arguments passed by keyword): - - hook(os_path=os_path, model=model, contents_manager=instance) - - path: the filesystem path to the file just written - model: the model representing the file - contents_manager: this ContentsManager instance - """ - ) - def _post_save_hook_changed(self, name, old, new): - if new and isinstance(new, string_types): - self.post_save_hook = import_item(self.post_save_hook) - elif new: - if not callable(new): - raise TraitError("post_save_hook must be callable") - - def run_post_save_hook(self, model, os_path): - """Run the post-save hook if defined, and log errors""" - if self.post_save_hook: - try: - self.log.debug("Running post-save hook on %s", os_path) - self.post_save_hook(os_path=os_path, model=model, contents_manager=self) - except Exception: - self.log.error("Post-save hook failed on %s", os_path, exc_info=True) - - def _root_dir_changed(self, name, old, new): - """Do a bit of validation of the root_dir.""" - if not os.path.isabs(new): - # If we receive a non-absolute path, make it absolute. - self.root_dir = os.path.abspath(new) - return - if not os.path.isdir(new): - raise TraitError("%r is not a directory" % new) - - checkpoint_dir = Unicode('.ipynb_checkpoints', config=True, - help="""The directory name in which to keep file checkpoints - - This is a path relative to the file's own directory. - - By default, it is .ipynb_checkpoints - """ - ) def _copy(self, src, dest): """copy src to dest @@ -183,26 +129,6 @@ class FileContentsManager(ContentsManager): """ return to_os_path(path, self.root_dir) - def dir_exists(self, path): - """Does the API-style path refer to an extant directory? - - API-style wrapper for os.path.isdir - - Parameters - ---------- - path : string - The path to check. This is an API path (`/` separated, - relative to root_dir). - - Returns - ------- - exists : bool - Whether the path is indeed a directory. - """ - path = path.strip('/') - os_path = self._get_os_path(path=path) - return os.path.isdir(os_path) - def is_hidden(self, path): """Does the API style path correspond to a hidden directory or file? @@ -240,6 +166,26 @@ class FileContentsManager(ContentsManager): os_path = self._get_os_path(path) return os.path.isfile(os_path) + def dir_exists(self, path): + """Does the API-style path refer to an extant directory? + + API-style wrapper for os.path.isdir + + Parameters + ---------- + path : string + The path to check. This is an API path (`/` separated, + relative to root_dir). + + Returns + ------- + exists : bool + Whether the path is indeed a directory. + """ + path = path.strip('/') + os_path = self._get_os_path(path=path) + return os.path.isdir(os_path) + def exists(self, path): """Returns True if the path exists, else returns False. @@ -259,6 +205,214 @@ class FileContentsManager(ContentsManager): os_path = self._get_os_path(path=path) return os.path.exists(os_path) + +class FileCheckpointManager(FileManagerMixin, CheckpointManager): + """ + A CheckpointManager that caches checkpoints for files in adjacent + directories. + """ + + checkpoint_dir = Unicode( + '.ipynb_checkpoints', + config=True, + help="""The directory name in which to keep file checkpoints + + This is a path relative to the file's own directory. + + By default, it is .ipynb_checkpoints + """, + ) + + @property + def root_dir(self): + try: + return self.parent.root_dir + except AttributeError: + return getcwd() + + # public checkpoint API + def create_checkpoint(self, path): + """Create a checkpoint from the current state of a file""" + path = path.strip('/') + if not self.file_exists(path): + raise web.HTTPError(404) + src_path = self._get_os_path(path) + # only the one checkpoint ID: + checkpoint_id = u"checkpoint" + cp_path = self.get_checkpoint_path(checkpoint_id, path) + self.log.debug("creating checkpoint for %s", path) + with self.perm_to_403(): + self._copy(src_path, cp_path) + + # return the checkpoint info + return self.get_checkpoint_model(checkpoint_id, path) + + def rename_checkpoint(self, checkpoint_id, old_path, new_path): + """Rename a checkpoint from old_path to new_path.""" + old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path) + new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path) + if os.path.isfile(old_cp_path): + self.log.debug( + "Renaming checkpoint %s -> %s", + old_cp_path, + new_cp_path, + ) + with self.perm_to_403(): + shutil.move(old_cp_path, new_cp_path) + + def list_checkpoints(self, path): + """list the checkpoints for a given file + + This contents manager currently only supports one checkpoint per file. + """ + path = path.strip('/') + checkpoint_id = "checkpoint" + os_path = self.get_checkpoint_path(checkpoint_id, path) + if not os.path.exists(os_path): + return [] + else: + return [self.get_checkpoint_model(checkpoint_id, path)] + + def restore_checkpoint(self, checkpoint_id, path): + """restore a file to a checkpointed state""" + path = path.strip('/') + self.log.info("restoring %s from checkpoint %s", path, checkpoint_id) + nb_path = self._get_os_path(path) + cp_path = self.get_checkpoint_path(checkpoint_id, path) + if not os.path.isfile(cp_path): + self.log.debug("checkpoint file does not exist: %s", cp_path) + self.no_such_checkpoint(path, checkpoint_id) + + # ensure notebook is readable (never restore from unreadable notebook) + if cp_path.endswith('.ipynb'): + with self.open(cp_path, 'r', encoding='utf-8') as f: + nbformat.read(f, as_version=4) + self.log.debug("copying %s -> %s", cp_path, nb_path) + with self.perm_to_403(): + self._copy(cp_path, nb_path) + + def delete_checkpoint(self, checkpoint_id, path): + """delete a file's checkpoint""" + path = path.strip('/') + cp_path = self.get_checkpoint_path(checkpoint_id, path) + if not os.path.isfile(cp_path): + self.no_such_checkpoint(path, checkpoint_id) + + self.log.debug("unlinking %s", cp_path) + with self.perm_to_403(): + os.unlink(cp_path) + + # Checkpoint-related utilities + def get_checkpoint_path(self, checkpoint_id, path): + """find the path to a checkpoint""" + path = path.strip('/') + parent, name = ('/' + path).rsplit('/', 1) + parent = parent.strip('/') + basename, ext = os.path.splitext(name) + filename = u"{name}-{checkpoint_id}{ext}".format( + name=basename, + checkpoint_id=checkpoint_id, + ext=ext, + ) + os_path = self._get_os_path(path=parent) + cp_dir = os.path.join(os_path, self.checkpoint_dir) + with self.perm_to_403(): + ensure_dir_exists(cp_dir) + cp_path = os.path.join(cp_dir, filename) + return cp_path + + def get_checkpoint_model(self, checkpoint_id, path): + """construct the info dict for a given checkpoint""" + path = path.strip('/') + cp_path = self.get_checkpoint_path(checkpoint_id, path) + stats = os.stat(cp_path) + last_modified = tz.utcfromtimestamp(stats.st_mtime) + info = dict( + id=checkpoint_id, + last_modified=last_modified, + ) + return info + + # Error Handling + def no_such_checkpoint(self, path, checkpoint_id): + raise web.HTTPError( + 404, + u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id) + ) + + +class FileContentsManager(FileManagerMixin, ContentsManager): + + root_dir = Unicode(config=True) + + def _root_dir_default(self): + try: + return self.parent.notebook_dir + except AttributeError: + return getcwd() + + save_script = Bool(False, config=True, help='DEPRECATED, use post_save_hook') + def _save_script_changed(self): + self.log.warn(""" + `--script` is deprecated. You can trigger nbconvert via pre- or post-save hooks: + + ContentsManager.pre_save_hook + FileContentsManager.post_save_hook + + A post-save hook has been registered that calls: + + ipython nbconvert --to script [notebook] + + which behaves similarly to `--script`. + """) + + self.post_save_hook = _post_save_script + + post_save_hook = Any(None, config=True, + help="""Python callable or importstring thereof + + to be called on the path of a file just saved. + + This can be used to process the file on disk, + such as converting the notebook to a script or HTML via nbconvert. + + It will be called as (all arguments passed by keyword): + + hook(os_path=os_path, model=model, contents_manager=instance) + + path: the filesystem path to the file just written + model: the model representing the file + contents_manager: this ContentsManager instance + """ + ) + def _post_save_hook_changed(self, name, old, new): + if new and isinstance(new, string_types): + self.post_save_hook = import_item(self.post_save_hook) + elif new: + if not callable(new): + raise TraitError("post_save_hook must be callable") + + def run_post_save_hook(self, model, os_path): + """Run the post-save hook if defined, and log errors""" + if self.post_save_hook: + try: + self.log.debug("Running post-save hook on %s", os_path) + self.post_save_hook(os_path=os_path, model=model, contents_manager=self) + except Exception: + self.log.error("Post-save hook failed on %s", os_path, exc_info=True) + + def _root_dir_changed(self, name, old, new): + """Do a bit of validation of the root_dir.""" + if not os.path.isabs(new): + # If we receive a non-absolute path, make it absolute. + self.root_dir = os.path.abspath(new) + return + if not os.path.isdir(new): + raise TraitError("%r is not a directory" % new) + + def _checkpoint_manager_class_default(self): + return FileCheckpointManager + def _base_model(self, path): """Build the common base of a contents model""" os_path = self._get_os_path(path) @@ -478,9 +632,10 @@ class FileContentsManager(ContentsManager): self.run_pre_save_hook(model=model, path=path) + cp_mgr = self.checkpoint_manager # One checkpoint should always exist - if self.file_exists(path) and not self.list_checkpoints(path): - self.create_checkpoint(path) + if self.file_exists(path) and not cp_mgr.list_checkpoints(path): + cp_mgr.create_checkpoint(path) os_path = self._get_os_path(path) self.log.debug("Saving %s", os_path) @@ -512,28 +667,23 @@ class FileContentsManager(ContentsManager): return model - def delete(self, path): + def delete_file(self, path): """Delete file at path.""" path = path.strip('/') os_path = self._get_os_path(path) rm = os.unlink if os.path.isdir(os_path): listing = os.listdir(os_path) - # don't delete non-empty directories (checkpoints dir doesn't count) - if listing and listing != [self.checkpoint_dir]: - raise web.HTTPError(400, u'Directory %s not empty' % os_path) + # Don't delete non-empty directories. + # A directory containing only leftover checkpoints is + # considered empty. + cp_dir = getattr(self.checkpoint_manager, 'checkpoint_dir', None) + for entry in listing: + if entry != cp_dir: + raise web.HTTPError(400, u'Directory %s not empty' % os_path) elif not os.path.isfile(os_path): raise web.HTTPError(404, u'File does not exist: %s' % os_path) - # clear checkpoints - for checkpoint in self.list_checkpoints(path): - checkpoint_id = checkpoint['id'] - cp_path = self.get_checkpoint_path(checkpoint_id, path) - if os.path.isfile(cp_path): - self.log.debug("Unlinking checkpoint %s", cp_path) - with self.perm_to_403(): - rm(cp_path) - if os.path.isdir(os_path): self.log.debug("Removing directory %s", os_path) with self.perm_to_403(): @@ -543,7 +693,7 @@ class FileContentsManager(ContentsManager): with self.perm_to_403(): rm(os_path) - def rename(self, old_path, new_path): + def rename_file(self, old_path, new_path): """Rename a file.""" old_path = old_path.strip('/') new_path = new_path.strip('/') @@ -566,111 +716,6 @@ class FileContentsManager(ContentsManager): except Exception as e: raise web.HTTPError(500, u'Unknown error renaming file: %s %s' % (old_path, e)) - # Move the checkpoints - old_checkpoints = self.list_checkpoints(old_path) - for cp in old_checkpoints: - checkpoint_id = cp['id'] - old_cp_path = self.get_checkpoint_path(checkpoint_id, old_path) - new_cp_path = self.get_checkpoint_path(checkpoint_id, new_path) - if os.path.isfile(old_cp_path): - self.log.debug("Renaming checkpoint %s -> %s", old_cp_path, new_cp_path) - with self.perm_to_403(): - shutil.move(old_cp_path, new_cp_path) - - # Checkpoint-related utilities - - def get_checkpoint_path(self, checkpoint_id, path): - """find the path to a checkpoint""" - path = path.strip('/') - parent, name = ('/' + path).rsplit('/', 1) - parent = parent.strip('/') - basename, ext = os.path.splitext(name) - filename = u"{name}-{checkpoint_id}{ext}".format( - name=basename, - checkpoint_id=checkpoint_id, - ext=ext, - ) - os_path = self._get_os_path(path=parent) - cp_dir = os.path.join(os_path, self.checkpoint_dir) - with self.perm_to_403(): - ensure_dir_exists(cp_dir) - cp_path = os.path.join(cp_dir, filename) - return cp_path - - def get_checkpoint_model(self, checkpoint_id, path): - """construct the info dict for a given checkpoint""" - path = path.strip('/') - cp_path = self.get_checkpoint_path(checkpoint_id, path) - stats = os.stat(cp_path) - last_modified = tz.utcfromtimestamp(stats.st_mtime) - info = dict( - id = checkpoint_id, - last_modified = last_modified, - ) - return info - - # public checkpoint API - - def create_checkpoint(self, path): - """Create a checkpoint from the current state of a file""" - path = path.strip('/') - if not self.file_exists(path): - raise web.HTTPError(404) - src_path = self._get_os_path(path) - # only the one checkpoint ID: - checkpoint_id = u"checkpoint" - cp_path = self.get_checkpoint_path(checkpoint_id, path) - self.log.debug("creating checkpoint for %s", path) - with self.perm_to_403(): - self._copy(src_path, cp_path) - - # return the checkpoint info - return self.get_checkpoint_model(checkpoint_id, path) - - def list_checkpoints(self, path): - """list the checkpoints for a given file - - This contents manager currently only supports one checkpoint per file. - """ - path = path.strip('/') - checkpoint_id = "checkpoint" - os_path = self.get_checkpoint_path(checkpoint_id, path) - if not os.path.exists(os_path): - return [] - else: - return [self.get_checkpoint_model(checkpoint_id, path)] - - - def restore_checkpoint(self, checkpoint_id, path): - """restore a file to a checkpointed state""" - path = path.strip('/') - self.log.info("restoring %s from checkpoint %s", path, checkpoint_id) - nb_path = self._get_os_path(path) - cp_path = self.get_checkpoint_path(checkpoint_id, path) - if not os.path.isfile(cp_path): - self.log.debug("checkpoint file does not exist: %s", cp_path) - raise web.HTTPError(404, - u'checkpoint does not exist: %s@%s' % (path, checkpoint_id) - ) - # ensure notebook is readable (never restore from an unreadable notebook) - if cp_path.endswith('.ipynb'): - with self.open(cp_path, 'r', encoding='utf-8') as f: - nbformat.read(f, as_version=4) - self.log.debug("copying %s -> %s", cp_path, nb_path) - with self.perm_to_403(): - self._copy(cp_path, nb_path) - - def delete_checkpoint(self, checkpoint_id, path): - """delete a file's checkpoint""" - path = path.strip('/') - cp_path = self.get_checkpoint_path(checkpoint_id, path) - if not os.path.isfile(cp_path): - raise web.HTTPError(404, - u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id) - ) - self.log.debug("unlinking %s", cp_path) - os.unlink(cp_path) - def info_string(self): return "Serving notebooks from local directory: %s" % self.root_dir diff --git a/IPython/html/services/contents/manager.py b/IPython/html/services/contents/manager.py index d876c0944..5ec49ee37 100644 --- a/IPython/html/services/contents/manager.py +++ b/IPython/html/services/contents/manager.py @@ -15,11 +15,60 @@ from IPython.config.configurable import LoggingConfigurable from IPython.nbformat import sign, validate, ValidationError from IPython.nbformat.v4 import new_notebook from IPython.utils.importstring import import_item -from IPython.utils.traitlets import Instance, Unicode, List, Any, TraitError +from IPython.utils.traitlets import ( + Any, + Dict, + Instance, + List, + TraitError, + Type, + Unicode, +) from IPython.utils.py3compat import string_types copy_pat = re.compile(r'\-Copy\d*\.') + +class CheckpointManager(LoggingConfigurable): + """ + Base class for managing checkpoints for a ContentsManager. + """ + + def create_checkpoint(self, path): + """Create a checkpoint of the current state of a file + + Returns a checkpoint_id for the new checkpoint. + """ + raise NotImplementedError("must be implemented in a subclass") + + def rename_checkpoint(self, checkpoint_id, old_path, new_path): + """Rename a checkpoint from old_path to new_path.""" + raise NotImplementedError("must be implemented in a subclass") + + def delete_checkpoint(self, checkpoint_id, path): + """delete a checkpoint for a file""" + raise NotImplementedError("must be implemented in a subclass") + + def restore_checkpoint(self, checkpoint_id, path): + """Restore a file from one of its checkpoints""" + raise NotImplementedError("must be implemented in a subclass") + + def list_checkpoints(self, path): + """Return a list of checkpoints for a given file""" + raise NotImplementedError("must be implemented in a subclass") + + def rename_all_checkpoints(self, old_path, new_path): + """Rename all checkpoints for old_path to new_path.""" + old_checkpoints = self.list_checkpoints(old_path) + for cp in old_checkpoints: + self.rename_checkpoint(cp['id'], old_path, new_path) + + def delete_all_checkpoints(self, path): + """Delete all checkpoints for the given path.""" + for checkpoint in self.list_checkpoints(path): + self.delete_checkpoint(checkpoint['id'], path) + + class ContentsManager(LoggingConfigurable): """Base class for serving files and directories. @@ -97,6 +146,19 @@ class ContentsManager(LoggingConfigurable): except Exception: self.log.error("Pre-save hook failed on %s", path, exc_info=True) + checkpoint_manager_class = Type(CheckpointManager, config=True) + checkpoint_manager = Instance(CheckpointManager) + checkpoint_manager_kwargs = Dict(allow_none=False) + + def _checkpoint_manager_default(self): + return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs) + + def _checkpoint_manager_kwargs_default(self): + return dict( + parent=self, + log=self.log, + ) + # ContentsManager API part 1: methods that must be # implemented in subclasses. @@ -186,32 +248,27 @@ class ContentsManager(LoggingConfigurable): """ raise NotImplementedError('must be implemented in a subclass') - def delete(self, path): + def delete_file(self, path): """Delete file or directory by path.""" raise NotImplementedError('must be implemented in a subclass') - def create_checkpoint(self, path): - """Create a checkpoint of the current state of a file - - Returns a checkpoint_id for the new checkpoint. - """ - raise NotImplementedError("must be implemented in a subclass") - - def list_checkpoints(self, path): - """Return a list of checkpoints for a given file""" - return [] - - def restore_checkpoint(self, checkpoint_id, path): - """Restore a file from one of its checkpoints""" - raise NotImplementedError("must be implemented in a subclass") - - def delete_checkpoint(self, checkpoint_id, path): - """delete a checkpoint for a file""" - raise NotImplementedError("must be implemented in a subclass") + def rename_file(self, old_path, new_path): + """Rename a file.""" + raise NotImplementedError('must be implemented in a subclass') # ContentsManager API part 2: methods that have useable default # implementations, but can be overridden in subclasses. + def delete(self, path): + """Delete a file/directory and any associated checkpoints.""" + self.delete_file(path) + self.checkpoint_manager.delete_all_checkpoints(path) + + def rename(self, old_path, new_path): + """Rename a file and any checkpoints associated with that file.""" + self.rename_file(old_path, new_path) + self.checkpoint_manager.rename_all_checkpoints(old_path, new_path) + def update(self, model, path): """Update the file's path @@ -431,3 +488,24 @@ class ContentsManager(LoggingConfigurable): def should_list(self, name): """Should this file/directory name be displayed in a listing?""" return not any(fnmatch(name, glob) for glob in self.hide_globs) + + # Part 3: Checkpoints API + # By default, all methods are forwarded to our CheckpointManager instance. + def create_checkpoint(self, path): + return self.checkpoint_manager.create_checkpoint(path) + + def rename_checkpoint(self, checkpoint_id, old_path, new_path): + return self.checkpoint_manager.rename_checkpoint( + checkpoint_id, + old_path, + new_path, + ) + + def list_checkpoints(self, path): + return self.checkpoint_manager.list_checkpoints(path) + + def restore_checkpoint(self, checkpoint_id, path): + return self.checkpoint_manager.restore_checkpoint(checkpoint_id, path) + + def delete_checkpoint(self, checkpoint_id, path): + return self.checkpoint_manager.delete_checkpoint(checkpoint_id, path) diff --git a/IPython/html/services/contents/tests/test_manager.py b/IPython/html/services/contents/tests/test_manager.py index f531742f5..2a81e622e 100644 --- a/IPython/html/services/contents/tests/test_manager.py +++ b/IPython/html/services/contents/tests/test_manager.py @@ -84,11 +84,16 @@ class TestFileContentsManager(TestCase): root = td os.mkdir(os.path.join(td, subd)) fm = FileContentsManager(root_dir=root) - cp_dir = fm.get_checkpoint_path('cp', 'test.ipynb') - cp_subdir = fm.get_checkpoint_path('cp', '/%s/test.ipynb' % subd) + cpm = fm.checkpoint_manager + cp_dir = cpm.get_checkpoint_path( + 'cp', 'test.ipynb' + ) + cp_subdir = cpm.get_checkpoint_path( + 'cp', '/%s/test.ipynb' % subd + ) self.assertNotEqual(cp_dir, cp_subdir) - self.assertEqual(cp_dir, os.path.join(root, fm.checkpoint_dir, cp_name)) - self.assertEqual(cp_subdir, os.path.join(root, subd, fm.checkpoint_dir, cp_name)) + self.assertEqual(cp_dir, os.path.join(root, cpm.checkpoint_dir, cp_name)) + self.assertEqual(cp_subdir, os.path.join(root, subd, cpm.checkpoint_dir, cp_name)) @dec.skip_win32 def test_bad_symlink(self):