@ -111,9 +111,20 @@ class FileManagerMixin(object):
shutil . copyfile ( src , dest )
try :
shutil . copystat ( src , dest )
except OSError as e :
except OSError :
self . log . debug ( " copystat on %s failed " , dest , exc_info = True )
def _read_notebook ( self , os_path , as_version = 4 ) :
""" Read a notebook from an os path. """
with self . open ( os_path , ' r ' , encoding = ' utf-8 ' ) as f :
try :
return nbformat . read ( f , as_version = as_version )
except Exception as e :
raise web . HTTPError (
400 ,
u " Unreadable Notebook: %s %r " % ( os_path , e ) ,
)
def _get_os_path ( self , path ) :
""" Given an API path, return its file system path.
@ -129,82 +140,6 @@ class FileManagerMixin(object):
"""
return to_os_path ( path , self . root_dir )
def is_hidden ( self , path ) :
""" Does the API style path correspond to a hidden directory or file?
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
hidden : bool
Whether the path exists and is hidden .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return is_hidden ( os_path , self . root_dir )
def file_exists ( self , path ) :
""" Returns True if the file exists, else returns False.
API - style wrapper for os . path . isfile
Parameters
- - - - - - - - - -
path : string
The relative path to the file ( with ' / ' as separator )
Returns
- - - - - - -
exists : bool
Whether the file exists .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path )
return os . path . isfile ( os_path )
def dir_exists ( self , path ) :
""" Does the API-style path refer to an extant directory?
API - style wrapper for os . path . isdir
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
exists : bool
Whether the path is indeed a directory .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . isdir ( os_path )
def exists ( self , path ) :
""" Returns True if the path exists, else returns False.
API - style wrapper for os . path . exists
Parameters
- - - - - - - - - -
path : string
The API path to the file ( with ' / ' as separator )
Returns
- - - - - - -
exists : bool
Whether the target exists .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . exists ( os_path )
class FileCheckpointManager ( FileManagerMixin , CheckpointManager ) :
"""
@ -223,30 +158,44 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
""" ,
)
@property
def root_dir ( self ) :
root_dir = Unicode ( config = True )
def _root_dir_default ( self ) :
try :
return self . parent . root_dir
except AttributeError :
return getcwd ( )
# public checkpoint API
def create_checkpoint ( self , path) :
""" Create a checkpoint from the current state of a file """
def create_checkpoint ( self , nb, path) :
""" Create a checkpoint from the current content of a notebook. """
path = path . strip ( ' / ' )
if not self . file_exists ( path ) :
raise web . HTTPError ( 404 )
src_path = self . _get_os_path ( path )
# only the one checkpoint ID:
checkpoint_id = u " checkpoint "
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
os_checkpoint_path = self . get_checkpoint_path ( checkpoint_id , path )
self . log . debug ( " creating checkpoint for %s " , path )
with self . perm_to_403 ( ) :
self . _ copy( src_path , cp_path )
self . _ save_notebook( os_checkpoint_path , nb )
# return the checkpoint info
return self . get_checkpoint_model ( checkpoint_id , path )
def get_checkpoint_content ( self , checkpoint_id , path ) :
""" Get the content of a checkpoint.
Returns an unvalidated model with the same structure as
the return value of ContentsManager . get
"""
path = path . strip ( ' / ' )
self . log . info ( " restoring %s from checkpoint %s " , path , checkpoint_id )
os_checkpoint_path = self . get_checkpoint_path ( checkpoint_id , path )
return self . _read_notebook ( os_checkpoint_path , as_version = 4 )
def _save_notebook ( self , os_path , nb ) :
""" Save a notebook file. """
with self . atomic_writing ( os_path , encoding = ' utf-8 ' ) as f :
nbformat . write ( nb , f , version = nbformat . NO_CONVERT )
def rename_checkpoint ( self , checkpoint_id , old_path , new_path ) :
""" Rename a checkpoint from old_path to new_path. """
old_cp_path = self . get_checkpoint_path ( checkpoint_id , old_path )
@ -260,6 +209,17 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
with self . perm_to_403 ( ) :
shutil . move ( old_cp_path , new_cp_path )
def delete_checkpoint ( self , checkpoint_id , path ) :
""" delete a file ' s checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . no_such_checkpoint ( path , checkpoint_id )
self . log . debug ( " unlinking %s " , cp_path )
with self . perm_to_403 ( ) :
os . unlink ( cp_path )
def list_checkpoints ( self , path ) :
""" list the checkpoints for a given file
@ -273,35 +233,6 @@ class FileCheckpointManager(FileManagerMixin, CheckpointManager):
else :
return [ self . get_checkpoint_model ( checkpoint_id , path ) ]
def restore_checkpoint ( self , checkpoint_id , path ) :
""" restore a file to a checkpointed state """
path = path . strip ( ' / ' )
self . log . info ( " restoring %s from checkpoint %s " , path , checkpoint_id )
nb_path = self . _get_os_path ( path )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . log . debug ( " checkpoint file does not exist: %s " , cp_path )
self . no_such_checkpoint ( path , checkpoint_id )
# ensure notebook is readable (never restore from unreadable notebook)
if cp_path . endswith ( ' .ipynb ' ) :
with self . open ( cp_path , ' r ' , encoding = ' utf-8 ' ) as f :
nbformat . read ( f , as_version = 4 )
self . log . debug ( " copying %s -> %s " , cp_path , nb_path )
with self . perm_to_403 ( ) :
self . _copy ( cp_path , nb_path )
def delete_checkpoint ( self , checkpoint_id , path ) :
""" delete a file ' s checkpoint """
path = path . strip ( ' / ' )
cp_path = self . get_checkpoint_path ( checkpoint_id , path )
if not os . path . isfile ( cp_path ) :
self . no_such_checkpoint ( path , checkpoint_id )
self . log . debug ( " unlinking %s " , cp_path )
with self . perm_to_403 ( ) :
os . unlink ( cp_path )
# Checkpoint-related utilities
def get_checkpoint_path ( self , checkpoint_id , path ) :
""" find the path to a checkpoint """
@ -413,6 +344,82 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
def _checkpoint_manager_class_default ( self ) :
return FileCheckpointManager
def is_hidden ( self , path ) :
""" Does the API style path correspond to a hidden directory or file?
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
hidden : bool
Whether the path exists and is hidden .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return is_hidden ( os_path , self . root_dir )
def file_exists ( self , path ) :
""" Returns True if the file exists, else returns False.
API - style wrapper for os . path . isfile
Parameters
- - - - - - - - - -
path : string
The relative path to the file ( with ' / ' as separator )
Returns
- - - - - - -
exists : bool
Whether the file exists .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path )
return os . path . isfile ( os_path )
def dir_exists ( self , path ) :
""" Does the API-style path refer to an extant directory?
API - style wrapper for os . path . isdir
Parameters
- - - - - - - - - -
path : string
The path to check . This is an API path ( ` / ` separated ,
relative to root_dir ) .
Returns
- - - - - - -
exists : bool
Whether the path is indeed a directory .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . isdir ( os_path )
def exists ( self , path ) :
""" Returns True if the path exists, else returns False.
API - style wrapper for os . path . exists
Parameters
- - - - - - - - - -
path : string
The API path to the file ( with ' / ' as separator )
Returns
- - - - - - -
exists : bool
Whether the target exists .
"""
path = path . strip ( ' / ' )
os_path = self . _get_os_path ( path = path )
return os . path . exists ( os_path )
def _base_model ( self , path ) :
""" Build the common base of a contents model """
os_path = self . _get_os_path ( path )
@ -518,7 +525,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
return model
def _notebook_model ( self , path , content = True ) :
""" Build a notebook model
@ -529,11 +535,7 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
model [ ' type ' ] = ' notebook '
if content :
os_path = self . _get_os_path ( path )
with self . open ( os_path , ' r ' , encoding = ' utf-8 ' ) as f :
try :
nb = nbformat . read ( f , as_version = 4 )
except Exception as e :
raise web . HTTPError ( 400 , u " Unreadable Notebook: %s %r " % ( os_path , e ) )
nb = self . _read_notebook ( os_path , as_version = 4 )
self . mark_trusted_cells ( nb , path )
model [ ' content ' ] = nb
model [ ' format ' ] = ' json '
@ -582,13 +584,15 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
model = self . _file_model ( path , content = content , format = format )
return model
def _save_notebook ( self , os_path , model , path = ' ' ) :
def _save_notebook ( self , os_path , model , path ) :
""" save a notebook file """
# Save the notebook file
nb = nbformat . from_dict ( model [ ' content ' ] )
self . check_and_sign ( nb , path )
# One checkpoint should always exist for notebooks.
if not self . checkpoint_manager . list_checkpoints ( path ) :
self . checkpoint_manager . create_checkpoint ( nb , path )
with self . atomic_writing ( os_path , encoding = ' utf-8 ' ) as f :
nbformat . write ( nb , f , version = nbformat . NO_CONVERT )
@ -632,11 +636,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
self . run_pre_save_hook ( model = model , path = path )
cp_mgr = self . checkpoint_manager
# One checkpoint should always exist
if self . file_exists ( path ) and not cp_mgr . list_checkpoints ( path ) :
cp_mgr . create_checkpoint ( path )
os_path = self . _get_os_path ( path )
self . log . debug ( " Saving %s " , os_path )
try :