You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1425 lines
49 KiB

6 months ago
""" test parquet compat """
import datetime
from decimal import Decimal
from io import BytesIO
import os
import pathlib
import numpy as np
import pytest
from pandas._config import using_copy_on_write
from pandas._config.config import _get_option
from pandas.compat import is_platform_windows
from pandas.compat.pyarrow import (
pa_version_under11p0,
pa_version_under13p0,
pa_version_under15p0,
)
import pandas as pd
import pandas._testing as tm
from pandas.util.version import Version
from pandas.io.parquet import (
FastParquetImpl,
PyArrowImpl,
get_engine,
read_parquet,
to_parquet,
)
try:
import pyarrow
_HAVE_PYARROW = True
except ImportError:
_HAVE_PYARROW = False
try:
import fastparquet
_HAVE_FASTPARQUET = True
except ImportError:
_HAVE_FASTPARQUET = False
# TODO(ArrayManager) fastparquet relies on BlockManager internals
pytestmark = [
pytest.mark.filterwarnings("ignore:DataFrame._data is deprecated:FutureWarning"),
pytest.mark.filterwarnings(
"ignore:Passing a BlockManager to DataFrame:DeprecationWarning"
),
]
# setup engines & skips
@pytest.fixture(
params=[
pytest.param(
"fastparquet",
marks=pytest.mark.skipif(
not _HAVE_FASTPARQUET
or _get_option("mode.data_manager", silent=True) == "array",
reason="fastparquet is not installed or ArrayManager is used",
),
),
pytest.param(
"pyarrow",
marks=pytest.mark.skipif(
not _HAVE_PYARROW, reason="pyarrow is not installed"
),
),
]
)
def engine(request):
return request.param
@pytest.fixture
def pa():
if not _HAVE_PYARROW:
pytest.skip("pyarrow is not installed")
return "pyarrow"
@pytest.fixture
def fp():
if not _HAVE_FASTPARQUET:
pytest.skip("fastparquet is not installed")
elif _get_option("mode.data_manager", silent=True) == "array":
pytest.skip("ArrayManager is not supported with fastparquet")
return "fastparquet"
@pytest.fixture
def df_compat():
return pd.DataFrame({"A": [1, 2, 3], "B": "foo"})
@pytest.fixture
def df_cross_compat():
df = pd.DataFrame(
{
"a": list("abc"),
"b": list(range(1, 4)),
# 'c': np.arange(3, 6).astype('u1'),
"d": np.arange(4.0, 7.0, dtype="float64"),
"e": [True, False, True],
"f": pd.date_range("20130101", periods=3),
# 'g': pd.date_range('20130101', periods=3,
# tz='US/Eastern'),
# 'h': pd.date_range('20130101', periods=3, freq='ns')
}
)
return df
@pytest.fixture
def df_full():
return pd.DataFrame(
{
"string": list("abc"),
"string_with_nan": ["a", np.nan, "c"],
"string_with_none": ["a", None, "c"],
"bytes": [b"foo", b"bar", b"baz"],
"unicode": ["foo", "bar", "baz"],
"int": list(range(1, 4)),
"uint": np.arange(3, 6).astype("u1"),
"float": np.arange(4.0, 7.0, dtype="float64"),
"float_with_nan": [2.0, np.nan, 3.0],
"bool": [True, False, True],
"datetime": pd.date_range("20130101", periods=3),
"datetime_with_nat": [
pd.Timestamp("20130101"),
pd.NaT,
pd.Timestamp("20130103"),
],
}
)
@pytest.fixture(
params=[
datetime.datetime.now(datetime.timezone.utc),
datetime.datetime.now(datetime.timezone.min),
datetime.datetime.now(datetime.timezone.max),
datetime.datetime.strptime("2019-01-04T16:41:24+0200", "%Y-%m-%dT%H:%M:%S%z"),
datetime.datetime.strptime("2019-01-04T16:41:24+0215", "%Y-%m-%dT%H:%M:%S%z"),
datetime.datetime.strptime("2019-01-04T16:41:24-0200", "%Y-%m-%dT%H:%M:%S%z"),
datetime.datetime.strptime("2019-01-04T16:41:24-0215", "%Y-%m-%dT%H:%M:%S%z"),
]
)
def timezone_aware_date_list(request):
return request.param
def check_round_trip(
df,
engine=None,
path=None,
write_kwargs=None,
read_kwargs=None,
expected=None,
check_names=True,
check_like=False,
check_dtype=True,
repeat=2,
):
"""Verify parquet serializer and deserializer produce the same results.
Performs a pandas to disk and disk to pandas round trip,
then compares the 2 resulting DataFrames to verify equality.
Parameters
----------
df: Dataframe
engine: str, optional
'pyarrow' or 'fastparquet'
path: str, optional
write_kwargs: dict of str:str, optional
read_kwargs: dict of str:str, optional
expected: DataFrame, optional
Expected deserialization result, otherwise will be equal to `df`
check_names: list of str, optional
Closed set of column names to be compared
check_like: bool, optional
If True, ignore the order of index & columns.
repeat: int, optional
How many times to repeat the test
"""
write_kwargs = write_kwargs or {"compression": None}
read_kwargs = read_kwargs or {}
if expected is None:
expected = df
if engine:
write_kwargs["engine"] = engine
read_kwargs["engine"] = engine
def compare(repeat):
for _ in range(repeat):
df.to_parquet(path, **write_kwargs)
actual = read_parquet(path, **read_kwargs)
if "string_with_nan" in expected:
expected.loc[1, "string_with_nan"] = None
tm.assert_frame_equal(
expected,
actual,
check_names=check_names,
check_like=check_like,
check_dtype=check_dtype,
)
if path is None:
with tm.ensure_clean() as path:
compare(repeat)
else:
compare(repeat)
def check_partition_names(path, expected):
"""Check partitions of a parquet file are as expected.
Parameters
----------
path: str
Path of the dataset.
expected: iterable of str
Expected partition names.
"""
import pyarrow.dataset as ds
dataset = ds.dataset(path, partitioning="hive")
assert dataset.partitioning.schema.names == expected
def test_invalid_engine(df_compat):
msg = "engine must be one of 'pyarrow', 'fastparquet'"
with pytest.raises(ValueError, match=msg):
check_round_trip(df_compat, "foo", "bar")
def test_options_py(df_compat, pa):
# use the set option
with pd.option_context("io.parquet.engine", "pyarrow"):
check_round_trip(df_compat)
def test_options_fp(df_compat, fp):
# use the set option
with pd.option_context("io.parquet.engine", "fastparquet"):
check_round_trip(df_compat)
def test_options_auto(df_compat, fp, pa):
# use the set option
with pd.option_context("io.parquet.engine", "auto"):
check_round_trip(df_compat)
def test_options_get_engine(fp, pa):
assert isinstance(get_engine("pyarrow"), PyArrowImpl)
assert isinstance(get_engine("fastparquet"), FastParquetImpl)
with pd.option_context("io.parquet.engine", "pyarrow"):
assert isinstance(get_engine("auto"), PyArrowImpl)
assert isinstance(get_engine("pyarrow"), PyArrowImpl)
assert isinstance(get_engine("fastparquet"), FastParquetImpl)
with pd.option_context("io.parquet.engine", "fastparquet"):
assert isinstance(get_engine("auto"), FastParquetImpl)
assert isinstance(get_engine("pyarrow"), PyArrowImpl)
assert isinstance(get_engine("fastparquet"), FastParquetImpl)
with pd.option_context("io.parquet.engine", "auto"):
assert isinstance(get_engine("auto"), PyArrowImpl)
assert isinstance(get_engine("pyarrow"), PyArrowImpl)
assert isinstance(get_engine("fastparquet"), FastParquetImpl)
def test_get_engine_auto_error_message():
# Expect different error messages from get_engine(engine="auto")
# if engines aren't installed vs. are installed but bad version
from pandas.compat._optional import VERSIONS
# Do we have engines installed, but a bad version of them?
pa_min_ver = VERSIONS.get("pyarrow")
fp_min_ver = VERSIONS.get("fastparquet")
have_pa_bad_version = (
False
if not _HAVE_PYARROW
else Version(pyarrow.__version__) < Version(pa_min_ver)
)
have_fp_bad_version = (
False
if not _HAVE_FASTPARQUET
else Version(fastparquet.__version__) < Version(fp_min_ver)
)
# Do we have usable engines installed?
have_usable_pa = _HAVE_PYARROW and not have_pa_bad_version
have_usable_fp = _HAVE_FASTPARQUET and not have_fp_bad_version
if not have_usable_pa and not have_usable_fp:
# No usable engines found.
if have_pa_bad_version:
match = f"Pandas requires version .{pa_min_ver}. or newer of .pyarrow."
with pytest.raises(ImportError, match=match):
get_engine("auto")
else:
match = "Missing optional dependency .pyarrow."
with pytest.raises(ImportError, match=match):
get_engine("auto")
if have_fp_bad_version:
match = f"Pandas requires version .{fp_min_ver}. or newer of .fastparquet."
with pytest.raises(ImportError, match=match):
get_engine("auto")
else:
match = "Missing optional dependency .fastparquet."
with pytest.raises(ImportError, match=match):
get_engine("auto")
def test_cross_engine_pa_fp(df_cross_compat, pa, fp):
# cross-compat with differing reading/writing engines
df = df_cross_compat
with tm.ensure_clean() as path:
df.to_parquet(path, engine=pa, compression=None)
result = read_parquet(path, engine=fp)
tm.assert_frame_equal(result, df)
result = read_parquet(path, engine=fp, columns=["a", "d"])
tm.assert_frame_equal(result, df[["a", "d"]])
def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
# cross-compat with differing reading/writing engines
df = df_cross_compat
with tm.ensure_clean() as path:
df.to_parquet(path, engine=fp, compression=None)
result = read_parquet(path, engine=pa)
tm.assert_frame_equal(result, df)
result = read_parquet(path, engine=pa, columns=["a", "d"])
tm.assert_frame_equal(result, df[["a", "d"]])
def test_parquet_pos_args_deprecation(engine):
# GH-54229
df = pd.DataFrame({"a": [1, 2, 3]})
msg = (
r"Starting with pandas version 3.0 all arguments of to_parquet except for the "
r"argument 'path' will be keyword-only."
)
with tm.ensure_clean() as path:
with tm.assert_produces_warning(
FutureWarning,
match=msg,
check_stacklevel=False,
raise_on_extra_warnings=False,
):
df.to_parquet(path, engine)
class Base:
def check_error_on_write(self, df, engine, exc, err_msg):
# check that we are raising the exception on writing
with tm.ensure_clean() as path:
with pytest.raises(exc, match=err_msg):
to_parquet(df, path, engine, compression=None)
def check_external_error_on_write(self, df, engine, exc):
# check that an external library is raising the exception on writing
with tm.ensure_clean() as path:
with tm.external_error_raised(exc):
to_parquet(df, path, engine, compression=None)
@pytest.mark.network
@pytest.mark.single_cpu
def test_parquet_read_from_url(self, httpserver, datapath, df_compat, engine):
if engine != "auto":
pytest.importorskip(engine)
with open(datapath("io", "data", "parquet", "simple.parquet"), mode="rb") as f:
httpserver.serve_content(content=f.read())
df = read_parquet(httpserver.url)
tm.assert_frame_equal(df, df_compat)
class TestBasic(Base):
def test_error(self, engine):
for obj in [
pd.Series([1, 2, 3]),
1,
"foo",
pd.Timestamp("20130101"),
np.array([1, 2, 3]),
]:
msg = "to_parquet only supports IO with DataFrames"
self.check_error_on_write(obj, engine, ValueError, msg)
def test_columns_dtypes(self, engine):
df = pd.DataFrame({"string": list("abc"), "int": list(range(1, 4))})
# unicode
df.columns = ["foo", "bar"]
check_round_trip(df, engine)
@pytest.mark.parametrize("compression", [None, "gzip", "snappy", "brotli"])
def test_compression(self, engine, compression):
df = pd.DataFrame({"A": [1, 2, 3]})
check_round_trip(df, engine, write_kwargs={"compression": compression})
def test_read_columns(self, engine):
# GH18154
df = pd.DataFrame({"string": list("abc"), "int": list(range(1, 4))})
expected = pd.DataFrame({"string": list("abc")})
check_round_trip(
df, engine, expected=expected, read_kwargs={"columns": ["string"]}
)
def test_read_filters(self, engine, tmp_path):
df = pd.DataFrame(
{
"int": list(range(4)),
"part": list("aabb"),
}
)
expected = pd.DataFrame({"int": [0, 1]})
check_round_trip(
df,
engine,
path=tmp_path,
expected=expected,
write_kwargs={"partition_cols": ["part"]},
read_kwargs={"filters": [("part", "==", "a")], "columns": ["int"]},
repeat=1,
)
def test_write_index(self, engine, using_copy_on_write, request):
check_names = engine != "fastparquet"
if using_copy_on_write and engine == "fastparquet":
request.applymarker(
pytest.mark.xfail(reason="fastparquet write into index")
)
df = pd.DataFrame({"A": [1, 2, 3]})
check_round_trip(df, engine)
indexes = [
[2, 3, 4],
pd.date_range("20130101", periods=3),
list("abc"),
[1, 3, 4],
]
# non-default index
for index in indexes:
df.index = index
if isinstance(index, pd.DatetimeIndex):
df.index = df.index._with_freq(None) # freq doesn't round-trip
check_round_trip(df, engine, check_names=check_names)
# index with meta-data
df.index = [0, 1, 2]
df.index.name = "foo"
check_round_trip(df, engine)
def test_write_multiindex(self, pa):
# Not supported in fastparquet as of 0.1.3 or older pyarrow version
engine = pa
df = pd.DataFrame({"A": [1, 2, 3]})
index = pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)])
df.index = index
check_round_trip(df, engine)
def test_multiindex_with_columns(self, pa):
engine = pa
dates = pd.date_range("01-Jan-2018", "01-Dec-2018", freq="MS")
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((2 * len(dates), 3)),
columns=list("ABC"),
)
index1 = pd.MultiIndex.from_product(
[["Level1", "Level2"], dates], names=["level", "date"]
)
index2 = index1.copy(names=None)
for index in [index1, index2]:
df.index = index
check_round_trip(df, engine)
check_round_trip(
df, engine, read_kwargs={"columns": ["A", "B"]}, expected=df[["A", "B"]]
)
def test_write_ignoring_index(self, engine):
# ENH 20768
# Ensure index=False omits the index from the written Parquet file.
df = pd.DataFrame({"a": [1, 2, 3], "b": ["q", "r", "s"]})
write_kwargs = {"compression": None, "index": False}
# Because we're dropping the index, we expect the loaded dataframe to
# have the default integer index.
expected = df.reset_index(drop=True)
check_round_trip(df, engine, write_kwargs=write_kwargs, expected=expected)
# Ignore custom index
df = pd.DataFrame(
{"a": [1, 2, 3], "b": ["q", "r", "s"]}, index=["zyx", "wvu", "tsr"]
)
check_round_trip(df, engine, write_kwargs=write_kwargs, expected=expected)
# Ignore multi-indexes as well.
arrays = [
["bar", "bar", "baz", "baz", "foo", "foo", "qux", "qux"],
["one", "two", "one", "two", "one", "two", "one", "two"],
]
df = pd.DataFrame(
{"one": list(range(8)), "two": [-i for i in range(8)]}, index=arrays
)
expected = df.reset_index(drop=True)
check_round_trip(df, engine, write_kwargs=write_kwargs, expected=expected)
def test_write_column_multiindex(self, engine):
# Not able to write column multi-indexes with non-string column names.
mi_columns = pd.MultiIndex.from_tuples([("a", 1), ("a", 2), ("b", 1)])
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((4, 3)), columns=mi_columns
)
if engine == "fastparquet":
self.check_error_on_write(
df, engine, TypeError, "Column name must be a string"
)
elif engine == "pyarrow":
check_round_trip(df, engine)
def test_write_column_multiindex_nonstring(self, engine):
# GH #34777
# Not able to write column multi-indexes with non-string column names
arrays = [
["bar", "bar", "baz", "baz", "foo", "foo", "qux", "qux"],
[1, 2, 1, 2, 1, 2, 1, 2],
]
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((8, 8)), columns=arrays
)
df.columns.names = ["Level1", "Level2"]
if engine == "fastparquet":
self.check_error_on_write(df, engine, ValueError, "Column name")
elif engine == "pyarrow":
check_round_trip(df, engine)
def test_write_column_multiindex_string(self, pa):
# GH #34777
# Not supported in fastparquet as of 0.1.3
engine = pa
# Write column multi-indexes with string column names
arrays = [
["bar", "bar", "baz", "baz", "foo", "foo", "qux", "qux"],
["one", "two", "one", "two", "one", "two", "one", "two"],
]
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((8, 8)), columns=arrays
)
df.columns.names = ["ColLevel1", "ColLevel2"]
check_round_trip(df, engine)
def test_write_column_index_string(self, pa):
# GH #34777
# Not supported in fastparquet as of 0.1.3
engine = pa
# Write column indexes with string column names
arrays = ["bar", "baz", "foo", "qux"]
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((8, 4)), columns=arrays
)
df.columns.name = "StringCol"
check_round_trip(df, engine)
def test_write_column_index_nonstring(self, engine):
# GH #34777
# Write column indexes with string column names
arrays = [1, 2, 3, 4]
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((8, 4)), columns=arrays
)
df.columns.name = "NonStringCol"
if engine == "fastparquet":
self.check_error_on_write(
df, engine, TypeError, "Column name must be a string"
)
else:
check_round_trip(df, engine)
def test_dtype_backend(self, engine, request):
pq = pytest.importorskip("pyarrow.parquet")
if engine == "fastparquet":
# We are manually disabling fastparquet's
# nullable dtype support pending discussion
mark = pytest.mark.xfail(
reason="Fastparquet nullable dtype support is disabled"
)
request.applymarker(mark)
table = pyarrow.table(
{
"a": pyarrow.array([1, 2, 3, None], "int64"),
"b": pyarrow.array([1, 2, 3, None], "uint8"),
"c": pyarrow.array(["a", "b", "c", None]),
"d": pyarrow.array([True, False, True, None]),
# Test that nullable dtypes used even in absence of nulls
"e": pyarrow.array([1, 2, 3, 4], "int64"),
# GH 45694
"f": pyarrow.array([1.0, 2.0, 3.0, None], "float32"),
"g": pyarrow.array([1.0, 2.0, 3.0, None], "float64"),
}
)
with tm.ensure_clean() as path:
# write manually with pyarrow to write integers
pq.write_table(table, path)
result1 = read_parquet(path, engine=engine)
result2 = read_parquet(path, engine=engine, dtype_backend="numpy_nullable")
assert result1["a"].dtype == np.dtype("float64")
expected = pd.DataFrame(
{
"a": pd.array([1, 2, 3, None], dtype="Int64"),
"b": pd.array([1, 2, 3, None], dtype="UInt8"),
"c": pd.array(["a", "b", "c", None], dtype="string"),
"d": pd.array([True, False, True, None], dtype="boolean"),
"e": pd.array([1, 2, 3, 4], dtype="Int64"),
"f": pd.array([1.0, 2.0, 3.0, None], dtype="Float32"),
"g": pd.array([1.0, 2.0, 3.0, None], dtype="Float64"),
}
)
if engine == "fastparquet":
# Fastparquet doesn't support string columns yet
# Only int and boolean
result2 = result2.drop("c", axis=1)
expected = expected.drop("c", axis=1)
tm.assert_frame_equal(result2, expected)
@pytest.mark.parametrize(
"dtype",
[
"Int64",
"UInt8",
"boolean",
"object",
"datetime64[ns, UTC]",
"float",
"period[D]",
"Float64",
"string",
],
)
def test_read_empty_array(self, pa, dtype):
# GH #41241
df = pd.DataFrame(
{
"value": pd.array([], dtype=dtype),
}
)
# GH 45694
expected = None
if dtype == "float":
expected = pd.DataFrame(
{
"value": pd.array([], dtype="Float64"),
}
)
check_round_trip(
df, pa, read_kwargs={"dtype_backend": "numpy_nullable"}, expected=expected
)
class TestParquetPyArrow(Base):
def test_basic(self, pa, df_full):
df = df_full
# additional supported types for pyarrow
dti = pd.date_range("20130101", periods=3, tz="Europe/Brussels")
dti = dti._with_freq(None) # freq doesn't round-trip
df["datetime_tz"] = dti
df["bool_with_none"] = [True, None, True]
check_round_trip(df, pa)
def test_basic_subset_columns(self, pa, df_full):
# GH18628
df = df_full
# additional supported types for pyarrow
df["datetime_tz"] = pd.date_range("20130101", periods=3, tz="Europe/Brussels")
check_round_trip(
df,
pa,
expected=df[["string", "int"]],
read_kwargs={"columns": ["string", "int"]},
)
def test_to_bytes_without_path_or_buf_provided(self, pa, df_full):
# GH 37105
buf_bytes = df_full.to_parquet(engine=pa)
assert isinstance(buf_bytes, bytes)
buf_stream = BytesIO(buf_bytes)
res = read_parquet(buf_stream)
expected = df_full.copy()
expected.loc[1, "string_with_nan"] = None
tm.assert_frame_equal(res, expected)
def test_duplicate_columns(self, pa):
# not currently able to handle duplicate columns
df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=list("aaa")).copy()
self.check_error_on_write(df, pa, ValueError, "Duplicate column names found")
def test_timedelta(self, pa):
df = pd.DataFrame({"a": pd.timedelta_range("1 day", periods=3)})
check_round_trip(df, pa)
def test_unsupported(self, pa):
# mixed python objects
df = pd.DataFrame({"a": ["a", 1, 2.0]})
# pyarrow 0.11 raises ArrowTypeError
# older pyarrows raise ArrowInvalid
self.check_external_error_on_write(df, pa, pyarrow.ArrowException)
def test_unsupported_float16(self, pa):
# #44847, #44914
# Not able to write float 16 column using pyarrow.
data = np.arange(2, 10, dtype=np.float16)
df = pd.DataFrame(data=data, columns=["fp16"])
if pa_version_under15p0:
self.check_external_error_on_write(df, pa, pyarrow.ArrowException)
else:
check_round_trip(df, pa)
@pytest.mark.xfail(
is_platform_windows(),
reason=(
"PyArrow does not cleanup of partial files dumps when unsupported "
"dtypes are passed to_parquet function in windows"
),
)
@pytest.mark.skipif(not pa_version_under15p0, reason="float16 works on 15")
@pytest.mark.parametrize("path_type", [str, pathlib.Path])
def test_unsupported_float16_cleanup(self, pa, path_type):
# #44847, #44914
# Not able to write float 16 column using pyarrow.
# Tests cleanup by pyarrow in case of an error
data = np.arange(2, 10, dtype=np.float16)
df = pd.DataFrame(data=data, columns=["fp16"])
with tm.ensure_clean() as path_str:
path = path_type(path_str)
with tm.external_error_raised(pyarrow.ArrowException):
df.to_parquet(path=path, engine=pa)
assert not os.path.isfile(path)
def test_categorical(self, pa):
# supported in >= 0.7.0
df = pd.DataFrame()
df["a"] = pd.Categorical(list("abcdef"))
# test for null, out-of-order values, and unobserved category
df["b"] = pd.Categorical(
["bar", "foo", "foo", "bar", None, "bar"],
dtype=pd.CategoricalDtype(["foo", "bar", "baz"]),
)
# test for ordered flag
df["c"] = pd.Categorical(
["a", "b", "c", "a", "c", "b"], categories=["b", "c", "d"], ordered=True
)
check_round_trip(df, pa)
@pytest.mark.single_cpu
def test_s3_roundtrip_explicit_fs(self, df_compat, s3_public_bucket, pa, s3so):
s3fs = pytest.importorskip("s3fs")
s3 = s3fs.S3FileSystem(**s3so)
kw = {"filesystem": s3}
check_round_trip(
df_compat,
pa,
path=f"{s3_public_bucket.name}/pyarrow.parquet",
read_kwargs=kw,
write_kwargs=kw,
)
@pytest.mark.single_cpu
def test_s3_roundtrip(self, df_compat, s3_public_bucket, pa, s3so):
# GH #19134
s3so = {"storage_options": s3so}
check_round_trip(
df_compat,
pa,
path=f"s3://{s3_public_bucket.name}/pyarrow.parquet",
read_kwargs=s3so,
write_kwargs=s3so,
)
@pytest.mark.single_cpu
@pytest.mark.parametrize(
"partition_col",
[
["A"],
[],
],
)
def test_s3_roundtrip_for_dir(
self, df_compat, s3_public_bucket, pa, partition_col, s3so
):
pytest.importorskip("s3fs")
# GH #26388
expected_df = df_compat.copy()
# GH #35791
if partition_col:
expected_df = expected_df.astype(dict.fromkeys(partition_col, np.int32))
partition_col_type = "category"
expected_df[partition_col] = expected_df[partition_col].astype(
partition_col_type
)
check_round_trip(
df_compat,
pa,
expected=expected_df,
path=f"s3://{s3_public_bucket.name}/parquet_dir",
read_kwargs={"storage_options": s3so},
write_kwargs={
"partition_cols": partition_col,
"compression": None,
"storage_options": s3so,
},
check_like=True,
repeat=1,
)
def test_read_file_like_obj_support(self, df_compat):
pytest.importorskip("pyarrow")
buffer = BytesIO()
df_compat.to_parquet(buffer)
df_from_buf = read_parquet(buffer)
tm.assert_frame_equal(df_compat, df_from_buf)
def test_expand_user(self, df_compat, monkeypatch):
pytest.importorskip("pyarrow")
monkeypatch.setenv("HOME", "TestingUser")
monkeypatch.setenv("USERPROFILE", "TestingUser")
with pytest.raises(OSError, match=r".*TestingUser.*"):
read_parquet("~/file.parquet")
with pytest.raises(OSError, match=r".*TestingUser.*"):
df_compat.to_parquet("~/file.parquet")
def test_partition_cols_supported(self, tmp_path, pa, df_full):
# GH #23283
partition_cols = ["bool", "int"]
df = df_full
df.to_parquet(tmp_path, partition_cols=partition_cols, compression=None)
check_partition_names(tmp_path, partition_cols)
assert read_parquet(tmp_path).shape == df.shape
def test_partition_cols_string(self, tmp_path, pa, df_full):
# GH #27117
partition_cols = "bool"
partition_cols_list = [partition_cols]
df = df_full
df.to_parquet(tmp_path, partition_cols=partition_cols, compression=None)
check_partition_names(tmp_path, partition_cols_list)
assert read_parquet(tmp_path).shape == df.shape
@pytest.mark.parametrize(
"path_type", [str, lambda x: x], ids=["string", "pathlib.Path"]
)
def test_partition_cols_pathlib(self, tmp_path, pa, df_compat, path_type):
# GH 35902
partition_cols = "B"
partition_cols_list = [partition_cols]
df = df_compat
path = path_type(tmp_path)
df.to_parquet(path, partition_cols=partition_cols_list)
assert read_parquet(path).shape == df.shape
def test_empty_dataframe(self, pa):
# GH #27339
df = pd.DataFrame(index=[], columns=[])
check_round_trip(df, pa)
def test_write_with_schema(self, pa):
import pyarrow
df = pd.DataFrame({"x": [0, 1]})
schema = pyarrow.schema([pyarrow.field("x", type=pyarrow.bool_())])
out_df = df.astype(bool)
check_round_trip(df, pa, write_kwargs={"schema": schema}, expected=out_df)
def test_additional_extension_arrays(self, pa):
# test additional ExtensionArrays that are supported through the
# __arrow_array__ protocol
pytest.importorskip("pyarrow")
df = pd.DataFrame(
{
"a": pd.Series([1, 2, 3], dtype="Int64"),
"b": pd.Series([1, 2, 3], dtype="UInt32"),
"c": pd.Series(["a", None, "c"], dtype="string"),
}
)
check_round_trip(df, pa)
df = pd.DataFrame({"a": pd.Series([1, 2, 3, None], dtype="Int64")})
check_round_trip(df, pa)
def test_pyarrow_backed_string_array(self, pa, string_storage):
# test ArrowStringArray supported through the __arrow_array__ protocol
pytest.importorskip("pyarrow")
df = pd.DataFrame({"a": pd.Series(["a", None, "c"], dtype="string[pyarrow]")})
with pd.option_context("string_storage", string_storage):
check_round_trip(df, pa, expected=df.astype(f"string[{string_storage}]"))
def test_additional_extension_types(self, pa):
# test additional ExtensionArrays that are supported through the
# __arrow_array__ protocol + by defining a custom ExtensionType
pytest.importorskip("pyarrow")
df = pd.DataFrame(
{
"c": pd.IntervalIndex.from_tuples([(0, 1), (1, 2), (3, 4)]),
"d": pd.period_range("2012-01-01", periods=3, freq="D"),
# GH-45881 issue with interval with datetime64[ns] subtype
"e": pd.IntervalIndex.from_breaks(
pd.date_range("2012-01-01", periods=4, freq="D")
),
}
)
check_round_trip(df, pa)
def test_timestamp_nanoseconds(self, pa):
# with version 2.6, pyarrow defaults to writing the nanoseconds, so
# this should work without error
# Note in previous pyarrows(<7.0.0), only the pseudo-version 2.0 was available
ver = "2.6"
df = pd.DataFrame({"a": pd.date_range("2017-01-01", freq="1ns", periods=10)})
check_round_trip(df, pa, write_kwargs={"version": ver})
def test_timezone_aware_index(self, request, pa, timezone_aware_date_list):
if timezone_aware_date_list.tzinfo != datetime.timezone.utc:
request.applymarker(
pytest.mark.xfail(
reason="temporary skip this test until it is properly resolved: "
"https://github.com/pandas-dev/pandas/issues/37286"
)
)
idx = 5 * [timezone_aware_date_list]
df = pd.DataFrame(index=idx, data={"index_as_col": idx})
# see gh-36004
# compare time(zone) values only, skip their class:
# pyarrow always creates fixed offset timezones using pytz.FixedOffset()
# even if it was datetime.timezone() originally
#
# technically they are the same:
# they both implement datetime.tzinfo
# they both wrap datetime.timedelta()
# this use-case sets the resolution to 1 minute
check_round_trip(df, pa, check_dtype=False)
def test_filter_row_groups(self, pa):
# https://github.com/pandas-dev/pandas/issues/26551
pytest.importorskip("pyarrow")
df = pd.DataFrame({"a": list(range(3))})
with tm.ensure_clean() as path:
df.to_parquet(path, engine=pa)
result = read_parquet(path, pa, filters=[("a", "==", 0)])
assert len(result) == 1
def test_read_parquet_manager(self, pa, using_array_manager):
# ensure that read_parquet honors the pandas.options.mode.data_manager option
df = pd.DataFrame(
np.random.default_rng(2).standard_normal((10, 3)), columns=["A", "B", "C"]
)
with tm.ensure_clean() as path:
df.to_parquet(path, engine=pa)
result = read_parquet(path, pa)
if using_array_manager:
assert isinstance(result._mgr, pd.core.internals.ArrayManager)
else:
assert isinstance(result._mgr, pd.core.internals.BlockManager)
def test_read_dtype_backend_pyarrow_config(self, pa, df_full):
import pyarrow
df = df_full
# additional supported types for pyarrow
dti = pd.date_range("20130101", periods=3, tz="Europe/Brussels")
dti = dti._with_freq(None) # freq doesn't round-trip
df["datetime_tz"] = dti
df["bool_with_none"] = [True, None, True]
pa_table = pyarrow.Table.from_pandas(df)
expected = pa_table.to_pandas(types_mapper=pd.ArrowDtype)
if pa_version_under13p0:
# pyarrow infers datetimes as us instead of ns
expected["datetime"] = expected["datetime"].astype("timestamp[us][pyarrow]")
expected["datetime_with_nat"] = expected["datetime_with_nat"].astype(
"timestamp[us][pyarrow]"
)
expected["datetime_tz"] = expected["datetime_tz"].astype(
pd.ArrowDtype(pyarrow.timestamp(unit="us", tz="Europe/Brussels"))
)
check_round_trip(
df,
engine=pa,
read_kwargs={"dtype_backend": "pyarrow"},
expected=expected,
)
def test_read_dtype_backend_pyarrow_config_index(self, pa):
df = pd.DataFrame(
{"a": [1, 2]}, index=pd.Index([3, 4], name="test"), dtype="int64[pyarrow]"
)
expected = df.copy()
import pyarrow
if Version(pyarrow.__version__) > Version("11.0.0"):
expected.index = expected.index.astype("int64[pyarrow]")
check_round_trip(
df,
engine=pa,
read_kwargs={"dtype_backend": "pyarrow"},
expected=expected,
)
def test_columns_dtypes_not_invalid(self, pa):
df = pd.DataFrame({"string": list("abc"), "int": list(range(1, 4))})
# numeric
df.columns = [0, 1]
check_round_trip(df, pa)
# bytes
df.columns = [b"foo", b"bar"]
with pytest.raises(NotImplementedError, match="|S3"):
# Bytes fails on read_parquet
check_round_trip(df, pa)
# python object
df.columns = [
datetime.datetime(2011, 1, 1, 0, 0),
datetime.datetime(2011, 1, 1, 1, 1),
]
check_round_trip(df, pa)
def test_empty_columns(self, pa):
# GH 52034
df = pd.DataFrame(index=pd.Index(["a", "b", "c"], name="custom name"))
check_round_trip(df, pa)
def test_df_attrs_persistence(self, tmp_path, pa):
path = tmp_path / "test_df_metadata.p"
df = pd.DataFrame(data={1: [1]})
df.attrs = {"test_attribute": 1}
df.to_parquet(path, engine=pa)
new_df = read_parquet(path, engine=pa)
assert new_df.attrs == df.attrs
def test_string_inference(self, tmp_path, pa):
# GH#54431
path = tmp_path / "test_string_inference.p"
df = pd.DataFrame(data={"a": ["x", "y"]}, index=["a", "b"])
df.to_parquet(path, engine="pyarrow")
with pd.option_context("future.infer_string", True):
result = read_parquet(path, engine="pyarrow")
expected = pd.DataFrame(
data={"a": ["x", "y"]},
dtype="string[pyarrow_numpy]",
index=pd.Index(["a", "b"], dtype="string[pyarrow_numpy]"),
)
tm.assert_frame_equal(result, expected)
@pytest.mark.skipif(pa_version_under11p0, reason="not supported before 11.0")
def test_roundtrip_decimal(self, tmp_path, pa):
# GH#54768
import pyarrow as pa
path = tmp_path / "decimal.p"
df = pd.DataFrame({"a": [Decimal("123.00")]}, dtype="string[pyarrow]")
df.to_parquet(path, schema=pa.schema([("a", pa.decimal128(5))]))
result = read_parquet(path)
expected = pd.DataFrame({"a": ["123"]}, dtype="string[python]")
tm.assert_frame_equal(result, expected)
def test_infer_string_large_string_type(self, tmp_path, pa):
# GH#54798
import pyarrow as pa
import pyarrow.parquet as pq
path = tmp_path / "large_string.p"
table = pa.table({"a": pa.array([None, "b", "c"], pa.large_string())})
pq.write_table(table, path)
with pd.option_context("future.infer_string", True):
result = read_parquet(path)
expected = pd.DataFrame(
data={"a": [None, "b", "c"]},
dtype="string[pyarrow_numpy]",
columns=pd.Index(["a"], dtype="string[pyarrow_numpy]"),
)
tm.assert_frame_equal(result, expected)
# NOTE: this test is not run by default, because it requires a lot of memory (>5GB)
# @pytest.mark.slow
# def test_string_column_above_2GB(self, tmp_path, pa):
# # https://github.com/pandas-dev/pandas/issues/55606
# # above 2GB of string data
# v1 = b"x" * 100000000
# v2 = b"x" * 147483646
# df = pd.DataFrame({"strings": [v1] * 20 + [v2] + ["x"] * 20}, dtype="string")
# df.to_parquet(tmp_path / "test.parquet")
# result = read_parquet(tmp_path / "test.parquet")
# assert result["strings"].dtype == "string"
class TestParquetFastParquet(Base):
def test_basic(self, fp, df_full):
df = df_full
dti = pd.date_range("20130101", periods=3, tz="US/Eastern")
dti = dti._with_freq(None) # freq doesn't round-trip
df["datetime_tz"] = dti
df["timedelta"] = pd.timedelta_range("1 day", periods=3)
check_round_trip(df, fp)
def test_columns_dtypes_invalid(self, fp):
df = pd.DataFrame({"string": list("abc"), "int": list(range(1, 4))})
err = TypeError
msg = "Column name must be a string"
# numeric
df.columns = [0, 1]
self.check_error_on_write(df, fp, err, msg)
# bytes
df.columns = [b"foo", b"bar"]
self.check_error_on_write(df, fp, err, msg)
# python object
df.columns = [
datetime.datetime(2011, 1, 1, 0, 0),
datetime.datetime(2011, 1, 1, 1, 1),
]
self.check_error_on_write(df, fp, err, msg)
def test_duplicate_columns(self, fp):
# not currently able to handle duplicate columns
df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=list("aaa")).copy()
msg = "Cannot create parquet dataset with duplicate column names"
self.check_error_on_write(df, fp, ValueError, msg)
def test_bool_with_none(self, fp):
df = pd.DataFrame({"a": [True, None, False]})
expected = pd.DataFrame({"a": [1.0, np.nan, 0.0]}, dtype="float16")
# Fastparquet bug in 0.7.1 makes it so that this dtype becomes
# float64
check_round_trip(df, fp, expected=expected, check_dtype=False)
def test_unsupported(self, fp):
# period
df = pd.DataFrame({"a": pd.period_range("2013", freq="M", periods=3)})
# error from fastparquet -> don't check exact error message
self.check_error_on_write(df, fp, ValueError, None)
# mixed
df = pd.DataFrame({"a": ["a", 1, 2.0]})
msg = "Can't infer object conversion type"
self.check_error_on_write(df, fp, ValueError, msg)
def test_categorical(self, fp):
df = pd.DataFrame({"a": pd.Categorical(list("abc"))})
check_round_trip(df, fp)
def test_filter_row_groups(self, fp):
d = {"a": list(range(3))}
df = pd.DataFrame(d)
with tm.ensure_clean() as path:
df.to_parquet(path, engine=fp, compression=None, row_group_offsets=1)
result = read_parquet(path, fp, filters=[("a", "==", 0)])
assert len(result) == 1
@pytest.mark.single_cpu
def test_s3_roundtrip(self, df_compat, s3_public_bucket, fp, s3so):
# GH #19134
check_round_trip(
df_compat,
fp,
path=f"s3://{s3_public_bucket.name}/fastparquet.parquet",
read_kwargs={"storage_options": s3so},
write_kwargs={"compression": None, "storage_options": s3so},
)
def test_partition_cols_supported(self, tmp_path, fp, df_full):
# GH #23283
partition_cols = ["bool", "int"]
df = df_full
df.to_parquet(
tmp_path,
engine="fastparquet",
partition_cols=partition_cols,
compression=None,
)
assert os.path.exists(tmp_path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(str(tmp_path), False).cats
assert len(actual_partition_cols) == 2
def test_partition_cols_string(self, tmp_path, fp, df_full):
# GH #27117
partition_cols = "bool"
df = df_full
df.to_parquet(
tmp_path,
engine="fastparquet",
partition_cols=partition_cols,
compression=None,
)
assert os.path.exists(tmp_path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(str(tmp_path), False).cats
assert len(actual_partition_cols) == 1
def test_partition_on_supported(self, tmp_path, fp, df_full):
# GH #23283
partition_cols = ["bool", "int"]
df = df_full
df.to_parquet(
tmp_path,
engine="fastparquet",
compression=None,
partition_on=partition_cols,
)
assert os.path.exists(tmp_path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(str(tmp_path), False).cats
assert len(actual_partition_cols) == 2
def test_error_on_using_partition_cols_and_partition_on(
self, tmp_path, fp, df_full
):
# GH #23283
partition_cols = ["bool", "int"]
df = df_full
msg = (
"Cannot use both partition_on and partition_cols. Use partition_cols for "
"partitioning data"
)
with pytest.raises(ValueError, match=msg):
df.to_parquet(
tmp_path,
engine="fastparquet",
compression=None,
partition_on=partition_cols,
partition_cols=partition_cols,
)
@pytest.mark.skipif(using_copy_on_write(), reason="fastparquet writes into Index")
def test_empty_dataframe(self, fp):
# GH #27339
df = pd.DataFrame()
expected = df.copy()
check_round_trip(df, fp, expected=expected)
@pytest.mark.skipif(using_copy_on_write(), reason="fastparquet writes into Index")
def test_timezone_aware_index(self, fp, timezone_aware_date_list):
idx = 5 * [timezone_aware_date_list]
df = pd.DataFrame(index=idx, data={"index_as_col": idx})
expected = df.copy()
expected.index.name = "index"
check_round_trip(df, fp, expected=expected)
def test_use_nullable_dtypes_not_supported(self, fp):
df = pd.DataFrame({"a": [1, 2]})
with tm.ensure_clean() as path:
df.to_parquet(path)
with pytest.raises(ValueError, match="not supported for the fastparquet"):
with tm.assert_produces_warning(FutureWarning):
read_parquet(path, engine="fastparquet", use_nullable_dtypes=True)
with pytest.raises(ValueError, match="not supported for the fastparquet"):
read_parquet(path, engine="fastparquet", dtype_backend="pyarrow")
def test_close_file_handle_on_read_error(self):
with tm.ensure_clean("test.parquet") as path:
pathlib.Path(path).write_bytes(b"breakit")
with pytest.raises(Exception, match=""): # Not important which exception
read_parquet(path, engine="fastparquet")
# The next line raises an error on Windows if the file is still open
pathlib.Path(path).unlink(missing_ok=False)
def test_bytes_file_name(self, engine):
# GH#48944
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean("test.parquet") as path:
with open(path.encode(), "wb") as f:
df.to_parquet(f)
result = read_parquet(path, engine=engine)
tm.assert_frame_equal(result, df)
def test_filesystem_notimplemented(self):
pytest.importorskip("fastparquet")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
NotImplementedError, match="filesystem is not implemented"
):
df.to_parquet(path, engine="fastparquet", filesystem="foo")
with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
NotImplementedError, match="filesystem is not implemented"
):
read_parquet(path, engine="fastparquet", filesystem="foo")
def test_invalid_filesystem(self):
pytest.importorskip("pyarrow")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
):
df.to_parquet(path, engine="pyarrow", filesystem="foo")
with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
ValueError, match="filesystem must be a pyarrow or fsspec FileSystem"
):
read_parquet(path, engine="pyarrow", filesystem="foo")
def test_unsupported_pa_filesystem_storage_options(self):
pa_fs = pytest.importorskip("pyarrow.fs")
df = pd.DataFrame(data={"A": [0, 1], "B": [1, 0]})
with tm.ensure_clean() as path:
with pytest.raises(
NotImplementedError,
match="storage_options not supported with a pyarrow FileSystem.",
):
df.to_parquet(
path,
engine="pyarrow",
filesystem=pa_fs.LocalFileSystem(),
storage_options={"foo": "bar"},
)
with tm.ensure_clean() as path:
pathlib.Path(path).write_bytes(b"foo")
with pytest.raises(
NotImplementedError,
match="storage_options not supported with a pyarrow FileSystem.",
):
read_parquet(
path,
engine="pyarrow",
filesystem=pa_fs.LocalFileSystem(),
storage_options={"foo": "bar"},
)
def test_invalid_dtype_backend(self, engine):
msg = (
"dtype_backend numpy is invalid, only 'numpy_nullable' and "
"'pyarrow' are allowed."
)
df = pd.DataFrame({"int": list(range(1, 4))})
with tm.ensure_clean("tmp.parquet") as path:
df.to_parquet(path)
with pytest.raises(ValueError, match=msg):
read_parquet(path, dtype_backend="numpy")
@pytest.mark.skipif(using_copy_on_write(), reason="fastparquet writes into Index")
def test_empty_columns(self, fp):
# GH 52034
df = pd.DataFrame(index=pd.Index(["a", "b", "c"], name="custom name"))
expected = pd.DataFrame(index=pd.Index(["a", "b", "c"], name="custom name"))
check_round_trip(df, fp, expected=expected)