You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
325 lines
9.9 KiB
325 lines
9.9 KiB
5 months ago
|
import argparse
|
||
|
import logging
|
||
|
import os
|
||
|
import stat
|
||
|
import threading
|
||
|
import time
|
||
|
from errno import EIO, ENOENT
|
||
|
|
||
|
from fuse import FUSE, FuseOSError, LoggingMixIn, Operations
|
||
|
|
||
|
from fsspec import __version__
|
||
|
from fsspec.core import url_to_fs
|
||
|
|
||
|
logger = logging.getLogger("fsspec.fuse")
|
||
|
|
||
|
|
||
|
class FUSEr(Operations):
|
||
|
def __init__(self, fs, path, ready_file=False):
|
||
|
self.fs = fs
|
||
|
self.cache = {}
|
||
|
self.root = path.rstrip("/") + "/"
|
||
|
self.counter = 0
|
||
|
logger.info("Starting FUSE at %s", path)
|
||
|
self._ready_file = ready_file
|
||
|
|
||
|
def getattr(self, path, fh=None):
|
||
|
logger.debug("getattr %s", path)
|
||
|
if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]:
|
||
|
return {"type": "file", "st_size": 5}
|
||
|
|
||
|
path = "".join([self.root, path.lstrip("/")]).rstrip("/")
|
||
|
try:
|
||
|
info = self.fs.info(path)
|
||
|
except FileNotFoundError:
|
||
|
raise FuseOSError(ENOENT)
|
||
|
|
||
|
data = {"st_uid": info.get("uid", 1000), "st_gid": info.get("gid", 1000)}
|
||
|
perm = info.get("mode", 0o777)
|
||
|
|
||
|
if info["type"] != "file":
|
||
|
data["st_mode"] = stat.S_IFDIR | perm
|
||
|
data["st_size"] = 0
|
||
|
data["st_blksize"] = 0
|
||
|
else:
|
||
|
data["st_mode"] = stat.S_IFREG | perm
|
||
|
data["st_size"] = info["size"]
|
||
|
data["st_blksize"] = 5 * 2**20
|
||
|
data["st_nlink"] = 1
|
||
|
data["st_atime"] = info["atime"] if "atime" in info else time.time()
|
||
|
data["st_ctime"] = info["ctime"] if "ctime" in info else time.time()
|
||
|
data["st_mtime"] = info["mtime"] if "mtime" in info else time.time()
|
||
|
return data
|
||
|
|
||
|
def readdir(self, path, fh):
|
||
|
logger.debug("readdir %s", path)
|
||
|
path = "".join([self.root, path.lstrip("/")])
|
||
|
files = self.fs.ls(path, False)
|
||
|
files = [os.path.basename(f.rstrip("/")) for f in files]
|
||
|
return [".", ".."] + files
|
||
|
|
||
|
def mkdir(self, path, mode):
|
||
|
path = "".join([self.root, path.lstrip("/")])
|
||
|
self.fs.mkdir(path)
|
||
|
return 0
|
||
|
|
||
|
def rmdir(self, path):
|
||
|
path = "".join([self.root, path.lstrip("/")])
|
||
|
self.fs.rmdir(path)
|
||
|
return 0
|
||
|
|
||
|
def read(self, path, size, offset, fh):
|
||
|
logger.debug("read %s", (path, size, offset))
|
||
|
if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]:
|
||
|
# status indicator
|
||
|
return b"ready"
|
||
|
|
||
|
f = self.cache[fh]
|
||
|
f.seek(offset)
|
||
|
out = f.read(size)
|
||
|
return out
|
||
|
|
||
|
def write(self, path, data, offset, fh):
|
||
|
logger.debug("write %s", (path, offset))
|
||
|
f = self.cache[fh]
|
||
|
f.seek(offset)
|
||
|
f.write(data)
|
||
|
return len(data)
|
||
|
|
||
|
def create(self, path, flags, fi=None):
|
||
|
logger.debug("create %s", (path, flags))
|
||
|
fn = "".join([self.root, path.lstrip("/")])
|
||
|
self.fs.touch(fn) # OS will want to get attributes immediately
|
||
|
f = self.fs.open(fn, "wb")
|
||
|
self.cache[self.counter] = f
|
||
|
self.counter += 1
|
||
|
return self.counter - 1
|
||
|
|
||
|
def open(self, path, flags):
|
||
|
logger.debug("open %s", (path, flags))
|
||
|
fn = "".join([self.root, path.lstrip("/")])
|
||
|
if flags % 2 == 0:
|
||
|
# read
|
||
|
mode = "rb"
|
||
|
else:
|
||
|
# write/create
|
||
|
mode = "wb"
|
||
|
self.cache[self.counter] = self.fs.open(fn, mode)
|
||
|
self.counter += 1
|
||
|
return self.counter - 1
|
||
|
|
||
|
def truncate(self, path, length, fh=None):
|
||
|
fn = "".join([self.root, path.lstrip("/")])
|
||
|
if length != 0:
|
||
|
raise NotImplementedError
|
||
|
# maybe should be no-op since open with write sets size to zero anyway
|
||
|
self.fs.touch(fn)
|
||
|
|
||
|
def unlink(self, path):
|
||
|
fn = "".join([self.root, path.lstrip("/")])
|
||
|
try:
|
||
|
self.fs.rm(fn, False)
|
||
|
except (OSError, FileNotFoundError):
|
||
|
raise FuseOSError(EIO)
|
||
|
|
||
|
def release(self, path, fh):
|
||
|
try:
|
||
|
if fh in self.cache:
|
||
|
f = self.cache[fh]
|
||
|
f.close()
|
||
|
self.cache.pop(fh)
|
||
|
except Exception as e:
|
||
|
print(e)
|
||
|
return 0
|
||
|
|
||
|
def chmod(self, path, mode):
|
||
|
if hasattr(self.fs, "chmod"):
|
||
|
path = "".join([self.root, path.lstrip("/")])
|
||
|
return self.fs.chmod(path, mode)
|
||
|
raise NotImplementedError
|
||
|
|
||
|
|
||
|
def run(
|
||
|
fs,
|
||
|
path,
|
||
|
mount_point,
|
||
|
foreground=True,
|
||
|
threads=False,
|
||
|
ready_file=False,
|
||
|
ops_class=FUSEr,
|
||
|
):
|
||
|
"""Mount stuff in a local directory
|
||
|
|
||
|
This uses fusepy to make it appear as if a given path on an fsspec
|
||
|
instance is in fact resident within the local file-system.
|
||
|
|
||
|
This requires that fusepy by installed, and that FUSE be available on
|
||
|
the system (typically requiring a package to be installed with
|
||
|
apt, yum, brew, etc.).
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
fs: file-system instance
|
||
|
From one of the compatible implementations
|
||
|
path: str
|
||
|
Location on that file-system to regard as the root directory to
|
||
|
mount. Note that you typically should include the terminating "/"
|
||
|
character.
|
||
|
mount_point: str
|
||
|
An empty directory on the local file-system where the contents of
|
||
|
the remote path will appear.
|
||
|
foreground: bool
|
||
|
Whether or not calling this function will block. Operation will
|
||
|
typically be more stable if True.
|
||
|
threads: bool
|
||
|
Whether or not to create threads when responding to file operations
|
||
|
within the mounter directory. Operation will typically be more
|
||
|
stable if False.
|
||
|
ready_file: bool
|
||
|
Whether the FUSE process is ready. The ``.fuse_ready`` file will
|
||
|
exist in the ``mount_point`` directory if True. Debugging purpose.
|
||
|
ops_class: FUSEr or Subclass of FUSEr
|
||
|
To override the default behavior of FUSEr. For Example, logging
|
||
|
to file.
|
||
|
|
||
|
"""
|
||
|
func = lambda: FUSE(
|
||
|
ops_class(fs, path, ready_file=ready_file),
|
||
|
mount_point,
|
||
|
nothreads=not threads,
|
||
|
foreground=foreground,
|
||
|
)
|
||
|
if not foreground:
|
||
|
th = threading.Thread(target=func)
|
||
|
th.daemon = True
|
||
|
th.start()
|
||
|
return th
|
||
|
else: # pragma: no cover
|
||
|
try:
|
||
|
func()
|
||
|
except KeyboardInterrupt:
|
||
|
pass
|
||
|
|
||
|
|
||
|
def main(args):
|
||
|
"""Mount filesystem from chained URL to MOUNT_POINT.
|
||
|
|
||
|
Examples:
|
||
|
|
||
|
python3 -m fsspec.fuse memory /usr/share /tmp/mem
|
||
|
|
||
|
python3 -m fsspec.fuse local /tmp/source /tmp/local \\
|
||
|
-l /tmp/fsspecfuse.log
|
||
|
|
||
|
You can also mount chained-URLs and use special settings:
|
||
|
|
||
|
python3 -m fsspec.fuse 'filecache::zip::file://data.zip' \\
|
||
|
/ /tmp/zip \\
|
||
|
-o 'filecache-cache_storage=/tmp/simplecache'
|
||
|
|
||
|
You can specify the type of the setting by using `[int]` or `[bool]`,
|
||
|
(`true`, `yes`, `1` represents the Boolean value `True`):
|
||
|
|
||
|
python3 -m fsspec.fuse 'simplecache::ftp://ftp1.at.proftpd.org' \\
|
||
|
/historic/packages/RPMS /tmp/ftp \\
|
||
|
-o 'simplecache-cache_storage=/tmp/simplecache' \\
|
||
|
-o 'simplecache-check_files=false[bool]' \\
|
||
|
-o 'ftp-listings_expiry_time=60[int]' \\
|
||
|
-o 'ftp-username=anonymous' \\
|
||
|
-o 'ftp-password=xieyanbo'
|
||
|
"""
|
||
|
|
||
|
class RawDescriptionArgumentParser(argparse.ArgumentParser):
|
||
|
def format_help(self):
|
||
|
usage = super().format_help()
|
||
|
parts = usage.split("\n\n")
|
||
|
parts[1] = self.description.rstrip()
|
||
|
return "\n\n".join(parts)
|
||
|
|
||
|
parser = RawDescriptionArgumentParser(prog="fsspec.fuse", description=main.__doc__)
|
||
|
parser.add_argument("--version", action="version", version=__version__)
|
||
|
parser.add_argument("url", type=str, help="fs url")
|
||
|
parser.add_argument("source_path", type=str, help="source directory in fs")
|
||
|
parser.add_argument("mount_point", type=str, help="local directory")
|
||
|
parser.add_argument(
|
||
|
"-o",
|
||
|
"--option",
|
||
|
action="append",
|
||
|
help="Any options of protocol included in the chained URL",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-l", "--log-file", type=str, help="Logging FUSE debug info (Default: '')"
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-f",
|
||
|
"--foreground",
|
||
|
action="store_false",
|
||
|
help="Running in foreground or not (Default: False)",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-t",
|
||
|
"--threads",
|
||
|
action="store_false",
|
||
|
help="Running with threads support (Default: False)",
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-r",
|
||
|
"--ready-file",
|
||
|
action="store_false",
|
||
|
help="The `.fuse_ready` file will exist after FUSE is ready. "
|
||
|
"(Debugging purpose, Default: False)",
|
||
|
)
|
||
|
args = parser.parse_args(args)
|
||
|
|
||
|
kwargs = {}
|
||
|
for item in args.option or []:
|
||
|
key, sep, value = item.partition("=")
|
||
|
if not sep:
|
||
|
parser.error(message=f"Wrong option: {item!r}")
|
||
|
val = value.lower()
|
||
|
if val.endswith("[int]"):
|
||
|
value = int(value[: -len("[int]")])
|
||
|
elif val.endswith("[bool]"):
|
||
|
value = val[: -len("[bool]")] in ["1", "yes", "true"]
|
||
|
|
||
|
if "-" in key:
|
||
|
fs_name, setting_name = key.split("-", 1)
|
||
|
if fs_name in kwargs:
|
||
|
kwargs[fs_name][setting_name] = value
|
||
|
else:
|
||
|
kwargs[fs_name] = {setting_name: value}
|
||
|
else:
|
||
|
kwargs[key] = value
|
||
|
|
||
|
if args.log_file:
|
||
|
logging.basicConfig(
|
||
|
level=logging.DEBUG,
|
||
|
filename=args.log_file,
|
||
|
format="%(asctime)s %(message)s",
|
||
|
)
|
||
|
|
||
|
class LoggingFUSEr(FUSEr, LoggingMixIn):
|
||
|
pass
|
||
|
|
||
|
fuser = LoggingFUSEr
|
||
|
else:
|
||
|
fuser = FUSEr
|
||
|
|
||
|
fs, url_path = url_to_fs(args.url, **kwargs)
|
||
|
logger.debug("Mounting %s to %s", url_path, str(args.mount_point))
|
||
|
run(
|
||
|
fs,
|
||
|
args.source_path,
|
||
|
args.mount_point,
|
||
|
foreground=args.foreground,
|
||
|
threads=args.threads,
|
||
|
ready_file=args.ready_file,
|
||
|
ops_class=fuser,
|
||
|
)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
import sys
|
||
|
|
||
|
main(sys.argv[1:])
|