You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1980 lines
60 KiB

from datetime import (
datetime,
timedelta,
)
import numpy as np
import pytest
from pandas.compat import (
IS64,
is_platform_arm,
is_platform_power,
)
from pandas import (
DataFrame,
DatetimeIndex,
MultiIndex,
Series,
Timedelta,
Timestamp,
date_range,
period_range,
to_datetime,
to_timedelta,
)
import pandas._testing as tm
from pandas.api.indexers import BaseIndexer
from pandas.core.indexers.objects import VariableOffsetWindowIndexer
from pandas.tseries.offsets import BusinessDay
def test_doc_string():
df = DataFrame({"B": [0, 1, 2, np.nan, 4]})
df
df.rolling(2).sum()
df.rolling(2, min_periods=1).sum()
def test_constructor(frame_or_series):
# GH 12669
c = frame_or_series(range(5)).rolling
# valid
c(0)
c(window=2)
c(window=2, min_periods=1)
c(window=2, min_periods=1, center=True)
c(window=2, min_periods=1, center=False)
# GH 13383
msg = "window must be an integer 0 or greater"
with pytest.raises(ValueError, match=msg):
c(-1)
@pytest.mark.parametrize("w", [2.0, "foo", np.array([2])])
def test_invalid_constructor(frame_or_series, w):
# not valid
c = frame_or_series(range(5)).rolling
msg = "|".join(
[
"window must be an integer",
"passed window foo is not compatible with a datetimelike index",
]
)
with pytest.raises(ValueError, match=msg):
c(window=w)
msg = "min_periods must be an integer"
with pytest.raises(ValueError, match=msg):
c(window=2, min_periods=w)
msg = "center must be a boolean"
with pytest.raises(ValueError, match=msg):
c(window=2, min_periods=1, center=w)
@pytest.mark.parametrize(
"window",
[
timedelta(days=3),
Timedelta(days=3),
"3D",
VariableOffsetWindowIndexer(
index=date_range("2015-12-25", periods=5), offset=BusinessDay(1)
),
],
)
def test_freq_window_not_implemented(window):
# GH 15354
df = DataFrame(
np.arange(10),
index=date_range("2015-12-24", periods=10, freq="D"),
)
with pytest.raises(
NotImplementedError, match="^step (not implemented|is not supported)"
):
df.rolling(window, step=3).sum()
@pytest.mark.parametrize("agg", ["cov", "corr"])
def test_step_not_implemented_for_cov_corr(agg):
# GH 15354
roll = DataFrame(range(2)).rolling(1, step=2)
with pytest.raises(NotImplementedError, match="step not implemented"):
getattr(roll, agg)()
@pytest.mark.parametrize("window", [timedelta(days=3), Timedelta(days=3)])
def test_constructor_with_timedelta_window(window):
# GH 15440
n = 10
df = DataFrame(
{"value": np.arange(n)},
index=date_range("2015-12-24", periods=n, freq="D"),
)
expected_data = np.append([0.0, 1.0], np.arange(3.0, 27.0, 3))
result = df.rolling(window=window).sum()
expected = DataFrame(
{"value": expected_data},
index=date_range("2015-12-24", periods=n, freq="D"),
)
tm.assert_frame_equal(result, expected)
expected = df.rolling("3D").sum()
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("window", [timedelta(days=3), Timedelta(days=3), "3D"])
def test_constructor_timedelta_window_and_minperiods(window, raw):
# GH 15305
n = 10
df = DataFrame(
{"value": np.arange(n)},
index=date_range("2017-08-08", periods=n, freq="D"),
)
expected = DataFrame(
{"value": np.append([np.nan, 1.0], np.arange(3.0, 27.0, 3))},
index=date_range("2017-08-08", periods=n, freq="D"),
)
result_roll_sum = df.rolling(window=window, min_periods=2).sum()
result_roll_generic = df.rolling(window=window, min_periods=2).apply(sum, raw=raw)
tm.assert_frame_equal(result_roll_sum, expected)
tm.assert_frame_equal(result_roll_generic, expected)
def test_closed_fixed(closed, arithmetic_win_operators):
# GH 34315
func_name = arithmetic_win_operators
df_fixed = DataFrame({"A": [0, 1, 2, 3, 4]})
df_time = DataFrame({"A": [0, 1, 2, 3, 4]}, index=date_range("2020", periods=5))
result = getattr(
df_fixed.rolling(2, closed=closed, min_periods=1),
func_name,
)()
expected = getattr(
df_time.rolling("2D", closed=closed, min_periods=1),
func_name,
)().reset_index(drop=True)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
"closed, window_selections",
[
(
"both",
[
[True, True, False, False, False],
[True, True, True, False, False],
[False, True, True, True, False],
[False, False, True, True, True],
[False, False, False, True, True],
],
),
(
"left",
[
[True, False, False, False, False],
[True, True, False, False, False],
[False, True, True, False, False],
[False, False, True, True, False],
[False, False, False, True, True],
],
),
(
"right",
[
[True, True, False, False, False],
[False, True, True, False, False],
[False, False, True, True, False],
[False, False, False, True, True],
[False, False, False, False, True],
],
),
(
"neither",
[
[True, False, False, False, False],
[False, True, False, False, False],
[False, False, True, False, False],
[False, False, False, True, False],
[False, False, False, False, True],
],
),
],
)
def test_datetimelike_centered_selections(
closed, window_selections, arithmetic_win_operators
):
# GH 34315
func_name = arithmetic_win_operators
df_time = DataFrame(
{"A": [0.0, 1.0, 2.0, 3.0, 4.0]}, index=date_range("2020", periods=5)
)
expected = DataFrame(
{"A": [getattr(df_time["A"].iloc[s], func_name)() for s in window_selections]},
index=date_range("2020", periods=5),
)
if func_name == "sem":
kwargs = {"ddof": 0}
else:
kwargs = {}
result = getattr(
df_time.rolling("2D", closed=closed, min_periods=1, center=True),
func_name,
)(**kwargs)
tm.assert_frame_equal(result, expected, check_dtype=False)
@pytest.mark.parametrize(
"window,closed,expected",
[
("3s", "right", [3.0, 3.0, 3.0]),
("3s", "both", [3.0, 3.0, 3.0]),
("3s", "left", [3.0, 3.0, 3.0]),
("3s", "neither", [3.0, 3.0, 3.0]),
("2s", "right", [3.0, 2.0, 2.0]),
("2s", "both", [3.0, 3.0, 3.0]),
("2s", "left", [1.0, 3.0, 3.0]),
("2s", "neither", [1.0, 2.0, 2.0]),
],
)
def test_datetimelike_centered_offset_covers_all(
window, closed, expected, frame_or_series
):
# GH 42753
index = [
Timestamp("20130101 09:00:01"),
Timestamp("20130101 09:00:02"),
Timestamp("20130101 09:00:02"),
]
df = frame_or_series([1, 1, 1], index=index)
result = df.rolling(window, closed=closed, center=True).sum()
expected = frame_or_series(expected, index=index)
tm.assert_equal(result, expected)
@pytest.mark.parametrize(
"window,closed,expected",
[
("2D", "right", [4, 4, 4, 4, 4, 4, 2, 2]),
("2D", "left", [2, 2, 4, 4, 4, 4, 4, 4]),
("2D", "both", [4, 4, 6, 6, 6, 6, 4, 4]),
("2D", "neither", [2, 2, 2, 2, 2, 2, 2, 2]),
],
)
def test_datetimelike_nonunique_index_centering(
window, closed, expected, frame_or_series
):
index = DatetimeIndex(
[
"2020-01-01",
"2020-01-01",
"2020-01-02",
"2020-01-02",
"2020-01-03",
"2020-01-03",
"2020-01-04",
"2020-01-04",
]
)
df = frame_or_series([1] * 8, index=index, dtype=float)
expected = frame_or_series(expected, index=index, dtype=float)
result = df.rolling(window, center=True, closed=closed).sum()
tm.assert_equal(result, expected)
@pytest.mark.parametrize(
"closed,expected",
[
("left", [np.nan, np.nan, 1, 1, 1, 10, 14, 14, 18, 21]),
("neither", [np.nan, np.nan, 1, 1, 1, 9, 5, 5, 13, 8]),
("right", [0, 1, 3, 6, 10, 14, 11, 18, 21, 17]),
("both", [0, 1, 3, 6, 10, 15, 20, 27, 26, 30]),
],
)
def test_variable_window_nonunique(closed, expected, frame_or_series):
# GH 20712
index = DatetimeIndex(
[
"2011-01-01",
"2011-01-01",
"2011-01-02",
"2011-01-02",
"2011-01-02",
"2011-01-03",
"2011-01-04",
"2011-01-04",
"2011-01-05",
"2011-01-06",
]
)
df = frame_or_series(range(10), index=index, dtype=float)
expected = frame_or_series(expected, index=index, dtype=float)
result = df.rolling("2D", closed=closed).sum()
tm.assert_equal(result, expected)
@pytest.mark.parametrize(
"closed,expected",
[
("left", [np.nan, np.nan, 1, 1, 1, 10, 15, 15, 18, 21]),
("neither", [np.nan, np.nan, 1, 1, 1, 10, 15, 15, 13, 8]),
("right", [0, 1, 3, 6, 10, 15, 21, 28, 21, 17]),
("both", [0, 1, 3, 6, 10, 15, 21, 28, 26, 30]),
],
)
def test_variable_offset_window_nonunique(closed, expected, frame_or_series):
# GH 20712
index = DatetimeIndex(
[
"2011-01-01",
"2011-01-01",
"2011-01-02",
"2011-01-02",
"2011-01-02",
"2011-01-03",
"2011-01-04",
"2011-01-04",
"2011-01-05",
"2011-01-06",
]
)
df = frame_or_series(range(10), index=index, dtype=float)
expected = frame_or_series(expected, index=index, dtype=float)
offset = BusinessDay(2)
indexer = VariableOffsetWindowIndexer(index=index, offset=offset)
result = df.rolling(indexer, closed=closed, min_periods=1).sum()
tm.assert_equal(result, expected)
def test_even_number_window_alignment():
# see discussion in GH 38780
s = Series(range(3), index=date_range(start="2020-01-01", freq="D", periods=3))
# behavior of index- and datetime-based windows differs here!
# s.rolling(window=2, min_periods=1, center=True).mean()
result = s.rolling(window="2D", min_periods=1, center=True).mean()
expected = Series([0.5, 1.5, 2], index=s.index)
tm.assert_series_equal(result, expected)
def test_closed_fixed_binary_col(center, step):
# GH 34315
data = [0, 1, 1, 0, 0, 1, 0, 1]
df = DataFrame(
{"binary_col": data},
index=date_range(start="2020-01-01", freq="min", periods=len(data)),
)
if center:
expected_data = [2 / 3, 0.5, 0.4, 0.5, 0.428571, 0.5, 0.571429, 0.5]
else:
expected_data = [np.nan, 0, 0.5, 2 / 3, 0.5, 0.4, 0.5, 0.428571]
expected = DataFrame(
expected_data,
columns=["binary_col"],
index=date_range(start="2020-01-01", freq="min", periods=len(expected_data)),
)[::step]
rolling = df.rolling(
window=len(df), closed="left", min_periods=1, center=center, step=step
)
result = rolling.mean()
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("closed", ["neither", "left"])
def test_closed_empty(closed, arithmetic_win_operators):
# GH 26005
func_name = arithmetic_win_operators
ser = Series(data=np.arange(5), index=date_range("2000", periods=5, freq="2D"))
roll = ser.rolling("1D", closed=closed)
result = getattr(roll, func_name)()
expected = Series([np.nan] * 5, index=ser.index)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("func", ["min", "max"])
def test_closed_one_entry(func):
# GH24718
ser = Series(data=[2], index=date_range("2000", periods=1))
result = getattr(ser.rolling("10D", closed="left"), func)()
tm.assert_series_equal(result, Series([np.nan], index=ser.index))
@pytest.mark.parametrize("func", ["min", "max"])
def test_closed_one_entry_groupby(func):
# GH24718
ser = DataFrame(
data={"A": [1, 1, 2], "B": [3, 2, 1]},
index=date_range("2000", periods=3),
)
result = getattr(
ser.groupby("A", sort=False)["B"].rolling("10D", closed="left"), func
)()
exp_idx = MultiIndex.from_arrays(arrays=[[1, 1, 2], ser.index], names=("A", None))
expected = Series(data=[np.nan, 3, np.nan], index=exp_idx, name="B")
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("input_dtype", ["int", "float"])
@pytest.mark.parametrize(
"func,closed,expected",
[
("min", "right", [0.0, 0, 0, 1, 2, 3, 4, 5, 6, 7]),
("min", "both", [0.0, 0, 0, 0, 1, 2, 3, 4, 5, 6]),
("min", "neither", [np.nan, 0, 0, 1, 2, 3, 4, 5, 6, 7]),
("min", "left", [np.nan, 0, 0, 0, 1, 2, 3, 4, 5, 6]),
("max", "right", [0.0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
("max", "both", [0.0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
("max", "neither", [np.nan, 0, 1, 2, 3, 4, 5, 6, 7, 8]),
("max", "left", [np.nan, 0, 1, 2, 3, 4, 5, 6, 7, 8]),
],
)
def test_closed_min_max_datetime(input_dtype, func, closed, expected):
# see gh-21704
ser = Series(
data=np.arange(10).astype(input_dtype),
index=date_range("2000", periods=10),
)
result = getattr(ser.rolling("3D", closed=closed), func)()
expected = Series(expected, index=ser.index)
tm.assert_series_equal(result, expected)
def test_closed_uneven():
# see gh-21704
ser = Series(data=np.arange(10), index=date_range("2000", periods=10))
# uneven
ser = ser.drop(index=ser.index[[1, 5]])
result = ser.rolling("3D", closed="left").min()
expected = Series([np.nan, 0, 0, 2, 3, 4, 6, 6], index=ser.index)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
"func,closed,expected",
[
("min", "right", [np.nan, 0, 0, 1, 2, 3, 4, 5, np.nan, np.nan]),
("min", "both", [np.nan, 0, 0, 0, 1, 2, 3, 4, 5, np.nan]),
("min", "neither", [np.nan, np.nan, 0, 1, 2, 3, 4, 5, np.nan, np.nan]),
("min", "left", [np.nan, np.nan, 0, 0, 1, 2, 3, 4, 5, np.nan]),
("max", "right", [np.nan, 1, 2, 3, 4, 5, 6, 6, np.nan, np.nan]),
("max", "both", [np.nan, 1, 2, 3, 4, 5, 6, 6, 6, np.nan]),
("max", "neither", [np.nan, np.nan, 1, 2, 3, 4, 5, 6, np.nan, np.nan]),
("max", "left", [np.nan, np.nan, 1, 2, 3, 4, 5, 6, 6, np.nan]),
],
)
def test_closed_min_max_minp(func, closed, expected):
# see gh-21704
ser = Series(data=np.arange(10), index=date_range("2000", periods=10))
# Explicit cast to float to avoid implicit cast when setting nan
ser = ser.astype("float")
ser[ser.index[-3:]] = np.nan
result = getattr(ser.rolling("3D", min_periods=2, closed=closed), func)()
expected = Series(expected, index=ser.index)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
"closed,expected",
[
("right", [0, 0.5, 1, 2, 3, 4, 5, 6, 7, 8]),
("both", [0, 0.5, 1, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5]),
("neither", [np.nan, 0, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5]),
("left", [np.nan, 0, 0.5, 1, 2, 3, 4, 5, 6, 7]),
],
)
def test_closed_median_quantile(closed, expected):
# GH 26005
ser = Series(data=np.arange(10), index=date_range("2000", periods=10))
roll = ser.rolling("3D", closed=closed)
expected = Series(expected, index=ser.index)
result = roll.median()
tm.assert_series_equal(result, expected)
result = roll.quantile(0.5)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("roller", ["1s", 1])
def tests_empty_df_rolling(roller):
# GH 15819 Verifies that datetime and integer rolling windows can be
# applied to empty DataFrames
expected = DataFrame()
result = DataFrame().rolling(roller).sum()
tm.assert_frame_equal(result, expected)
# Verifies that datetime and integer rolling windows can be applied to
# empty DataFrames with datetime index
expected = DataFrame(index=DatetimeIndex([]))
result = DataFrame(index=DatetimeIndex([])).rolling(roller).sum()
tm.assert_frame_equal(result, expected)
def test_empty_window_median_quantile():
# GH 26005
expected = Series([np.nan, np.nan, np.nan])
roll = Series(np.arange(3)).rolling(0)
result = roll.median()
tm.assert_series_equal(result, expected)
result = roll.quantile(0.1)
tm.assert_series_equal(result, expected)
def test_missing_minp_zero():
# https://github.com/pandas-dev/pandas/pull/18921
# minp=0
x = Series([np.nan])
result = x.rolling(1, min_periods=0).sum()
expected = Series([0.0])
tm.assert_series_equal(result, expected)
# minp=1
result = x.rolling(1, min_periods=1).sum()
expected = Series([np.nan])
tm.assert_series_equal(result, expected)
def test_missing_minp_zero_variable():
# https://github.com/pandas-dev/pandas/pull/18921
x = Series(
[np.nan] * 4,
index=DatetimeIndex(["2017-01-01", "2017-01-04", "2017-01-06", "2017-01-07"]),
)
result = x.rolling(Timedelta("2d"), min_periods=0).sum()
expected = Series(0.0, index=x.index)
tm.assert_series_equal(result, expected)
def test_multi_index_names():
# GH 16789, 16825
cols = MultiIndex.from_product([["A", "B"], ["C", "D", "E"]], names=["1", "2"])
df = DataFrame(np.ones((10, 6)), columns=cols)
result = df.rolling(3).cov()
tm.assert_index_equal(result.columns, df.columns)
assert result.index.names == [None, "1", "2"]
def test_rolling_axis_sum(axis_frame):
# see gh-23372.
df = DataFrame(np.ones((10, 20)))
axis = df._get_axis_number(axis_frame)
if axis == 0:
msg = "The 'axis' keyword in DataFrame.rolling"
expected = DataFrame({i: [np.nan] * 2 + [3.0] * 8 for i in range(20)})
else:
# axis == 1
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
expected = DataFrame([[np.nan] * 2 + [3.0] * 18] * 10)
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(3, axis=axis_frame).sum()
tm.assert_frame_equal(result, expected)
def test_rolling_axis_count(axis_frame):
# see gh-26055
df = DataFrame({"x": range(3), "y": range(3)})
axis = df._get_axis_number(axis_frame)
if axis in [0, "index"]:
msg = "The 'axis' keyword in DataFrame.rolling"
expected = DataFrame({"x": [1.0, 2.0, 2.0], "y": [1.0, 2.0, 2.0]})
else:
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
expected = DataFrame({"x": [1.0, 1.0, 1.0], "y": [2.0, 2.0, 2.0]})
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(2, axis=axis_frame, min_periods=0).count()
tm.assert_frame_equal(result, expected)
def test_readonly_array():
# GH-27766
arr = np.array([1, 3, np.nan, 3, 5])
arr.setflags(write=False)
result = Series(arr).rolling(2).mean()
expected = Series([np.nan, 2, np.nan, np.nan, 4])
tm.assert_series_equal(result, expected)
def test_rolling_datetime(axis_frame, tz_naive_fixture):
# GH-28192
tz = tz_naive_fixture
df = DataFrame(
{i: [1] * 2 for i in date_range("2019-8-01", "2019-08-03", freq="D", tz=tz)}
)
if axis_frame in [0, "index"]:
msg = "The 'axis' keyword in DataFrame.rolling"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.T.rolling("2D", axis=axis_frame).sum().T
else:
msg = "Support for axis=1 in DataFrame.rolling"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling("2D", axis=axis_frame).sum()
expected = DataFrame(
{
**{
i: [1.0] * 2
for i in date_range("2019-8-01", periods=1, freq="D", tz=tz)
},
**{
i: [2.0] * 2
for i in date_range("2019-8-02", "2019-8-03", freq="D", tz=tz)
},
}
)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("center", [True, False])
def test_rolling_window_as_string(center):
# see gh-22590
date_today = datetime.now()
days = date_range(date_today, date_today + timedelta(365), freq="D")
data = np.ones(len(days))
df = DataFrame({"DateCol": days, "metric": data})
df.set_index("DateCol", inplace=True)
result = df.rolling(window="21D", min_periods=2, closed="left", center=center)[
"metric"
].agg("max")
index = days.rename("DateCol")
index = index._with_freq(None)
expected_data = np.ones(len(days), dtype=np.float64)
if not center:
expected_data[:2] = np.nan
expected = Series(expected_data, index=index, name="metric")
tm.assert_series_equal(result, expected)
def test_min_periods1():
# GH#6795
df = DataFrame([0, 1, 2, 1, 0], columns=["a"])
result = df["a"].rolling(3, center=True, min_periods=1).max()
expected = Series([1.0, 2.0, 2.0, 2.0, 1.0], name="a")
tm.assert_series_equal(result, expected)
def test_rolling_count_with_min_periods(frame_or_series):
# GH 26996
result = frame_or_series(range(5)).rolling(3, min_periods=3).count()
expected = frame_or_series([np.nan, np.nan, 3.0, 3.0, 3.0])
tm.assert_equal(result, expected)
def test_rolling_count_default_min_periods_with_null_values(frame_or_series):
# GH 26996
values = [1, 2, 3, np.nan, 4, 5, 6]
expected_counts = [1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 3.0]
# GH 31302
result = frame_or_series(values).rolling(3, min_periods=0).count()
expected = frame_or_series(expected_counts)
tm.assert_equal(result, expected)
@pytest.mark.parametrize(
"df,expected,window,min_periods",
[
(
DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}),
[
({"A": [1], "B": [4]}, [0]),
({"A": [1, 2], "B": [4, 5]}, [0, 1]),
({"A": [1, 2, 3], "B": [4, 5, 6]}, [0, 1, 2]),
],
3,
None,
),
(
DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}),
[
({"A": [1], "B": [4]}, [0]),
({"A": [1, 2], "B": [4, 5]}, [0, 1]),
({"A": [2, 3], "B": [5, 6]}, [1, 2]),
],
2,
1,
),
(
DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}),
[
({"A": [1], "B": [4]}, [0]),
({"A": [1, 2], "B": [4, 5]}, [0, 1]),
({"A": [2, 3], "B": [5, 6]}, [1, 2]),
],
2,
2,
),
(
DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}),
[
({"A": [1], "B": [4]}, [0]),
({"A": [2], "B": [5]}, [1]),
({"A": [3], "B": [6]}, [2]),
],
1,
1,
),
(
DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}),
[
({"A": [1], "B": [4]}, [0]),
({"A": [2], "B": [5]}, [1]),
({"A": [3], "B": [6]}, [2]),
],
1,
0,
),
(DataFrame({"A": [1], "B": [4]}), [], 2, None),
(DataFrame({"A": [1], "B": [4]}), [], 2, 1),
(DataFrame(), [({}, [])], 2, None),
(
DataFrame({"A": [1, np.nan, 3], "B": [np.nan, 5, 6]}),
[
({"A": [1.0], "B": [np.nan]}, [0]),
({"A": [1, np.nan], "B": [np.nan, 5]}, [0, 1]),
({"A": [1, np.nan, 3], "B": [np.nan, 5, 6]}, [0, 1, 2]),
],
3,
2,
),
],
)
def test_iter_rolling_dataframe(df, expected, window, min_periods):
# GH 11704
expected = [DataFrame(values, index=index) for (values, index) in expected]
for expected, actual in zip(expected, df.rolling(window, min_periods=min_periods)):
tm.assert_frame_equal(actual, expected)
@pytest.mark.parametrize(
"expected,window",
[
(
[
({"A": [1], "B": [4]}, [0]),
({"A": [1, 2], "B": [4, 5]}, [0, 1]),
({"A": [2, 3], "B": [5, 6]}, [1, 2]),
],
"2D",
),
(
[
({"A": [1], "B": [4]}, [0]),
({"A": [1, 2], "B": [4, 5]}, [0, 1]),
({"A": [1, 2, 3], "B": [4, 5, 6]}, [0, 1, 2]),
],
"3D",
),
(
[
({"A": [1], "B": [4]}, [0]),
({"A": [2], "B": [5]}, [1]),
({"A": [3], "B": [6]}, [2]),
],
"1D",
),
],
)
def test_iter_rolling_on_dataframe(expected, window):
# GH 11704, 40373
df = DataFrame(
{
"A": [1, 2, 3, 4, 5],
"B": [4, 5, 6, 7, 8],
"C": date_range(start="2016-01-01", periods=5, freq="D"),
}
)
expected = [
DataFrame(values, index=df.loc[index, "C"]) for (values, index) in expected
]
for expected, actual in zip(expected, df.rolling(window, on="C")):
tm.assert_frame_equal(actual, expected)
def test_iter_rolling_on_dataframe_unordered():
# GH 43386
df = DataFrame({"a": ["x", "y", "x"], "b": [0, 1, 2]})
results = list(df.groupby("a").rolling(2))
expecteds = [df.iloc[idx, [1]] for idx in [[0], [0, 2], [1]]]
for result, expected in zip(results, expecteds):
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
"ser,expected,window, min_periods",
[
(
Series([1, 2, 3]),
[([1], [0]), ([1, 2], [0, 1]), ([1, 2, 3], [0, 1, 2])],
3,
None,
),
(
Series([1, 2, 3]),
[([1], [0]), ([1, 2], [0, 1]), ([1, 2, 3], [0, 1, 2])],
3,
1,
),
(
Series([1, 2, 3]),
[([1], [0]), ([1, 2], [0, 1]), ([2, 3], [1, 2])],
2,
1,
),
(
Series([1, 2, 3]),
[([1], [0]), ([1, 2], [0, 1]), ([2, 3], [1, 2])],
2,
2,
),
(Series([1, 2, 3]), [([1], [0]), ([2], [1]), ([3], [2])], 1, 0),
(Series([1, 2, 3]), [([1], [0]), ([2], [1]), ([3], [2])], 1, 1),
(Series([1, 2]), [([1], [0]), ([1, 2], [0, 1])], 2, 0),
(Series([], dtype="int64"), [], 2, 1),
],
)
def test_iter_rolling_series(ser, expected, window, min_periods):
# GH 11704
expected = [Series(values, index=index) for (values, index) in expected]
for expected, actual in zip(expected, ser.rolling(window, min_periods=min_periods)):
tm.assert_series_equal(actual, expected)
@pytest.mark.parametrize(
"expected,expected_index,window",
[
(
[[0], [1], [2], [3], [4]],
[
date_range("2020-01-01", periods=1, freq="D"),
date_range("2020-01-02", periods=1, freq="D"),
date_range("2020-01-03", periods=1, freq="D"),
date_range("2020-01-04", periods=1, freq="D"),
date_range("2020-01-05", periods=1, freq="D"),
],
"1D",
),
(
[[0], [0, 1], [1, 2], [2, 3], [3, 4]],
[
date_range("2020-01-01", periods=1, freq="D"),
date_range("2020-01-01", periods=2, freq="D"),
date_range("2020-01-02", periods=2, freq="D"),
date_range("2020-01-03", periods=2, freq="D"),
date_range("2020-01-04", periods=2, freq="D"),
],
"2D",
),
(
[[0], [0, 1], [0, 1, 2], [1, 2, 3], [2, 3, 4]],
[
date_range("2020-01-01", periods=1, freq="D"),
date_range("2020-01-01", periods=2, freq="D"),
date_range("2020-01-01", periods=3, freq="D"),
date_range("2020-01-02", periods=3, freq="D"),
date_range("2020-01-03", periods=3, freq="D"),
],
"3D",
),
],
)
def test_iter_rolling_datetime(expected, expected_index, window):
# GH 11704
ser = Series(range(5), index=date_range(start="2020-01-01", periods=5, freq="D"))
expected = [
Series(values, index=idx) for (values, idx) in zip(expected, expected_index)
]
for expected, actual in zip(expected, ser.rolling(window)):
tm.assert_series_equal(actual, expected)
@pytest.mark.parametrize(
"grouping,_index",
[
(
{"level": 0},
MultiIndex.from_tuples(
[(0, 0), (0, 0), (1, 1), (1, 1), (1, 1)], names=[None, None]
),
),
(
{"by": "X"},
MultiIndex.from_tuples(
[(0, 0), (1, 0), (2, 1), (3, 1), (4, 1)], names=["X", None]
),
),
],
)
def test_rolling_positional_argument(grouping, _index, raw):
# GH 34605
def scaled_sum(*args):
if len(args) < 2:
raise ValueError("The function needs two arguments")
array, scale = args
return array.sum() / scale
df = DataFrame(data={"X": range(5)}, index=[0, 0, 1, 1, 1])
expected = DataFrame(data={"X": [0.0, 0.5, 1.0, 1.5, 2.0]}, index=_index)
# GH 40341
if "by" in grouping:
expected = expected.drop(columns="X", errors="ignore")
result = df.groupby(**grouping).rolling(1).apply(scaled_sum, raw=raw, args=(2,))
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("add", [0.0, 2.0])
def test_rolling_numerical_accuracy_kahan_mean(add, unit):
# GH: 36031 implementing kahan summation
dti = DatetimeIndex(
[
Timestamp("19700101 09:00:00"),
Timestamp("19700101 09:00:03"),
Timestamp("19700101 09:00:06"),
]
).as_unit(unit)
df = DataFrame(
{"A": [3002399751580331.0 + add, -0.0, -0.0]},
index=dti,
)
result = (
df.resample("1s").ffill().rolling("3s", closed="left", min_periods=3).mean()
)
dates = date_range("19700101 09:00:00", periods=7, freq="s", unit=unit)
expected = DataFrame(
{
"A": [
np.nan,
np.nan,
np.nan,
3002399751580330.5,
2001599834386887.25,
1000799917193443.625,
0.0,
]
},
index=dates,
)
tm.assert_frame_equal(result, expected)
def test_rolling_numerical_accuracy_kahan_sum():
# GH: 13254
df = DataFrame([2.186, -1.647, 0.0, 0.0, 0.0, 0.0], columns=["x"])
result = df["x"].rolling(3).sum()
expected = Series([np.nan, np.nan, 0.539, -1.647, 0.0, 0.0], name="x")
tm.assert_series_equal(result, expected)
def test_rolling_numerical_accuracy_jump():
# GH: 32761
index = date_range(start="2020-01-01", end="2020-01-02", freq="60s").append(
DatetimeIndex(["2020-01-03"])
)
data = np.random.default_rng(2).random(len(index))
df = DataFrame({"data": data}, index=index)
result = df.rolling("60s").mean()
tm.assert_frame_equal(result, df[["data"]])
def test_rolling_numerical_accuracy_small_values():
# GH: 10319
s = Series(
data=[0.00012456, 0.0003, -0.0, -0.0],
index=date_range("1999-02-03", "1999-02-06"),
)
result = s.rolling(1).mean()
tm.assert_series_equal(result, s)
def test_rolling_numerical_too_large_numbers():
# GH: 11645
dates = date_range("2015-01-01", periods=10, freq="D")
ds = Series(data=range(10), index=dates, dtype=np.float64)
ds.iloc[2] = -9e33
result = ds.rolling(5).mean()
expected = Series(
[
np.nan,
np.nan,
np.nan,
np.nan,
-1.8e33,
-1.8e33,
-1.8e33,
5.0,
6.0,
7.0,
],
index=dates,
)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
("func", "value"),
[("sum", 2.0), ("max", 1.0), ("min", 1.0), ("mean", 1.0), ("median", 1.0)],
)
def test_rolling_mixed_dtypes_axis_1(func, value):
# GH: 20649
df = DataFrame(1, index=[1, 2], columns=["a", "b", "c"])
df["c"] = 1.0
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
roll = df.rolling(window=2, min_periods=1, axis=1)
result = getattr(roll, func)()
expected = DataFrame(
{"a": [1.0, 1.0], "b": [value, value], "c": [value, value]},
index=[1, 2],
)
tm.assert_frame_equal(result, expected)
def test_rolling_axis_one_with_nan():
# GH: 35596
df = DataFrame(
[
[0, 1, 2, 4, np.nan, np.nan, np.nan],
[0, 1, 2, np.nan, np.nan, np.nan, np.nan],
[0, 2, 2, np.nan, 2, np.nan, 1],
]
)
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(window=7, min_periods=1, axis="columns").sum()
expected = DataFrame(
[
[0.0, 1.0, 3.0, 7.0, 7.0, 7.0, 7.0],
[0.0, 1.0, 3.0, 3.0, 3.0, 3.0, 3.0],
[0.0, 2.0, 4.0, 4.0, 6.0, 6.0, 7.0],
]
)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
"value",
["test", to_datetime("2019-12-31"), to_timedelta("1 days 06:05:01.00003")],
)
def test_rolling_axis_1_non_numeric_dtypes(value):
# GH: 20649
df = DataFrame({"a": [1, 2]})
df["b"] = value
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(window=2, min_periods=1, axis=1).sum()
expected = DataFrame({"a": [1.0, 2.0]})
tm.assert_frame_equal(result, expected)
def test_rolling_on_df_transposed():
# GH: 32724
df = DataFrame({"A": [1, None], "B": [4, 5], "C": [7, 8]})
expected = DataFrame({"A": [1.0, np.nan], "B": [5.0, 5.0], "C": [11.0, 13.0]})
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(min_periods=1, window=2, axis=1).sum()
tm.assert_frame_equal(result, expected)
result = df.T.rolling(min_periods=1, window=2).sum().T
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
("index", "window"),
[
(
period_range(start="2020-01-01 08:00", end="2020-01-01 08:08", freq="min"),
"2min",
),
(
period_range(
start="2020-01-01 08:00", end="2020-01-01 12:00", freq="30min"
),
"1h",
),
],
)
@pytest.mark.parametrize(
("func", "values"),
[
("min", [np.nan, 0, 0, 1, 2, 3, 4, 5, 6]),
("max", [np.nan, 0, 1, 2, 3, 4, 5, 6, 7]),
("sum", [np.nan, 0, 1, 3, 5, 7, 9, 11, 13]),
],
)
def test_rolling_period_index(index, window, func, values):
# GH: 34225
ds = Series([0, 1, 2, 3, 4, 5, 6, 7, 8], index=index)
result = getattr(ds.rolling(window, closed="left"), func)()
expected = Series(values, index=index)
tm.assert_series_equal(result, expected)
def test_rolling_sem(frame_or_series):
# GH: 26476
obj = frame_or_series([0, 1, 2])
result = obj.rolling(2, min_periods=1).sem()
if isinstance(result, DataFrame):
result = Series(result[0].values)
expected = Series([np.nan] + [0.7071067811865476] * 2)
tm.assert_series_equal(result, expected)
@pytest.mark.xfail(
is_platform_arm() or is_platform_power(),
reason="GH 38921",
)
@pytest.mark.parametrize(
("func", "third_value", "values"),
[
("var", 1, [5e33, 0, 0.5, 0.5, 2, 0]),
("std", 1, [7.071068e16, 0, 0.7071068, 0.7071068, 1.414214, 0]),
("var", 2, [5e33, 0.5, 0, 0.5, 2, 0]),
("std", 2, [7.071068e16, 0.7071068, 0, 0.7071068, 1.414214, 0]),
],
)
def test_rolling_var_numerical_issues(func, third_value, values):
# GH: 37051
ds = Series([99999999999999999, 1, third_value, 2, 3, 1, 1])
result = getattr(ds.rolling(2), func)()
expected = Series([np.nan] + values)
tm.assert_series_equal(result, expected)
# GH 42064
# new `roll_var` will output 0.0 correctly
tm.assert_series_equal(result == 0, expected == 0)
def test_timeoffset_as_window_parameter_for_corr(unit):
# GH: 28266
dti = DatetimeIndex(
[
Timestamp("20130101 09:00:00"),
Timestamp("20130102 09:00:02"),
Timestamp("20130103 09:00:03"),
Timestamp("20130105 09:00:05"),
Timestamp("20130106 09:00:06"),
]
).as_unit(unit)
mi = MultiIndex.from_product([dti, ["B", "A"]])
exp = DataFrame(
{
"B": [
np.nan,
np.nan,
0.9999999999999998,
-1.0,
1.0,
-0.3273268353539892,
0.9999999999999998,
1.0,
0.9999999999999998,
1.0,
],
"A": [
np.nan,
np.nan,
-1.0,
1.0000000000000002,
-0.3273268353539892,
0.9999999999999966,
1.0,
1.0000000000000002,
1.0,
1.0000000000000002,
],
},
index=mi,
)
df = DataFrame(
{"B": [0, 1, 2, 4, 3], "A": [7, 4, 6, 9, 3]},
index=dti,
)
res = df.rolling(window="3d").corr()
tm.assert_frame_equal(exp, res)
@pytest.mark.parametrize("method", ["var", "sum", "mean", "skew", "kurt", "min", "max"])
def test_rolling_decreasing_indices(method):
"""
Make sure that decreasing indices give the same results as increasing indices.
GH 36933
"""
df = DataFrame({"values": np.arange(-15, 10) ** 2})
df_reverse = DataFrame({"values": df["values"][::-1]}, index=df.index[::-1])
increasing = getattr(df.rolling(window=5), method)()
decreasing = getattr(df_reverse.rolling(window=5), method)()
assert np.abs(decreasing.values[::-1][:-4] - increasing.values[4:]).max() < 1e-12
@pytest.mark.parametrize(
"window,closed,expected",
[
("2s", "right", [1.0, 3.0, 5.0, 3.0]),
("2s", "left", [0.0, 1.0, 3.0, 5.0]),
("2s", "both", [1.0, 3.0, 6.0, 5.0]),
("2s", "neither", [0.0, 1.0, 2.0, 3.0]),
("3s", "right", [1.0, 3.0, 6.0, 5.0]),
("3s", "left", [1.0, 3.0, 6.0, 5.0]),
("3s", "both", [1.0, 3.0, 6.0, 5.0]),
("3s", "neither", [1.0, 3.0, 6.0, 5.0]),
],
)
def test_rolling_decreasing_indices_centered(window, closed, expected, frame_or_series):
"""
Ensure that a symmetrical inverted index return same result as non-inverted.
"""
# GH 43927
index = date_range("2020", periods=4, freq="1s")
df_inc = frame_or_series(range(4), index=index)
df_dec = frame_or_series(range(4), index=index[::-1])
expected_inc = frame_or_series(expected, index=index)
expected_dec = frame_or_series(expected, index=index[::-1])
result_inc = df_inc.rolling(window, closed=closed, center=True).sum()
result_dec = df_dec.rolling(window, closed=closed, center=True).sum()
tm.assert_equal(result_inc, expected_inc)
tm.assert_equal(result_dec, expected_dec)
@pytest.mark.parametrize(
"window,expected",
[
("1ns", [1.0, 1.0, 1.0, 1.0]),
("3ns", [2.0, 3.0, 3.0, 2.0]),
],
)
def test_rolling_center_nanosecond_resolution(
window, closed, expected, frame_or_series
):
index = date_range("2020", periods=4, freq="1ns")
df = frame_or_series([1, 1, 1, 1], index=index, dtype=float)
expected = frame_or_series(expected, index=index, dtype=float)
result = df.rolling(window, closed=closed, center=True).sum()
tm.assert_equal(result, expected)
@pytest.mark.parametrize(
"method,expected",
[
(
"var",
[
float("nan"),
43.0,
float("nan"),
136.333333,
43.5,
94.966667,
182.0,
318.0,
],
),
(
"mean",
[float("nan"), 7.5, float("nan"), 21.5, 6.0, 9.166667, 13.0, 17.5],
),
(
"sum",
[float("nan"), 30.0, float("nan"), 86.0, 30.0, 55.0, 91.0, 140.0],
),
(
"skew",
[
float("nan"),
0.709296,
float("nan"),
0.407073,
0.984656,
0.919184,
0.874674,
0.842418,
],
),
(
"kurt",
[
float("nan"),
-0.5916711736073559,
float("nan"),
-1.0028993131317954,
-0.06103844629409494,
-0.254143227116194,
-0.37362637362637585,
-0.45439658241367054,
],
),
],
)
def test_rolling_non_monotonic(method, expected):
"""
Make sure the (rare) branch of non-monotonic indices is covered by a test.
output from 1.1.3 is assumed to be the expected output. Output of sum/mean has
manually been verified.
GH 36933.
"""
# Based on an example found in computation.rst
use_expanding = [True, False, True, False, True, True, True, True]
df = DataFrame({"values": np.arange(len(use_expanding)) ** 2})
class CustomIndexer(BaseIndexer):
def get_window_bounds(self, num_values, min_periods, center, closed, step):
start = np.empty(num_values, dtype=np.int64)
end = np.empty(num_values, dtype=np.int64)
for i in range(num_values):
if self.use_expanding[i]:
start[i] = 0
end[i] = i + 1
else:
start[i] = i
end[i] = i + self.window_size
return start, end
indexer = CustomIndexer(window_size=4, use_expanding=use_expanding)
result = getattr(df.rolling(indexer), method)()
expected = DataFrame({"values": expected})
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
("index", "window"),
[
([0, 1, 2, 3, 4], 2),
(date_range("2001-01-01", freq="D", periods=5), "2D"),
],
)
def test_rolling_corr_timedelta_index(index, window):
# GH: 31286
x = Series([1, 2, 3, 4, 5], index=index)
y = x.copy()
x.iloc[0:2] = 0.0
result = x.rolling(window).corr(y)
expected = Series([np.nan, np.nan, 1, 1, 1], index=index)
tm.assert_almost_equal(result, expected)
def test_groupby_rolling_nan_included():
# GH 35542
data = {"group": ["g1", np.nan, "g1", "g2", np.nan], "B": [0, 1, 2, 3, 4]}
df = DataFrame(data)
result = df.groupby("group", dropna=False).rolling(1, min_periods=1).mean()
expected = DataFrame(
{"B": [0.0, 2.0, 3.0, 1.0, 4.0]},
# GH-38057 from_tuples puts the NaNs in the codes, result expects them
# to be in the levels, at the moment
# index=MultiIndex.from_tuples(
# [("g1", 0), ("g1", 2), ("g2", 3), (np.nan, 1), (np.nan, 4)],
# names=["group", None],
# ),
index=MultiIndex(
[["g1", "g2", np.nan], [0, 1, 2, 3, 4]],
[[0, 0, 1, 2, 2], [0, 2, 3, 1, 4]],
names=["group", None],
),
)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("method", ["skew", "kurt"])
def test_rolling_skew_kurt_numerical_stability(method):
# GH#6929
ser = Series(np.random.default_rng(2).random(10))
ser_copy = ser.copy()
expected = getattr(ser.rolling(3), method)()
tm.assert_series_equal(ser, ser_copy)
ser = ser + 50000
result = getattr(ser.rolling(3), method)()
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize(
("method", "values"),
[
("skew", [2.0, 0.854563, 0.0, 1.999984]),
("kurt", [4.0, -1.289256, -1.2, 3.999946]),
],
)
def test_rolling_skew_kurt_large_value_range(method, values):
# GH: 37557
s = Series([3000000, 1, 1, 2, 3, 4, 999])
result = getattr(s.rolling(4), method)()
expected = Series([np.nan] * 3 + values)
tm.assert_series_equal(result, expected)
def test_invalid_method():
with pytest.raises(ValueError, match="method must be 'table' or 'single"):
Series(range(1)).rolling(1, method="foo")
@pytest.mark.parametrize("window", [1, "1d"])
def test_rolling_descending_date_order_with_offset(window, frame_or_series):
# GH#40002
idx = date_range(start="2020-01-01", end="2020-01-03", freq="1d")
obj = frame_or_series(range(1, 4), index=idx)
result = obj.rolling("1d", closed="left").sum()
expected = frame_or_series([np.nan, 1, 2], index=idx)
tm.assert_equal(result, expected)
result = obj.iloc[::-1].rolling("1d", closed="left").sum()
idx = date_range(start="2020-01-03", end="2020-01-01", freq="-1d")
expected = frame_or_series([np.nan, 3, 2], index=idx)
tm.assert_equal(result, expected)
def test_rolling_var_floating_artifact_precision():
# GH 37051
s = Series([7, 5, 5, 5])
result = s.rolling(3).var()
expected = Series([np.nan, np.nan, 4 / 3, 0])
tm.assert_series_equal(result, expected, atol=1.0e-15, rtol=1.0e-15)
# GH 42064
# new `roll_var` will output 0.0 correctly
tm.assert_series_equal(result == 0, expected == 0)
def test_rolling_std_small_values():
# GH 37051
s = Series(
[
0.00000054,
0.00000053,
0.00000054,
]
)
result = s.rolling(2).std()
expected = Series([np.nan, 7.071068e-9, 7.071068e-9])
tm.assert_series_equal(result, expected, atol=1.0e-15, rtol=1.0e-15)
@pytest.mark.parametrize(
"start, exp_values",
[
(1, [0.03, 0.0155, 0.0155, 0.011, 0.01025]),
(2, [0.001, 0.001, 0.0015, 0.00366666]),
],
)
def test_rolling_mean_all_nan_window_floating_artifacts(start, exp_values):
# GH#41053
df = DataFrame(
[
0.03,
0.03,
0.001,
np.nan,
0.002,
0.008,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
np.nan,
0.005,
0.2,
]
)
values = exp_values + [
0.00366666,
0.005,
0.005,
0.008,
np.nan,
np.nan,
0.005,
0.102500,
]
expected = DataFrame(
values,
index=list(range(start, len(values) + start)),
)
result = df.iloc[start:].rolling(5, min_periods=0).mean()
tm.assert_frame_equal(result, expected)
def test_rolling_sum_all_nan_window_floating_artifacts():
# GH#41053
df = DataFrame([0.002, 0.008, 0.005, np.nan, np.nan, np.nan])
result = df.rolling(3, min_periods=0).sum()
expected = DataFrame([0.002, 0.010, 0.015, 0.013, 0.005, 0.0])
tm.assert_frame_equal(result, expected)
def test_rolling_zero_window():
# GH 22719
s = Series(range(1))
result = s.rolling(0).min()
expected = Series([np.nan])
tm.assert_series_equal(result, expected)
def test_rolling_float_dtype(float_numpy_dtype):
# GH#42452
df = DataFrame({"A": range(5), "B": range(10, 15)}, dtype=float_numpy_dtype)
expected = DataFrame(
{"A": [np.nan] * 5, "B": range(10, 20, 2)},
dtype=float_numpy_dtype,
)
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(2, axis=1).sum()
tm.assert_frame_equal(result, expected, check_dtype=False)
def test_rolling_numeric_dtypes():
# GH#41779
df = DataFrame(np.arange(40).reshape(4, 10), columns=list("abcdefghij")).astype(
{
"a": "float16",
"b": "float32",
"c": "float64",
"d": "int8",
"e": "int16",
"f": "int32",
"g": "uint8",
"h": "uint16",
"i": "uint32",
"j": "uint64",
}
)
msg = "Support for axis=1 in DataFrame.rolling is deprecated"
with tm.assert_produces_warning(FutureWarning, match=msg):
result = df.rolling(window=2, min_periods=1, axis=1).min()
expected = DataFrame(
{
"a": range(0, 40, 10),
"b": range(0, 40, 10),
"c": range(1, 40, 10),
"d": range(2, 40, 10),
"e": range(3, 40, 10),
"f": range(4, 40, 10),
"g": range(5, 40, 10),
"h": range(6, 40, 10),
"i": range(7, 40, 10),
"j": range(8, 40, 10),
},
dtype="float64",
)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("window", [1, 3, 10, 20])
@pytest.mark.parametrize("method", ["min", "max", "average"])
@pytest.mark.parametrize("pct", [True, False])
@pytest.mark.parametrize("ascending", [True, False])
@pytest.mark.parametrize("test_data", ["default", "duplicates", "nans"])
def test_rank(window, method, pct, ascending, test_data):
length = 20
if test_data == "default":
ser = Series(data=np.random.default_rng(2).random(length))
elif test_data == "duplicates":
ser = Series(data=np.random.default_rng(2).choice(3, length))
elif test_data == "nans":
ser = Series(
data=np.random.default_rng(2).choice(
[1.0, 0.25, 0.75, np.nan, np.inf, -np.inf], length
)
)
expected = ser.rolling(window).apply(
lambda x: x.rank(method=method, pct=pct, ascending=ascending).iloc[-1]
)
result = ser.rolling(window).rank(method=method, pct=pct, ascending=ascending)
tm.assert_series_equal(result, expected)
def test_rolling_quantile_np_percentile():
# #9413: Tests that rolling window's quantile default behavior
# is analogous to Numpy's percentile
row = 10
col = 5
idx = date_range("20100101", periods=row, freq="B")
df = DataFrame(
np.random.default_rng(2).random(row * col).reshape((row, -1)), index=idx
)
df_quantile = df.quantile([0.25, 0.5, 0.75], axis=0)
np_percentile = np.percentile(df, [25, 50, 75], axis=0)
tm.assert_almost_equal(df_quantile.values, np.array(np_percentile))
@pytest.mark.parametrize("quantile", [0.0, 0.1, 0.45, 0.5, 1])
@pytest.mark.parametrize(
"interpolation", ["linear", "lower", "higher", "nearest", "midpoint"]
)
@pytest.mark.parametrize(
"data",
[
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
[8.0, 1.0, 3.0, 4.0, 5.0, 2.0, 6.0, 7.0],
[0.0, np.nan, 0.2, np.nan, 0.4],
[np.nan, np.nan, np.nan, np.nan],
[np.nan, 0.1, np.nan, 0.3, 0.4, 0.5],
[0.5],
[np.nan, 0.7, 0.6],
],
)
def test_rolling_quantile_interpolation_options(quantile, interpolation, data):
# Tests that rolling window's quantile behavior is analogous to
# Series' quantile for each interpolation option
s = Series(data)
q1 = s.quantile(quantile, interpolation)
q2 = s.expanding(min_periods=1).quantile(quantile, interpolation).iloc[-1]
if np.isnan(q1):
assert np.isnan(q2)
else:
if not IS64:
# Less precision on 32-bit
assert np.allclose([q1], [q2], rtol=1e-07, atol=0)
else:
assert q1 == q2
def test_invalid_quantile_value():
data = np.arange(5)
s = Series(data)
msg = "Interpolation 'invalid' is not supported"
with pytest.raises(ValueError, match=msg):
s.rolling(len(data), min_periods=1).quantile(0.5, interpolation="invalid")
def test_rolling_quantile_param():
ser = Series([0.0, 0.1, 0.5, 0.9, 1.0])
msg = "quantile value -0.1 not in \\[0, 1\\]"
with pytest.raises(ValueError, match=msg):
ser.rolling(3).quantile(-0.1)
msg = "quantile value 10.0 not in \\[0, 1\\]"
with pytest.raises(ValueError, match=msg):
ser.rolling(3).quantile(10.0)
msg = "must be real number, not str"
with pytest.raises(TypeError, match=msg):
ser.rolling(3).quantile("foo")
def test_rolling_std_1obs():
vals = Series([1.0, 2.0, 3.0, 4.0, 5.0])
result = vals.rolling(1, min_periods=1).std()
expected = Series([np.nan] * 5)
tm.assert_series_equal(result, expected)
result = vals.rolling(1, min_periods=1).std(ddof=0)
expected = Series([0.0] * 5)
tm.assert_series_equal(result, expected)
result = Series([np.nan, np.nan, 3, 4, 5]).rolling(3, min_periods=2).std()
assert np.isnan(result[2])
def test_rolling_std_neg_sqrt():
# unit test from Bottleneck
# Test move_nanstd for neg sqrt.
a = Series(
[
0.0011448196318903589,
0.00028718669878572767,
0.00028718669878572767,
0.00028718669878572767,
0.00028718669878572767,
]
)
b = a.rolling(window=3).std()
assert np.isfinite(b[2:]).all()
b = a.ewm(span=3).std()
assert np.isfinite(b[2:]).all()
def test_step_not_integer_raises():
with pytest.raises(ValueError, match="step must be an integer"):
DataFrame(range(2)).rolling(1, step="foo")
def test_step_not_positive_raises():
with pytest.raises(ValueError, match="step must be >= 0"):
DataFrame(range(2)).rolling(1, step=-1)
@pytest.mark.parametrize(
["values", "window", "min_periods", "expected"],
[
[
[20, 10, 10, np.inf, 1, 1, 2, 3],
3,
1,
[np.nan, 50, 100 / 3, 0, 40.5, 0, 1 / 3, 1],
],
[
[20, 10, 10, np.nan, 10, 1, 2, 3],
3,
1,
[np.nan, 50, 100 / 3, 0, 0, 40.5, 73 / 3, 1],
],
[
[np.nan, 5, 6, 7, 5, 5, 5],
3,
3,
[np.nan] * 3 + [1, 1, 4 / 3, 0],
],
[
[5, 7, 7, 7, np.nan, np.inf, 4, 3, 3, 3],
3,
3,
[np.nan] * 2 + [4 / 3, 0] + [np.nan] * 4 + [1 / 3, 0],
],
[
[5, 7, 7, 7, np.nan, np.inf, 7, 3, 3, 3],
3,
3,
[np.nan] * 2 + [4 / 3, 0] + [np.nan] * 4 + [16 / 3, 0],
],
[
[5, 7] * 4,
3,
3,
[np.nan] * 2 + [4 / 3] * 6,
],
[
[5, 7, 5, np.nan, 7, 5, 7],
3,
2,
[np.nan, 2, 4 / 3] + [2] * 3 + [4 / 3],
],
],
)
def test_rolling_var_same_value_count_logic(values, window, min_periods, expected):
# GH 42064.
expected = Series(expected)
sr = Series(values)
# With new algo implemented, result will be set to .0 in rolling var
# if sufficient amount of consecutively same values are found.
result_var = sr.rolling(window, min_periods=min_periods).var()
# use `assert_series_equal` twice to check for equality,
# because `check_exact=True` will fail in 32-bit tests due to
# precision loss.
# 1. result should be close to correct value
# non-zero values can still differ slightly from "truth"
# as the result of online algorithm
tm.assert_series_equal(result_var, expected)
# 2. zeros should be exactly the same since the new algo takes effect here
tm.assert_series_equal(expected == 0, result_var == 0)
# std should also pass as it's just a sqrt of var
result_std = sr.rolling(window, min_periods=min_periods).std()
tm.assert_series_equal(result_std, np.sqrt(expected))
tm.assert_series_equal(expected == 0, result_std == 0)
def test_rolling_mean_sum_floating_artifacts():
# GH 42064.
sr = Series([1 / 3, 4, 0, 0, 0, 0, 0])
r = sr.rolling(3)
result = r.mean()
assert (result[-3:] == 0).all()
result = r.sum()
assert (result[-3:] == 0).all()
def test_rolling_skew_kurt_floating_artifacts():
# GH 42064 46431
sr = Series([1 / 3, 4, 0, 0, 0, 0, 0])
r = sr.rolling(4)
result = r.skew()
assert (result[-2:] == 0).all()
result = r.kurt()
assert (result[-2:] == -3).all()
def test_numeric_only_frame(arithmetic_win_operators, numeric_only):
# GH#46560
kernel = arithmetic_win_operators
df = DataFrame({"a": [1], "b": 2, "c": 3})
df["c"] = df["c"].astype(object)
rolling = df.rolling(2, min_periods=1)
op = getattr(rolling, kernel)
result = op(numeric_only=numeric_only)
columns = ["a", "b"] if numeric_only else ["a", "b", "c"]
expected = df[columns].agg([kernel]).reset_index(drop=True).astype(float)
assert list(expected.columns) == columns
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("kernel", ["corr", "cov"])
@pytest.mark.parametrize("use_arg", [True, False])
def test_numeric_only_corr_cov_frame(kernel, numeric_only, use_arg):
# GH#46560
df = DataFrame({"a": [1, 2, 3], "b": 2, "c": 3})
df["c"] = df["c"].astype(object)
arg = (df,) if use_arg else ()
rolling = df.rolling(2, min_periods=1)
op = getattr(rolling, kernel)
result = op(*arg, numeric_only=numeric_only)
# Compare result to op using float dtypes, dropping c when numeric_only is True
columns = ["a", "b"] if numeric_only else ["a", "b", "c"]
df2 = df[columns].astype(float)
arg2 = (df2,) if use_arg else ()
rolling2 = df2.rolling(2, min_periods=1)
op2 = getattr(rolling2, kernel)
expected = op2(*arg2, numeric_only=numeric_only)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize("dtype", [int, object])
def test_numeric_only_series(arithmetic_win_operators, numeric_only, dtype):
# GH#46560
kernel = arithmetic_win_operators
ser = Series([1], dtype=dtype)
rolling = ser.rolling(2, min_periods=1)
op = getattr(rolling, kernel)
if numeric_only and dtype is object:
msg = f"Rolling.{kernel} does not implement numeric_only"
with pytest.raises(NotImplementedError, match=msg):
op(numeric_only=numeric_only)
else:
result = op(numeric_only=numeric_only)
expected = ser.agg([kernel]).reset_index(drop=True).astype(float)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("kernel", ["corr", "cov"])
@pytest.mark.parametrize("use_arg", [True, False])
@pytest.mark.parametrize("dtype", [int, object])
def test_numeric_only_corr_cov_series(kernel, use_arg, numeric_only, dtype):
# GH#46560
ser = Series([1, 2, 3], dtype=dtype)
arg = (ser,) if use_arg else ()
rolling = ser.rolling(2, min_periods=1)
op = getattr(rolling, kernel)
if numeric_only and dtype is object:
msg = f"Rolling.{kernel} does not implement numeric_only"
with pytest.raises(NotImplementedError, match=msg):
op(*arg, numeric_only=numeric_only)
else:
result = op(*arg, numeric_only=numeric_only)
ser2 = ser.astype(float)
arg2 = (ser2,) if use_arg else ()
rolling2 = ser2.rolling(2, min_periods=1)
op2 = getattr(rolling2, kernel)
expected = op2(*arg2, numeric_only=numeric_only)
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize("unit", ["s", "ms", "us", "ns"])
@pytest.mark.parametrize("tz", [None, "UTC", "Europe/Prague"])
def test_rolling_timedelta_window_non_nanoseconds(unit, tz):
# Test Sum, GH#55106
df_time = DataFrame(
{"A": range(5)}, index=date_range("2013-01-01", freq="1s", periods=5, tz=tz)
)
sum_in_nanosecs = df_time.rolling("1s").sum()
# microseconds / milliseconds should not break the correct rolling
df_time.index = df_time.index.as_unit(unit)
sum_in_microsecs = df_time.rolling("1s").sum()
sum_in_microsecs.index = sum_in_microsecs.index.as_unit("ns")
tm.assert_frame_equal(sum_in_nanosecs, sum_in_microsecs)
# Test max, GH#55026
ref_dates = date_range("2023-01-01", "2023-01-10", unit="ns", tz=tz)
ref_series = Series(0, index=ref_dates)
ref_series.iloc[0] = 1
ref_max_series = ref_series.rolling(Timedelta(days=4)).max()
dates = date_range("2023-01-01", "2023-01-10", unit=unit, tz=tz)
series = Series(0, index=dates)
series.iloc[0] = 1
max_series = series.rolling(Timedelta(days=4)).max()
ref_df = DataFrame(ref_max_series)
df = DataFrame(max_series)
df.index = df.index.as_unit("ns")
tm.assert_frame_equal(ref_df, df)