You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
176 lines
4.8 KiB
176 lines
4.8 KiB
5 months ago
|
"""Helper functions for a standard streaming compression API"""
|
||
|
|
||
|
from zipfile import ZipFile
|
||
|
|
||
|
import fsspec.utils
|
||
|
from fsspec.spec import AbstractBufferedFile
|
||
|
|
||
|
|
||
|
def noop_file(file, mode, **kwargs):
    """Return ``file`` unchanged.

    ``mode`` and any extra keyword arguments are accepted so the signature
    matches the other compression callbacks, but they are not used.
    """
    return file
|
||
|
|
||
|
|
||
|
# TODO: files should also be available as contexts
# Registry of compression openers, keyed by compression name. Each value
# should be a function of the form func(infile, mode=, **kwargs) -> file-like.
# The ``None`` key maps to ``noop_file``, which returns the file unchanged.
compr = {None: noop_file}
|
||
|
|
||
|
|
||
|
def register_compression(name, callback, extensions, force=False):
    """Register an "inferable" file compression type.

    Registers transparent file compression type for use with fsspec.open.
    Compression can be specified by name in open, or "infer"-ed for any files
    ending with the given extensions.

    Args:
        name: (str) The compression type name. Eg. "gzip".
        callback: A callable of form (infile, mode, **kwargs) -> file-like.
            Accepts an input file-like object, the target mode and kwargs.
            Returns a wrapped file-like object.
        extensions: (str, Iterable[str]) A file extension, or list of file
            extensions for which to infer this compression scheme. Eg. "gz".
            One-shot iterables (e.g. generators) are accepted as well.
        force: (bool) Force re-registration of compression type or extensions.

    Raises:
        ValueError: If name or extensions already registered, and not force.

    """
    if isinstance(extensions, str):
        extensions = [extensions]
    else:
        # The extensions are walked twice below (validate, then register).
        # Materialise them once so a one-shot iterable such as a generator is
        # not exhausted by validation and then silently left unregistered.
        extensions = list(extensions)

    # Validate everything before touching either registry, so a rejected call
    # leaves both mappings exactly as they were.
    if not force:
        if name in compr:
            raise ValueError(f"Duplicate compression registration: {name}")
        for ext in extensions:
            if ext in fsspec.utils.compressions:
                raise ValueError(
                    f"Duplicate compression file extension: {ext} ({name})"
                )

    compr[name] = callback

    for ext in extensions:
        fsspec.utils.compressions[ext] = name
|
||
|
|
||
|
|
||
|
def unzip(infile, mode="rb", filename=None, **kwargs):
    """Open a single member of a zip archive held in ``infile``.

    If ``mode`` contains "r", the archive is opened for reading and the member
    called ``filename`` is returned (the first name in the archive when
    ``filename`` is None); ``kwargs`` are forwarded to ``ZipFile.open``.

    Otherwise a new archive is written to ``infile`` (``kwargs`` are forwarded
    to the ``ZipFile`` constructor) and a writable handle for the member
    ``filename`` (default "file") is returned. Closing that handle also closes
    the archive.
    """
    if "r" in mode:
        archive = ZipFile(infile)
        member = archive.namelist()[0] if filename is None else filename
        return archive.open(member, mode="r", **kwargs)

    member = filename or "file"
    archive = ZipFile(infile, mode="w", **kwargs)
    handle = archive.open(member, mode="w")
    close_member = handle.close

    def close_member_then_archive():
        # Close the member first, then the archive itself.
        return close_member() or archive.close()

    handle.close = close_member_then_archive
    return handle
|
||
|
|
||
|
|
||
|
# Zip archives: opened through ``unzip`` and inferred from the "zip" extension.
register_compression(name="zip", callback=unzip, extensions="zip")
|
||
|
|
||
|
# bz2 is registered only when the module can be imported.
try:
    from bz2 import BZ2File

    register_compression(name="bz2", callback=BZ2File, extensions="bz2")
except ImportError:
    pass
|
||
|
|
||
|
# gzip: use isal's igzip when it can be imported, otherwise the stdlib gzip.
try:  # pragma: no cover
    from isal import igzip

    def isal(infile, mode="rb", **kwargs):
        """Wrap ``infile`` in an ``igzip.IGzipFile`` opened with ``mode``."""
        wrapped = igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)
        return wrapped

    register_compression(name="gzip", callback=isal, extensions="gz")
