from __future__ import annotations

import inspect
import logging
import os
import tempfile
import time
import weakref
from shutil import rmtree
from typing import TYPE_CHECKING, Any, Callable, ClassVar

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.compression import compr
from fsspec.core import BaseCache, MMapCache
from fsspec.exceptions import BlocksizeMismatchError
from fsspec.implementations.cache_mapper import create_cache_mapper
from fsspec.implementations.cache_metadata import CacheMetadata
from fsspec.spec import AbstractBufferedFile
from fsspec.transaction import Transaction
from fsspec.utils import infer_compression

if TYPE_CHECKING:
    from fsspec.implementations.cache_mapper import AbstractCacheMapper

logger = logging.getLogger("fsspec.cached")


class WriteCachedTransaction(Transaction):
    def complete(self, commit=True):
        rpaths = [f.path for f in self.files]
        lpaths = [f.fn for f in self.files]
        if commit:
            self.fs.put(lpaths, rpaths)
        self.files.clear()
        self.fs._intrans = False
        self.fs._transaction = None
        self.fs = None  # break cycle


class CachingFileSystem(AbstractFileSystem):
    """Locally caching filesystem, layer over any other FS

    This class implements chunk-wise local storage of remote files, for quick
    access after the initial download. The files are stored in a given
    directory with hashes of URLs for the filenames. If no directory is given,
    a temporary one is used, which should be cleaned up by the OS after the
    process ends. The files themselves are sparse (as implemented in
    :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
    takes up space.

    Restrictions:

    - the block-size must be the same for each access of a given file, unless
      all blocks of the file have already been read
    - caching can only be applied to file-systems which produce files
      derived from fsspec.spec.AbstractBufferedFile; LocalFileSystem is also
      allowed, for testing
    """
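
    # Illustrative usage sketch (the URL and cache directory below are
    # placeholder assumptions, not part of the library): wrap a remote
    # filesystem so that ranged reads land in local sparse files.
    #
    #   import fsspec
    #
    #   fs = fsspec.filesystem(
    #       "blockcache",
    #       target_protocol="https",
    #       cache_storage="/tmp/fsspec-blockcache",
    #   )
    #   with fs.open("https://example.com/large-file.bin", "rb") as f:
    #       f.seek(10 * 2**20)
    #       chunk = f.read(1024)  # only the touched blocks are downloaded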

    protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")

    def __init__(
        self,
        target_protocol=None,
        cache_storage="TMP",
        cache_check=10,
        check_files=False,
        expiry_time=604800,
        target_options=None,
        fs=None,
        same_names: bool | None = None,
        compression=None,
        cache_mapper: AbstractCacheMapper | None = None,
        **kwargs,
    ):
        """

        Parameters
        ----------
        target_protocol: str (optional)
            Target filesystem protocol. Provide either this or ``fs``.
        cache_storage: str or list(str)
            Location to store files. If "TMP", this is a temporary directory,
            and will be cleaned up by the OS when this process ends (or later).
            If a list, each location will be tried in the order given, but
            only the last will be considered writable.
        cache_check: int
            Number of seconds between reload of cache metadata
        check_files: bool
            Whether to explicitly see if the UID of the remote file matches
            the stored one before using. Warning: some file systems such as
            HTTP cannot reliably give a unique hash of the contents of some
            path, so be sure to set this option to False.
        expiry_time: int
            The time in seconds after which a local copy is considered useless.
            Set to falsy to prevent expiry. The default is equivalent to one
            week.
        target_options: dict or None
            Passed to the instantiation of the FS, if fs is None.
        fs: filesystem instance
            The target filesystem to run against. Provide this or ``protocol``.
        same_names: bool (optional)
            By default, target URLs are hashed using a ``HashCacheMapper`` so
            that files from different backends with the same basename do not
            conflict. If this argument is ``True``, a ``BasenameCacheMapper``
            is used instead. Other cache mapper options are available by using
            the ``cache_mapper`` keyword argument. Only one of this and
            ``cache_mapper`` should be specified.
        compression: str (optional)
            To decompress on download. Can be 'infer' (guess from the URL name),
            one of the entries in ``fsspec.compression.compr``, or None for no
            decompression.
        cache_mapper: AbstractCacheMapper (optional)
            The object used to map from original filenames to cached filenames.
            Only one of this and ``same_names`` should be specified.
        """
        super().__init__(**kwargs)
        if fs is None and target_protocol is None:
            raise ValueError(
                "Please provide a filesystem instance (fs) or target_protocol"
            )
        if not (fs is None) ^ (target_protocol is None):
            raise ValueError(
                "A filesystem instance (fs) and target_protocol may not both be given."
            )
        if cache_storage == "TMP":
            tempdir = tempfile.mkdtemp()
            storage = [tempdir]
            weakref.finalize(self, self._remove_tempdir, tempdir)
        else:
            if isinstance(cache_storage, str):
                storage = [cache_storage]
            else:
                storage = cache_storage
        os.makedirs(storage[-1], exist_ok=True)
        self.storage = storage
        self.kwargs = target_options or {}
        self.cache_check = cache_check
        self.check_files = check_files
        self.expiry = expiry_time
        self.compression = compression

        # Size of cache in bytes. If None then the size is unknown and will be
        # recalculated the next time cache_size() is called. On writes to the
        # cache this is reset to None.
        self._cache_size = None

        if same_names is not None and cache_mapper is not None:
            raise ValueError(
                "Cannot specify both same_names and cache_mapper in "
                "CachingFileSystem.__init__"
            )
        if cache_mapper is not None:
            self._mapper = cache_mapper
        else:
            self._mapper = create_cache_mapper(
                same_names if same_names is not None else False
            )

        self.target_protocol = (
            target_protocol
            if isinstance(target_protocol, str)
            else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
        )
        self._metadata = CacheMetadata(self.storage)
        self.load_cache()
        self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)

        def _strip_protocol(path):
            # acts as a method, since each instance has a different target
            return self.fs._strip_protocol(type(self)._strip_protocol(path))

        self._strip_protocol: Callable = _strip_protocol
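
    # Illustrative sketch of the cache-mapper choice made above (URL and
    # directories are placeholder assumptions): by default local filenames
    # are hashes of the whole URL, while ``same_names=True`` keeps the
    # remote basename instead.
    #
    #   import fsspec
    #
    #   hashed = fsspec.filesystem(
    #       "blockcache", target_protocol="https", cache_storage="/tmp/c1"
    #   )
    #   named = fsspec.filesystem(
    #       "blockcache",
    #       target_protocol="https",
    #       cache_storage="/tmp/c2",
    #       same_names=True,
    #   )
    #   url = "https://example.com/a/data.csv"
    #   hashed._mapper(url)  # hex digest of the URL
    #   named._mapper(url)   # "data.csv"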

    @staticmethod
    def _remove_tempdir(tempdir):
        try:
            rmtree(tempdir)
        except Exception:
            pass

    def _mkcache(self):
        os.makedirs(self.storage[-1], exist_ok=True)

    def cache_size(self):
        """Return size of cache in bytes.

        If more than one cache directory is in use, only the size of the last
        one (the writable cache directory) is returned.
        """
        if self._cache_size is None:
            cache_dir = self.storage[-1]
            self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
        return self._cache_size

    def load_cache(self):
        """Read set of stored blocks from file"""
        self._metadata.load()
        self._mkcache()
        self.last_cache = time.time()

    def save_cache(self):
        """Save set of stored blocks from file"""
        self._mkcache()
        self._metadata.save()
        self.last_cache = time.time()
        self._cache_size = None

    def _check_cache(self):
        """Reload caches if time elapsed or any disappeared"""
        self._mkcache()
        if not self.cache_check:
            # explicitly told not to bother checking
            return
        timecond = time.time() - self.last_cache > self.cache_check
        existcond = all(os.path.exists(storage) for storage in self.storage)
        if timecond or not existcond:
            self.load_cache()

    def _check_file(self, path):
        """Is path in cache and still valid"""
        path = self._strip_protocol(path)
        self._check_cache()
        return self._metadata.check_file(path, self)

    def clear_cache(self):
        """Remove all files and metadata from the cache

        In the case of multiple cache locations, this clears only the last one,
        which is assumed to be the read/write one.
        """
        rmtree(self.storage[-1])
        self.load_cache()
        self._cache_size = None

    def clear_expired_cache(self, expiry_time=None):
        """Remove all expired files and metadata from the cache

        In the case of multiple cache locations, this clears only the last one,
        which is assumed to be the read/write one.

        Parameters
        ----------
        expiry_time: int
            The time in seconds after which a local copy is considered useless.
            If not defined the default is equivalent to the attribute from the
            file caching instantiation.
        """

        if not expiry_time:
            expiry_time = self.expiry

        self._check_cache()

        expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
        for fn in expired_files:
            if os.path.exists(fn):
                os.remove(fn)

        if writable_cache_empty:
            rmtree(self.storage[-1])
            self.load_cache()

        self._cache_size = None

    def pop_from_cache(self, path):
        """Remove cached version of given file

        Deletes local copy of the given (remote) path. If it is found in a cache
        location which is not the last, it is assumed to be read-only, and
        raises PermissionError
        """
        path = self._strip_protocol(path)
        fn = self._metadata.pop_file(path)
        if fn is not None:
            os.remove(fn)
        self._cache_size = None

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Wrap the target _open

        If the whole file exists in the cache, just open it locally and
        return that.

        Otherwise, open the file on the target FS, and make it have a mmap
        cache pointing to the location which we determine, in our cache.
        The ``blocks`` instance is shared, so as the mmap cache instance
        updates, so does the entry in our ``cached_files`` attribute.
        We monkey-patch this file, so that when it closes, we call
        ``close_and_update`` to save the state of the blocks.
        """
        path = self._strip_protocol(path)

        path = self.fs._strip_protocol(path)
        if "r" not in mode:
            return self.fs._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )
        detail = self._check_file(path)
        if detail:
            # file is in cache
            detail, fn = detail
            hash, blocks = detail["fn"], detail["blocks"]
            if blocks is True:
                # stored file is complete
                logger.debug("Opening local copy of %s", path)
                return open(fn, mode)
            # TODO: action where partial file exists in read-only cache
            logger.debug("Opening partially cached copy of %s", path)
        else:
            hash = self._mapper(path)
            fn = os.path.join(self.storage[-1], hash)
            blocks = set()
            detail = {
                "original": path,
                "fn": hash,
                "blocks": blocks,
                "time": time.time(),
                "uid": self.fs.ukey(path),
            }
            self._metadata.update_file(path, detail)
            logger.debug("Creating local sparse file for %s", path)

        # call target filesystems open
        self._mkcache()
        f = self.fs._open(
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_options=cache_options,
            cache_type="none",
            **kwargs,
        )
        if self.compression:
            comp = (
                infer_compression(path)
                if self.compression == "infer"
                else self.compression
            )
            f = compr[comp](f, mode="rb")
        if "blocksize" in detail:
            if detail["blocksize"] != f.blocksize:
                raise BlocksizeMismatchError(
                    f"Cached file must be reopened with same block"
                    f" size as original (old: {detail['blocksize']},"
                    f" new {f.blocksize})"
                )
        else:
            detail["blocksize"] = f.blocksize
        f.cache = MMapCache(f.blocksize, f._fetch_range, f.size, fn, blocks)
        close = f.close
        f.close = lambda: self.close_and_update(f, close)
        self.save_cache()
        return f

    def _parent(self, path):
        return self.fs._parent(path)

    def hash_name(self, path: str, *args: Any) -> str:
        # Kept for backward compatibility with downstream libraries.
        # Ignores extra arguments, previously same_name boolean.
        return self._mapper(path)

    def close_and_update(self, f, close):
        """Called when a file is closing, so store the set of blocks"""
        if f.closed:
            return
        path = self._strip_protocol(f.path)
        self._metadata.on_close_cached_file(f, path)
        try:
            logger.debug("going to save")
            self.save_cache()
            logger.debug("saved")
        except OSError:
            logger.debug("Cache saving failed while closing file")
        except NameError:
            logger.debug("Cache save failed due to interpreter shutdown")
        close()
        f.closed = True

    def ls(self, path, detail=True):
        return self.fs.ls(path, detail)

    def __getattribute__(self, item):
        if item in {
            "load_cache",
            "_open",
            "save_cache",
            "close_and_update",
            "__init__",
            "__getattribute__",
            "__reduce__",
            "_make_local_details",
            "open",
            "cat",
            "cat_file",
            "cat_ranges",
            "get",
            "read_block",
            "tail",
            "head",
            "info",
            "ls",
            "exists",
            "isfile",
            "isdir",
            "_check_file",
            "_check_cache",
            "_mkcache",
            "clear_cache",
            "clear_expired_cache",
            "pop_from_cache",
            "local_file",
            "_paths_from_path",
            "get_mapper",
            "open_many",
            "commit_many",
            "hash_name",
            "__hash__",
            "__eq__",
            "to_json",
            "to_dict",
            "cache_size",
            "pipe_file",
            "pipe",
            "start_transaction",
            "end_transaction",
        }:
            # all the methods defined in this class. Note `open` here, since
            # it calls `_open`, but is actually in superclass
            return lambda *args, **kw: getattr(type(self), item).__get__(self)(
                *args, **kw
            )
        if item in ["__reduce_ex__"]:
            raise AttributeError
        if item in ["transaction"]:
            # property
            return type(self).transaction.__get__(self)
        if item in ["_cache", "transaction_type"]:
            # class attributes
            return getattr(type(self), item)
        if item == "__class__":
            return type(self)
        d = object.__getattribute__(self, "__dict__")
        fs = d.get("fs", None)  # fs is not immediately defined
        if item in d:
            return d[item]
        elif fs is not None:
            if item in fs.__dict__:
                # attribute of instance
                return fs.__dict__[item]
            # attribute belonging to the target filesystem
            cls = type(fs)
            m = getattr(cls, item)
            if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
                not hasattr(m, "__self__") or m.__self__ is None
            ):
                # instance method
                return m.__get__(fs, cls)
            return m  # class method or attribute
        else:
            # attributes of the superclass, while target is being set up
            return super().__getattribute__(item)

    def __eq__(self, other):
        """Test for equality."""
        if self is other:
            return True
        if not isinstance(other, type(self)):
            return False
        return (
            self.storage == other.storage
            and self.kwargs == other.kwargs
            and self.cache_check == other.cache_check
            and self.check_files == other.check_files
            and self.expiry == other.expiry
            and self.compression == other.compression
            and self._mapper == other._mapper
            and self.target_protocol == other.target_protocol
        )

    def __hash__(self):
        """Calculate hash."""
        return (
            hash(tuple(self.storage))
            ^ hash(str(self.kwargs))
            ^ hash(self.cache_check)
            ^ hash(self.check_files)
            ^ hash(self.expiry)
            ^ hash(self.compression)
            ^ hash(self._mapper)
            ^ hash(self.target_protocol)
        )
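
# Illustrative cache-maintenance sketch for the class above (directory is a
# placeholder assumption): the cache can be inspected and trimmed from the
# instance itself.
#
#   import fsspec
#
#   fs = fsspec.filesystem(
#       "blockcache", target_protocol="https", cache_storage="/tmp/fsspec-blockcache"
#   )
#   fs.cache_size()           # bytes used by the writable cache directory
#   fs.clear_expired_cache()  # drop entries older than expiry_time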


class WholeFileCacheFileSystem(CachingFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This is similar to ``CachingFileSystem``, but without
    the block-wise functionality and so can work even when sparse files
    are not allowed. See its docstring for definition of the init
    arguments.

    The class still needs access to the remote store for listing files,
    and may refresh cached files.
    """
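
    # Illustrative usage sketch (placeholder URL and directory): each file is
    # downloaded in full on first access and re-opened locally afterwards.
    #
    #   import fsspec
    #
    #   fs = fsspec.filesystem(
    #       "filecache",
    #       target_protocol="https",
    #       cache_storage="/tmp/fsspec-filecache",
    #   )
    #   with fs.open("https://example.com/table.csv", "rb") as f:
    #       data = f.read()  # first call downloads; later calls read the local copy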

    protocol = "filecache"
    local_file = True

    def open_many(self, open_files, **kwargs):
        paths = [of.path for of in open_files]
        if "r" in open_files.mode:
            self._mkcache()
        else:
            return [
                LocalTempFile(
                    self.fs,
                    path,
                    mode=open_files.mode,
                    fn=os.path.join(self.storage[-1], self._mapper(path)),
                    **kwargs,
                )
                for path in paths
            ]

        if self.compression:
            raise NotImplementedError
        details = [self._check_file(sp) for sp in paths]
        downpath = [p for p, d in zip(paths, details) if not d]
        downfn0 = [
            os.path.join(self.storage[-1], self._mapper(p))
            for p, d in zip(paths, details)
        ]  # keep these path names for opening later
        downfn = [fn for fn, d in zip(downfn0, details) if not d]
        if downpath:
            # skip if all files are already cached and up to date
            self.fs.get(downpath, downfn)

            # update metadata - only happens when downloads are successful
            newdetail = [
                {
                    "original": path,
                    "fn": self._mapper(path),
                    "blocks": True,
                    "time": time.time(),
                    "uid": self.fs.ukey(path),
                }
                for path in downpath
            ]
            for path, detail in zip(downpath, newdetail):
                self._metadata.update_file(path, detail)
            self.save_cache()

        def firstpart(fn):
            # helper to adapt both whole-file and simple-cache
            return fn[1] if isinstance(fn, tuple) else fn

        return [
            open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
            for fn0, fn1 in zip(details, downfn0)
        ]

    def commit_many(self, open_files):
        self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
        [f.close() for f in open_files]
        for f in open_files:
            # in case autocommit is off, and so close did not already delete
            try:
                os.remove(f.name)
            except FileNotFoundError:
                pass
        self._cache_size = None

    def _make_local_details(self, path):
        hash = self._mapper(path)
        fn = os.path.join(self.storage[-1], hash)
        detail = {
            "original": path,
            "fn": hash,
            "blocks": True,
            "time": time.time(),
            "uid": self.fs.ukey(path),
        }
        self._metadata.update_file(path, detail)
        logger.debug("Copying %s to local cache", path)
        return fn

    def cat(
        self,
        path,
        recursive=False,
        on_error="raise",
        callback=DEFAULT_CALLBACK,
        **kwargs,
    ):
        paths = self.expand_path(
            path, recursive=recursive, maxdepth=kwargs.get("maxdepth", None)
        )
        getpaths = []
        storepaths = []
        fns = []
        out = {}
        for p in paths.copy():
            try:
                detail = self._check_file(p)
                if not detail:
                    fn = self._make_local_details(p)
                    getpaths.append(p)
                    storepaths.append(fn)
                else:
                    detail, fn = detail if isinstance(detail, tuple) else (None, detail)
                fns.append(fn)
            except Exception as e:
                if on_error == "raise":
                    raise
                if on_error == "return":
                    out[p] = e
                paths.remove(p)

        if getpaths:
            self.fs.get(getpaths, storepaths)
            self.save_cache()

        callback.set_size(len(paths))
        for p, fn in zip(paths, fns):
            with open(fn, "rb") as f:
                out[p] = f.read()
            callback.relative_update(1)
        if isinstance(path, str) and len(paths) == 1 and recursive is False:
            out = out[paths[0]]
        return out

    def _open(self, path, mode="rb", **kwargs):
        path = self._strip_protocol(path)
        if "r" not in mode:
            hash = self._mapper(path)
            fn = os.path.join(self.storage[-1], hash)
            user_specified_kwargs = {
                k: v
                for k, v in kwargs.items()
                # those kwargs were added by open(), we don't want them
                if k not in ["autocommit", "block_size", "cache_options"]
            }
            return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
        detail = self._check_file(path)
        if detail:
            detail, fn = detail
            _, blocks = detail["fn"], detail["blocks"]
            if blocks is True:
                logger.debug("Opening local copy of %s", path)

                # In order to support downstream filesystems to be able to
                # infer the compression from the original filename, like
                # the `TarFileSystem`, let's extend the `io.BufferedReader`
                # fileobject protocol by adding a dedicated attribute
                # `original`.
                f = open(fn, mode)
                f.original = detail.get("original")
                return f
            else:
                raise ValueError(
                    f"Attempt to open partially cached file {path}"
                    f" as a wholly cached file"
                )
        else:
            fn = self._make_local_details(path)
        kwargs["mode"] = mode

        # call target filesystems open
        self._mkcache()
        if self.compression:
            with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
                if isinstance(f, AbstractBufferedFile):
                    # want no type of caching if just downloading whole thing
                    f.cache = BaseCache(0, f.cache.fetcher, f.size)
                comp = (
                    infer_compression(path)
                    if self.compression == "infer"
                    else self.compression
                )
                f = compr[comp](f, mode="rb")
                data = True
                while data:
                    block = getattr(f, "blocksize", 5 * 2**20)
                    data = f.read(block)
                    f2.write(data)
        else:
            self.fs.get_file(path, fn)
        self.save_cache()
        return self._open(path, mode)


class SimpleCacheFileSystem(WholeFileCacheFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This implementation only copies whole files, and
    does not keep any metadata about the download time or file details.
    It is therefore safer to use in multi-threaded/concurrent situations.

    This is the only one of the caching filesystems that supports write: you
    will be given a real local open file, and upon close and commit, it will
    be uploaded to the target filesystem; the writability of the target URL
    is not checked until that time.

    """
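
    # Illustrative usage sketch (bucket, paths and directory are placeholder
    # assumptions): reads are cached as whole local files without metadata,
    # and writes go to a local temporary file that is uploaded on close.
    #
    #   import fsspec
    #
    #   fs = fsspec.filesystem(
    #       "simplecache",
    #       target_protocol="s3",
    #       cache_storage="/tmp/fsspec-simplecache",
    #   )
    #   with fs.open("s3://my-bucket/output.bin", "wb") as f:
    #       f.write(b"hello")  # uploaded to the target when the file closes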

    protocol = "simplecache"
    local_file = True
    transaction_type = WriteCachedTransaction

    def __init__(self, **kwargs):
        kw = kwargs.copy()
        for key in ["cache_check", "expiry_time", "check_files"]:
            kw[key] = False
        super().__init__(**kw)
        for storage in self.storage:
            if not os.path.exists(storage):
                os.makedirs(storage, exist_ok=True)

    def _check_file(self, path):
        self._check_cache()
        sha = self._mapper(path)
        for storage in self.storage:
            fn = os.path.join(storage, sha)
            if os.path.exists(fn):
                return fn

    def save_cache(self):
        pass

    def load_cache(self):
        pass

    def pipe_file(self, path, value=None, **kwargs):
        if self._intrans:
            with self.open(path, "wb") as f:
                f.write(value)
        else:
            super().pipe_file(path, value)

    def ls(self, path, detail=True, **kwargs):
        path = self._strip_protocol(path)
        details = []
        try:
            details = self.fs.ls(
                path, detail=True, **kwargs
            ).copy()  # don't edit original!
        except FileNotFoundError as e:
            ex = e
        else:
            ex = None
        if self._intrans:
            path1 = path.rstrip("/") + "/"
            for f in self.transaction.files:
                if f.path == path:
                    details.append(
                        {"name": path, "size": f.size or f.tell(), "type": "file"}
                    )
                elif f.path.startswith(path1):
                    if f.path.count("/") == path1.count("/"):
                        details.append(
                            {"name": f.path, "size": f.size or f.tell(), "type": "file"}
                        )
                    else:
                        dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
                        details.append({"name": dname, "size": 0, "type": "directory"})
        if ex is not None and not details:
            raise ex
        if detail:
            return details
        return sorted(_["name"] for _ in details)

    def info(self, path, **kwargs):
        path = self._strip_protocol(path)
        if self._intrans:
            f = [_ for _ in self.transaction.files if _.path == path]
            if f:
                size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
                return {"name": path, "size": size, "type": "file"}
            f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
            if f:
                return {"name": path, "size": 0, "type": "directory"}
        return self.fs.info(path, **kwargs)

    def pipe(self, path, value=None, **kwargs):
        if isinstance(path, str):
            self.pipe_file(self._strip_protocol(path), value, **kwargs)
        elif isinstance(path, dict):
            for k, v in path.items():
                self.pipe_file(self._strip_protocol(k), v, **kwargs)
        else:
            raise ValueError("path must be str or dict")

    def cat_ranges(
        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
    ):
        lpaths = [self._check_file(p) for p in paths]
        rpaths = [p for l, p in zip(lpaths, paths) if l is False]
        lpaths = [l for l, p in zip(lpaths, paths) if l is False]
        self.fs.get(rpaths, lpaths)
        return super().cat_ranges(
            paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
        )

    def _open(self, path, mode="rb", **kwargs):
        path = self._strip_protocol(path)
        sha = self._mapper(path)

        if "r" not in mode:
            fn = os.path.join(self.storage[-1], sha)
            user_specified_kwargs = {
                k: v
                for k, v in kwargs.items()
                if k not in ["autocommit", "block_size", "cache_options"]
            }  # those were added by open()
            return LocalTempFile(
                self,
                path,
                mode=mode,
                autocommit=not self._intrans,
                fn=fn,
                **user_specified_kwargs,
            )
        fn = self._check_file(path)
        if fn:
            return open(fn, mode)

        fn = os.path.join(self.storage[-1], sha)
        logger.debug("Copying %s to local cache", path)
        kwargs["mode"] = mode

        self._mkcache()
        self._cache_size = None
        if self.compression:
            with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
                if isinstance(f, AbstractBufferedFile):
                    # want no type of caching if just downloading whole thing
                    f.cache = BaseCache(0, f.cache.fetcher, f.size)
                comp = (
                    infer_compression(path)
                    if self.compression == "infer"
                    else self.compression
                )
                f = compr[comp](f, mode="rb")
                data = True
                while data:
                    block = getattr(f, "blocksize", 5 * 2**20)
                    data = f.read(block)
                    f2.write(data)
        else:
            self.fs.get_file(path, fn)
        return self._open(path, mode)


class LocalTempFile:
    """A temporary local file, which will be uploaded on commit"""
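
    # Illustrative sketch (bucket and paths are placeholder assumptions) of how
    # these objects surface to users: SimpleCacheFileSystem hands one out for
    # every write, and a transaction defers the upload until commit.
    #
    #   import fsspec
    #
    #   fs = fsspec.filesystem(
    #       "simplecache", target_protocol="s3", cache_storage="/tmp/fsspec-simplecache"
    #   )
    #   with fs.transaction:
    #       with fs.open("s3://my-bucket/part-0.txt", "wb") as f:
    #           f.write(b"deferred")
    #   # the local temporary files are uploaded when the transaction completes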

    def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
        self.fn = fn
        self.fh = open(fn, mode)
        self.mode = mode
        if seek:
            self.fh.seek(seek)
        self.path = path
        self.size = None
        self.fs = fs
        self.closed = False
        self.autocommit = autocommit
        self.kwargs = kwargs

    def __reduce__(self):
        # always open in r+b to allow continuing writing at a location
        return (
            LocalTempFile,
            (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
        )

    def __enter__(self):
        return self.fh

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        # self.size = self.fh.tell()
        if self.closed:
            return
        self.fh.close()
        self.closed = True
        if self.autocommit:
            self.commit()

    def discard(self):
        self.fh.close()
        os.remove(self.fn)

    def commit(self):
        self.fs.put(self.fn, self.path, **self.kwargs)
        # we do not delete local copy - it's still in the cache

    @property
    def name(self):
        return self.fn

    def __repr__(self) -> str:
        return f"LocalTempFile: {self.path}"

    def __getattr__(self, item):
        return getattr(self.fh, item)