@ -4,24 +4,31 @@
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
import errno
import io
import os
import shutil
from contextlib import contextmanager
import mimetypes
from tornado import web
from . manager import ContentsManager
from . manager import (
CheckpointManager ,
ContentsManager ,
)
from IPython import nbformat
from IPython . utils . io import atomic_writing
from IPython . utils . importstring import import_item
from IPython . utils . path import ensure_dir_exists
from IPython . utils . traitlets import Any , Unicode , Bool , TraitError
from IPython . utils . py3compat import getcwd , str _to_unicode, string_types
from IPython . utils . py3compat import getcwd , str ing_types, str_to_unicode
from IPython . utils import tz
from IPython . html . utils import is_hidden , to_os_path , to_api_path
from IPython . html . utils import (
is_hidden ,
to_api_path ,
to_os_path ,
)
_script_exporter = None
@ -48,19 +55,40 @@ def _post_save_script(model, os_path, contents_manager, **kwargs):
with io . open ( script_fname , ' w ' , encoding = ' utf-8 ' ) as f :
f . write ( script )
class FileContentsManager ( ContentsManager ) :
root_dir = Unicode ( config = True )
class FileManagerMixin ( object ) :
"""
Mixin for ContentsAPI classes that interact with the filesystem .
Shared by both FileContentsManager and FileCheckpointManager .
Note
- - - -
Classes using this mixin must provide the following attributes :
root_dir : unicode
A directory against against which API - style paths are to be resolved .
log : logging . Logger
"""
@contextmanager
def open ( self , os_path , * args , * * kwargs ) :
""" wrapper around io.open that turns permission errors into 403 """
with self . perm_to_403 ( os_path ) :
with io . open ( os_path , * args , * * kwargs ) as f :
yield f
@contextmanager
def atomic_writing ( self , os_path , * args , * * kwargs ) :
""" wrapper around atomic_writing that turns permission errors into 403 """
with self . perm_to_403 ( os_path ) :
with atomic_writing ( os_path , * args , * * kwargs ) as f :
yield f
def _root_dir_default ( self ) :
try :
return self . parent . notebook_dir
except AttributeError :
return getcwd ( )
@contextmanager
def perm_to_403 ( self , os_path = ' ' ) :
""" context manager for turning permission errors into 403 """
""" context manager for turning permission errors into 403. """
try :
yield
except OSError as e :
@ -70,92 +98,10 @@ class FileContentsManager(ContentsManager):
# but nobody should be doing that anyway.
if not os_path :
os_path = str_to_unicode ( e . filename or ' unknown file ' )
path = to_api_path ( os_path , self . root_dir )
path = to_api_path ( os_path , root = self . root_dir )
raise web . HTTPError ( 403 , u ' Permission denied: %s ' % path )
else :
raise
@contextmanager
def open ( self , os_path , * args , * * kwargs ) :
""" wrapper around io.open that turns permission errors into 403 """
with self . perm_to_403 ( os_path ) :
with io . open ( os_path , * args , * * kwargs ) as f :
yield f
@contextmanager
def atomic_writing ( self , os_path , * args , * * kwargs ) :
""" wrapper around atomic_writing that turns permission errors into 403 """
with self . perm_to_403 ( os_path ) :
with atomic_writing ( os_path , * args , * * kwargs ) as f :
yield f
save_script = Bool ( False , config = True , help = ' DEPRECATED, use post_save_hook ' )
def _save_script_changed ( self ) :
self . log . warn ( """
` - - script ` is deprecated . You can trigger nbconvert via pre - or post - save hooks :
ContentsManager . pre_save_hook
FileContentsManager . post_save_hook
A post - save hook has been registered that calls :
ipython nbconvert - - to script [ notebook ]
which behaves similarly to ` - - script ` .
""" )
self . post_save_hook = _post_save_script
post_save_hook = Any ( None , config = True ,
help = """ Python callable or importstring thereof
to be called on the path of a file just saved .
This can be used to process the file on disk ,
such as converting the notebook to a script or HTML via nbconvert .
It will be called as ( all arguments passed by keyword ) :
hook ( os_path = os_path , model = model , contents_manager = instance )
path : the filesystem path to the file just written
model : the model representing the file
contents_manager : this ContentsManager instance
"""
)
def _post_save_hook_changed ( self , name , old , new ) :
if new and isinstance ( new , string_types ) :
self . post_save_hook = import_item ( self . post_save_hook )
elif new :
if not callable ( new ) :
raise TraitError ( " post_save_hook must be callable " )
def run_post_save_hook ( self , model , os_path ) :
""" Run the post-save hook if defined, and log errors """
if self . post_save_hook :
try :
self . log . debug ( " Running post-save hook on %s " , os_path )
self . post_save_hook ( os_path = os_path , model = model , contents_manager = self )
except Exception :
self . log . error ( " Post-save hook failed on %s " , os_path , exc_info = True )
def _root_dir_changed ( self , name , old , new ) :
""" Do a bit of validation of the root_dir. """
if not os . path . isabs ( new ) :
# If we receive a non-absolute path, make it absolute.
self . root_dir = os . path . abspath ( new )
return
if not os . path . isdir ( new ) :
raise TraitError ( " %r is not a directory " % new )
checkpoint_dir = Unicode ( ' .ipynb_checkpoints ' , config = True ,
help = """ The directory name in which to keep file checkpoints
This is a path relative to the file ' s own directory.
By default , it is . ipynb_checkpoints
"""
)
def _copy ( self , src , dest ) :
""" copy src to dest
@ -183,26 +129,6 @@ class FileContentsManager(ContentsManager):
"""
return to_os_path ( path , self . root_dir )
def dir_exists ( self , path ) :
""" Does the API-style path refer to an extant directory?
API - style wrapper for os . path . isdir
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
exists : bool
Whether the path is indeed a directory .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . isdir ( os_path )
def is_hidden ( self , path ) :
""" Does the API style path correspond to a hidden directory or file?
@ -240,6 +166,26 @@ class FileContentsManager(ContentsManager):
os_path = self . _get_os_path ( path )
return os . path . isfile ( os_path )
def dir_exists ( self , path ) :
""" Does the API-style path refer to an extant directory?
API - style wrapper for os . path . isdir
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
exists : bool
Whether the path is indeed a directory .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . isdir ( os_path )
def exists ( self , path ) :
""" Returns True if the path exists, else returns False.
@ -259,6 +205,214 @@ class FileContentsManager(ContentsManager):
os_path = self . _get_os_path ( path = path )
return os . path . exists ( os_path )
class FileCheckpointManager ( FileManagerMixin , CheckpointManager ) :
"""
A CheckpointManager that caches checkpoints for files in adjacent
directories .
"""
checkpoint_dir = Unicode (
' .ipynb_checkpoints ' ,
config = True ,
help = """ The directory name in which to keep file checkpoints
This is a path relative to the file ' s own directory.
By default , it is . ipynb_checkpoints
""" ,
)
@property
def root_dir ( self ) :
try :
return self . parent . root_dir
except AttributeError :
return getcwd ( )
# public checkpoint API
def create_checkpoint ( self , path ) :
""" Create a checkpoint from the current state of a file """
path = path . strip ( ' / ' )
if not self . file_exists ( path ) :
raise web . HTTPError ( 404 )
src_path = self . _get_os_path ( path )
# only the one checkpoint ID:
checkpoint_id = u " checkpoint "
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
self . log . debug ( " creating checkpoint for %s " , path )
with self . perm_to_403 ( ) :
self . _copy ( src_path , cp_path )
# return the checkpoint info
return self . get_checkpoint_model ( checkpoint_id , path )
def rename_checkpoint ( self , checkpoint_id , old_path , new_path ) :
""" Rename a checkpoint from old_path to new_path. """
old_cp_path = self . get_checkpoint_path ( checkpoint_id , old_path )
new_cp_path = self . get_checkpoint_path ( checkpoint_id , new_path )
if os . path . isfile ( old_cp_path ) :
self . log . debug (
" Renaming checkpoint %s -> %s " ,
old_cp_path ,
new_cp_path ,
)
with self . perm_to_403 ( ) :
shutil . move ( old_cp_path , new_cp_path )
def list_checkpoints ( self , path ) :
""" list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file .
"""
path = path . strip ( ' / ' )
checkpoint_id = " checkpoint "
os_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . exists ( os_path ) :
return [ ]
else :
return [ self . get_checkpoint_model ( checkpoint_id , path ) ]
def restore_checkpoint ( self , checkpoint_id , path ) :
""" restore a file to a checkpointed state """
path = path . strip ( ' / ' )
self . log . info ( " restoring %s from checkpoint %s " , path , checkpoint_id )
nb_path = self . _get_os_path ( path )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . log . debug ( " checkpoint file does not exist: %s " , cp_path )
self . no_such_checkpoint ( path , checkpoint_id )
# ensure notebook is readable (never restore from unreadable notebook)
if cp_path . endswith ( ' .ipynb ' ) :
with self . open ( cp_path , ' r ' , encoding = ' utf-8 ' ) as f :
nbformat . read ( f , as_version = 4 )
self . log . debug ( " copying %s -> %s " , cp_path , nb_path )
with self . perm_to_403 ( ) :
self . _copy ( cp_path , nb_path )
def delete_checkpoint ( self , checkpoint_id , path ) :
""" delete a file ' s checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . no_such_checkpoint ( path , checkpoint_id )
self . log . debug ( " unlinking %s " , cp_path )
with self . perm_to_403 ( ) :
os . unlink ( cp_path )
# Checkpoint-related utilities
def get_checkpoint_path ( self , checkpoint_id , path ) :
""" find the path to a checkpoint """
path = path . strip ( ' / ' )
parent , name = ( ' / ' + path ) . rsplit ( ' / ' , 1 )
parent = parent . strip ( ' / ' )
basename , ext = os . path . splitext ( name )
filename = u " {name} - {checkpoint_id} {ext} " . format (
name = basename ,
checkpoint_id = checkpoint_id ,
ext = ext ,
)
os_path = self . _get_os_path ( path = parent )
cp_dir = os . path . join ( os_path , self . checkpoint_dir )
with self . perm_to_403 ( ) :
ensure_dir_exists ( cp_dir )
cp_path = os . path . join ( cp_dir , filename )
return cp_path
def get_checkpoint_model ( self , checkpoint_id , path ) :
""" construct the info dict for a given checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
stats = os . stat ( cp_path )
last_modified = tz . utcfromtimestamp ( stats . st_mtime )
info = dict (
id = checkpoint_id ,
last_modified = last_modified ,
)
return info
# Error Handling
def no_such_checkpoint ( self , path , checkpoint_id ) :
raise web . HTTPError (
404 ,
u ' Checkpoint does not exist: %s @ %s ' % ( path , checkpoint_id )
)
class FileContentsManager ( FileManagerMixin , ContentsManager ) :
root_dir = Unicode ( config = True )
def _root_dir_default ( self ) :
try :
return self . parent . notebook_dir
except AttributeError :
return getcwd ( )
save_script = Bool ( False , config = True , help = ' DEPRECATED, use post_save_hook ' )
def _save_script_changed ( self ) :
self . log . warn ( """
` - - script ` is deprecated . You can trigger nbconvert via pre - or post - save hooks :
ContentsManager . pre_save_hook
FileContentsManager . post_save_hook
A post - save hook has been registered that calls :
ipython nbconvert - - to script [ notebook ]
which behaves similarly to ` - - script ` .
""" )
self . post_save_hook = _post_save_script
post_save_hook = Any ( None , config = True ,
help = """ Python callable or importstring thereof
to be called on the path of a file just saved .
This can be used to process the file on disk ,
such as converting the notebook to a script or HTML via nbconvert .
It will be called as ( all arguments passed by keyword ) :
hook ( os_path = os_path , model = model , contents_manager = instance )
path : the filesystem path to the file just written
model : the model representing the file
contents_manager : this ContentsManager instance
"""
)
def _post_save_hook_changed ( self , name , old , new ) :
if new and isinstance ( new , string_types ) :
self . post_save_hook = import_item ( self . post_save_hook )
elif new :
if not callable ( new ) :
raise TraitError ( " post_save_hook must be callable " )
def run_post_save_hook ( self , model , os_path ) :
""" Run the post-save hook if defined, and log errors """
if self . post_save_hook :
try :
self . log . debug ( " Running post-save hook on %s " , os_path )
self . post_save_hook ( os_path = os_path , model = model , contents_manager = self )
except Exception :
self . log . error ( " Post-save hook failed on %s " , os_path , exc_info = True )
def _root_dir_changed ( self , name , old , new ) :
""" Do a bit of validation of the root_dir. """
if not os . path . isabs ( new ) :
# If we receive a non-absolute path, make it absolute.
self . root_dir = os . path . abspath ( new )
return
if not os . path . isdir ( new ) :
raise TraitError ( " %r is not a directory " % new )
def _checkpoint_manager_class_default ( self ) :
return FileCheckpointManager
def _base_model ( self , path ) :
""" Build the common base of a contents model """
os_path = self . _get_os_path ( path )
@ -478,9 +632,10 @@ class FileContentsManager(ContentsManager):
self . run_pre_save_hook ( model = model , path = path )
cp_mgr = self . checkpoint_manager
# One checkpoint should always exist
if self . file_exists ( path ) and not self . list_checkpoints ( path ) :
self . create_checkpoint ( path )
if self . file_exists ( path ) and not cp_mgr . list_checkpoints ( path ) :
cp_mgr . create_checkpoint ( path )
os_path = self . _get_os_path ( path )
self . log . debug ( " Saving %s " , os_path )
@ -512,28 +667,23 @@ class FileContentsManager(ContentsManager):
return model
def delete ( self , path ) :
def delete _file ( self , path ) :
""" Delete file at path. """
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path )
rm = os . unlink
if os . path . isdir ( os_path ) :
listing = os . listdir ( os_path )
# don't delete non-empty directories (checkpoints dir doesn't count)
if listing and listing != [ self . checkpoint_dir ] :
raise web . HTTPError ( 400 , u ' Directory %s not empty ' % os_path )
# Don't delete non-empty directories.
# A directory containing only leftover checkpoints is
# considered empty.
cp_dir = getattr ( self . checkpoint_manager , ' checkpoint_dir ' , None )
for entry in listing :
if entry != cp_dir :
raise web . HTTPError ( 400 , u ' Directory %s not empty ' % os_path )
elif not os . path . isfile ( os_path ) :
raise web . HTTPError ( 404 , u ' File does not exist: %s ' % os_path )
# clear checkpoints
for checkpoint in self . list_checkpoints ( path ) :
checkpoint_id = checkpoint [ ' id ' ]
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if os . path . isfile ( cp_path ) :
self . log . debug ( " Unlinking checkpoint %s " , cp_path )
with self . perm_to_403 ( ) :
rm ( cp_path )
if os . path . isdir ( os_path ) :
self . log . debug ( " Removing directory %s " , os_path )
with self . perm_to_403 ( ) :
@ -543,7 +693,7 @@ class FileContentsManager(ContentsManager):
with self . perm_to_403 ( ) :
rm ( os_path )
def rename ( self , old_path , new_path ) :
def rename _file ( self , old_path , new_path ) :
""" Rename a file. """
old_path = old_path . strip ( ' / ' )
new_path = new_path . strip ( ' / ' )
@ -566,111 +716,6 @@ class FileContentsManager(ContentsManager):
except Exception as e :
raise web . HTTPError ( 500 , u ' Unknown error renaming file: %s %s ' % ( old_path , e ) )
# Move the checkpoints
old_checkpoints = self . list_checkpoints ( old_path )
for cp in old_checkpoints :
checkpoint_id = cp [ ' id ' ]
old_cp_path = self . get_checkpoint_path ( checkpoint_id , old_path )
new_cp_path = self . get_checkpoint_path ( checkpoint_id , new_path )
if os . path . isfile ( old_cp_path ) :
self . log . debug ( " Renaming checkpoint %s -> %s " , old_cp_path , new_cp_path )
with self . perm_to_403 ( ) :
shutil . move ( old_cp_path , new_cp_path )
# Checkpoint-related utilities
def get_checkpoint_path ( self , checkpoint_id , path ) :
""" find the path to a checkpoint """
path = path . strip ( ' / ' )
parent , name = ( ' / ' + path ) . rsplit ( ' / ' , 1 )
parent = parent . strip ( ' / ' )
basename , ext = os . path . splitext ( name )
filename = u " {name} - {checkpoint_id} {ext} " . format (
name = basename ,
checkpoint_id = checkpoint_id ,
ext = ext ,
)
os_path = self . _get_os_path ( path = parent )
cp_dir = os . path . join ( os_path , self . checkpoint_dir )
with self . perm_to_403 ( ) :
ensure_dir_exists ( cp_dir )
cp_path = os . path . join ( cp_dir , filename )
return cp_path
def get_checkpoint_model ( self , checkpoint_id , path ) :
""" construct the info dict for a given checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
stats = os . stat ( cp_path )
last_modified = tz . utcfromtimestamp ( stats . st_mtime )
info = dict (
id = checkpoint_id ,
last_modified = last_modified ,
)
return info
# public checkpoint API
def create_checkpoint ( self , path ) :
""" Create a checkpoint from the current state of a file """
path = path . strip ( ' / ' )
if not self . file_exists ( path ) :
raise web . HTTPError ( 404 )
src_path = self . _get_os_path ( path )
# only the one checkpoint ID:
checkpoint_id = u " checkpoint "
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
self . log . debug ( " creating checkpoint for %s " , path )
with self . perm_to_403 ( ) :
self . _copy ( src_path , cp_path )
# return the checkpoint info
return self . get_checkpoint_model ( checkpoint_id , path )
def list_checkpoints ( self , path ) :
""" list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file .
"""
path = path . strip ( ' / ' )
checkpoint_id = " checkpoint "
os_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . exists ( os_path ) :
return [ ]
else :
return [ self . get_checkpoint_model ( checkpoint_id , path ) ]
def restore_checkpoint ( self , checkpoint_id , path ) :
""" restore a file to a checkpointed state """
path = path . strip ( ' / ' )
self . log . info ( " restoring %s from checkpoint %s " , path , checkpoint_id )
nb_path = self . _get_os_path ( path )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . log . debug ( " checkpoint file does not exist: %s " , cp_path )
raise web . HTTPError ( 404 ,
u ' checkpoint does not exist: %s @ %s ' % ( path , checkpoint_id )
)
# ensure notebook is readable (never restore from an unreadable notebook)
if cp_path . endswith ( ' .ipynb ' ) :
with self . open ( cp_path , ' r ' , encoding = ' utf-8 ' ) as f :
nbformat . read ( f , as_version = 4 )
self . log . debug ( " copying %s -> %s " , cp_path , nb_path )
with self . perm_to_403 ( ) :
self . _copy ( cp_path , nb_path )
def delete_checkpoint ( self , checkpoint_id , path ) :
""" delete a file ' s checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
raise web . HTTPError ( 404 ,
u ' Checkpoint does not exist: %s @ %s ' % ( path , checkpoint_id )
)
self . log . debug ( " unlinking %s " , cp_path )
os . unlink ( cp_path )
def info_string ( self ) :
return " Serving notebooks from local directory: %s " % self . root_dir