except ImportError:
    from gzip import GzipFile

    register_compression(
        name="gzip",
        callback=lambda f, **kwargs: GzipFile(fileobj=f, **kwargs),
        extensions="gz",
    )
|
||
|
|
||
|
# lzma / xz: both names are served by the same LZMAFile class.
try:
    from lzma import LZMAFile
except ImportError:
    pass
else:
    register_compression(name="lzma", callback=LZMAFile, extensions="lzma")
    register_compression(name="xz", callback=LZMAFile, extensions="xz")
|
||
|
|
||
|
# When lzmaffi is importable, its LZMAFile replaces any earlier "lzma"/"xz"
# registration (force=True overrides the duplicate check).
try:
    import lzmaffi
except ImportError:
    pass
else:
    register_compression(
        name="lzma", callback=lzmaffi.LZMAFile, extensions="lzma", force=True
    )
    register_compression(
        name="xz", callback=lzmaffi.LZMAFile, extensions="xz", force=True
    )
|
||
|
|
||
|
|
||
|
class SnappyFile(AbstractBufferedFile):
    """Buffered, non-seekable file that pipes data through a snappy codec.

    Wraps the file-like ``infile``. A ``snappy.StreamDecompressor`` is used
    when ``mode`` contains "r"; otherwise a ``snappy.StreamCompressor``.
    """

    def __init__(self, infile, mode, **kwargs):
        import snappy

        # The base class is always given a binary mode string.
        binary_mode = mode.strip("b") + "b"
        # NOTE(review): size=999999999 looks like a placeholder because the
        # real size is not known up front -- confirm against the base class.
        super().__init__(
            fs=None, path="snappy", mode=binary_mode, size=999999999, **kwargs
        )
        self.infile = infile
        codec_factory = (
            snappy.StreamDecompressor if "r" in mode else snappy.StreamCompressor
        )
        self.codec = codec_factory()

    def _upload_chunk(self, final=False):
        """Compress the whole buffer and write the result to ``infile``."""
        self.buffer.seek(0)
        pending = self.buffer.read()
        compressed = self.codec.add_chunk(pending)
        self.infile.write(compressed)
        return True

    def seek(self, loc, whence=0):
        """Always raise: seeking is not supported."""
        raise NotImplementedError("SnappyFile is not seekable")

    def seekable(self):
        """Return False."""
        return False

    def _fetch_range(self, start, end):
        """Read ``end - start`` bytes from ``infile`` and decompress them."""
        return self.codec.decompress(self.infile.read(end - start))
|
||
|
|
||
|
|
||
|
# Register snappy only when the module imports and the probe call below works.
try:
    import snappy

    # NOTE(review): presumably a smoke test that the imported ``snappy``
    # module really provides compression -- confirm.
    snappy.compress(b"")
    # No extension is registered: snappy may use the .sz file extension, but
    # that is not part of the standard implementation.
    register_compression(name="snappy", callback=SnappyFile, extensions=[])

except (ImportError, NameError, AttributeError):
    pass
|
||
|
|
||
|
# lz4 is registered only when lz4.frame can be imported.
try:
    import lz4.frame
except ImportError:
    pass
else:
    register_compression(name="lz4", callback=lz4.frame.open, extensions="lz4")
|
||
|
|
||
|
# zstd is registered only when the zstandard package can be imported.
try:
    import zstandard as zstd

    def zstandard_file(infile, mode="rb"):
        """Wrap ``infile`` in a zstandard stream.

        Returns a stream reader from a ``ZstdDecompressor`` when ``mode``
        contains "r", otherwise a stream writer from a ``ZstdCompressor``
        created with ``level=10``.
        """
        if "r" not in mode:
            compressor = zstd.ZstdCompressor(level=10)
            return compressor.stream_writer(infile)
        decompressor = zstd.ZstdDecompressor()
        return decompressor.stream_reader(infile)

    register_compression(name="zstd", callback=zstandard_file, extensions="zst")
except ImportError:
    pass
|
||
|
|
||
|
|
||
|
def available_compressions():
    """Return the names of all registered compressions as a new list."""
    return [*compr]
|