You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
4.8 KiB

5 months ago
"""Helper functions for a standard streaming compression API"""
from zipfile import ZipFile
import fsspec.utils
from fsspec.spec import AbstractBufferedFile
def noop_file(file, mode, **kwargs):
return file
# TODO: files should also be available as contexts
# should be functions of the form func(infile, mode=, **kwargs) -> file-like
compr = {None: noop_file}
def register_compression(name, callback, extensions, force=False):
"""Register an "inferable" file compression type.
Registers transparent file compression type for use with fsspec.open.
Compression can be specified by name in open, or "infer"-ed for any files
ending with the given extensions.
Args:
name: (str) The compression type name. Eg. "gzip".
callback: A callable of form (infile, mode, **kwargs) -> file-like.
Accepts an input file-like object, the target mode and kwargs.
Returns a wrapped file-like object.
extensions: (str, Iterable[str]) A file extension, or list of file
extensions for which to infer this compression scheme. Eg. "gz".
force: (bool) Force re-registration of compression type or extensions.
Raises:
ValueError: If name or extensions already registered, and not force.
"""
if isinstance(extensions, str):
extensions = [extensions]
# Validate registration
if name in compr and not force:
raise ValueError(f"Duplicate compression registration: {name}")
for ext in extensions:
if ext in fsspec.utils.compressions and not force:
raise ValueError(f"Duplicate compression file extension: {ext} ({name})")
compr[name] = callback
for ext in extensions:
fsspec.utils.compressions[ext] = name
def unzip(infile, mode="rb", filename=None, **kwargs):
if "r" not in mode:
filename = filename or "file"
z = ZipFile(infile, mode="w", **kwargs)
fo = z.open(filename, mode="w")
fo.close = lambda closer=fo.close: closer() or z.close()
return fo
z = ZipFile(infile)
if filename is None:
filename = z.namelist()[0]
return z.open(filename, mode="r", **kwargs)
register_compression("zip", unzip, "zip")
try:
from bz2 import BZ2File
except ImportError:
pass
else:
register_compression("bz2", BZ2File, "bz2")
try: # pragma: no cover
from isal import igzip
def isal(infile, mode="rb", **kwargs):
return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)
register_compression("gzip", isal, "gz")
except ImportError:
from gzip import GzipFile
register_compression(
"gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
)
try:
from lzma import LZMAFile
register_compression("lzma", LZMAFile, "lzma")
register_compression("xz", LZMAFile, "xz")
except ImportError:
pass
try:
import lzmaffi
register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True)
register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
except ImportError:
pass
class SnappyFile(AbstractBufferedFile):
def __init__(self, infile, mode, **kwargs):
import snappy
super().__init__(
fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
)
self.infile = infile
if "r" in mode:
self.codec = snappy.StreamDecompressor()
else:
self.codec = snappy.StreamCompressor()
def _upload_chunk(self, final=False):
self.buffer.seek(0)
out = self.codec.add_chunk(self.buffer.read())
self.infile.write(out)
return True
def seek(self, loc, whence=0):
raise NotImplementedError("SnappyFile is not seekable")
def seekable(self):
return False
def _fetch_range(self, start, end):
"""Get the specified set of bytes from remote"""
data = self.infile.read(end - start)
return self.codec.decompress(data)
try:
import snappy
snappy.compress(b"")
# Snappy may use the .sz file extension, but this is not part of the
# standard implementation.
register_compression("snappy", SnappyFile, [])
except (ImportError, NameError, AttributeError):
pass
try:
import lz4.frame
register_compression("lz4", lz4.frame.open, "lz4")
except ImportError:
pass
try:
import zstandard as zstd
def zstandard_file(infile, mode="rb"):
if "r" in mode:
cctx = zstd.ZstdDecompressor()
return cctx.stream_reader(infile)
else:
cctx = zstd.ZstdCompressor(level=10)
return cctx.stream_writer(infile)
register_compression("zstd", zstandard_file, "zst")
except ImportError:
pass
def available_compressions():
"""Return a list of the implemented compressions."""
return list(compr)