You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
351 lines
12 KiB
351 lines
12 KiB
from sympy.concrete.summations import (Sum, summation)
|
|
from sympy.core.basic import Basic
|
|
from sympy.core.cache import cacheit
|
|
from sympy.core.function import Lambda
|
|
from sympy.core.numbers import I
|
|
from sympy.core.relational import (Eq, Ne)
|
|
from sympy.core.singleton import S
|
|
from sympy.core.symbol import (Dummy, symbols)
|
|
from sympy.core.sympify import sympify
|
|
from sympy.functions.combinatorial.factorials import factorial
|
|
from sympy.functions.elementary.exponential import exp
|
|
from sympy.functions.elementary.integers import floor
|
|
from sympy.functions.elementary.piecewise import Piecewise
|
|
from sympy.logic.boolalg import And
|
|
from sympy.polys.polytools import poly
|
|
from sympy.series.series import series
|
|
|
|
from sympy.polys.polyerrors import PolynomialError
|
|
from sympy.stats.crv import reduce_rational_inequalities_wrap
|
|
from sympy.stats.rv import (NamedArgsMixin, SinglePSpace, SingleDomain,
|
|
random_symbols, PSpace, ConditionalDomain, RandomDomain,
|
|
ProductDomain, Distribution)
|
|
from sympy.stats.symbolic_probability import Probability
|
|
from sympy.sets.fancysets import Range, FiniteSet
|
|
from sympy.sets.sets import Union
|
|
from sympy.sets.contains import Contains
|
|
from sympy.utilities import filldedent
|
|
from sympy.core.sympify import _sympify
|
|
|
|
|
|
class DiscreteDistribution(Distribution):
|
|
def __call__(self, *args):
|
|
return self.pdf(*args)
|
|
|
|
|
|
class SingleDiscreteDistribution(DiscreteDistribution, NamedArgsMixin):
|
|
""" Discrete distribution of a single variable.
|
|
|
|
Serves as superclass for PoissonDistribution etc....
|
|
|
|
Provides methods for pdf, cdf, and sampling
|
|
|
|
See Also:
|
|
sympy.stats.crv_types.*
|
|
"""
|
|
|
|
set = S.Integers
|
|
|
|
def __new__(cls, *args):
|
|
args = list(map(sympify, args))
|
|
return Basic.__new__(cls, *args)
|
|
|
|
@staticmethod
|
|
def check(*args):
|
|
pass
|
|
|
|
@cacheit
|
|
def compute_cdf(self, **kwargs):
|
|
""" Compute the CDF from the PDF.
|
|
|
|
Returns a Lambda.
|
|
"""
|
|
x = symbols('x', integer=True, cls=Dummy)
|
|
z = symbols('z', real=True, cls=Dummy)
|
|
left_bound = self.set.inf
|
|
|
|
# CDF is integral of PDF from left bound to z
|
|
pdf = self.pdf(x)
|
|
cdf = summation(pdf, (x, left_bound, floor(z)), **kwargs)
|
|
# CDF Ensure that CDF left of left_bound is zero
|
|
cdf = Piecewise((cdf, z >= left_bound), (0, True))
|
|
return Lambda(z, cdf)
|
|
|
|
def _cdf(self, x):
|
|
return None
|
|
|
|
def cdf(self, x, **kwargs):
|
|
""" Cumulative density function """
|
|
if not kwargs:
|
|
cdf = self._cdf(x)
|
|
if cdf is not None:
|
|
return cdf
|
|
return self.compute_cdf(**kwargs)(x)
|
|
|
|
@cacheit
|
|
def compute_characteristic_function(self, **kwargs):
|
|
""" Compute the characteristic function from the PDF.
|
|
|
|
Returns a Lambda.
|
|
"""
|
|
x, t = symbols('x, t', real=True, cls=Dummy)
|
|
pdf = self.pdf(x)
|
|
cf = summation(exp(I*t*x)*pdf, (x, self.set.inf, self.set.sup))
|
|
return Lambda(t, cf)
|
|
|
|
def _characteristic_function(self, t):
|
|
return None
|
|
|
|
def characteristic_function(self, t, **kwargs):
|
|
""" Characteristic function """
|
|
if not kwargs:
|
|
cf = self._characteristic_function(t)
|
|
if cf is not None:
|
|
return cf
|
|
return self.compute_characteristic_function(**kwargs)(t)
|
|
|
|
@cacheit
|
|
def compute_moment_generating_function(self, **kwargs):
|
|
t = Dummy('t', real=True)
|
|
x = Dummy('x', integer=True)
|
|
pdf = self.pdf(x)
|
|
mgf = summation(exp(t*x)*pdf, (x, self.set.inf, self.set.sup))
|
|
return Lambda(t, mgf)
|
|
|
|
def _moment_generating_function(self, t):
|
|
return None
|
|
|
|
def moment_generating_function(self, t, **kwargs):
|
|
if not kwargs:
|
|
mgf = self._moment_generating_function(t)
|
|
if mgf is not None:
|
|
return mgf
|
|
return self.compute_moment_generating_function(**kwargs)(t)
|
|
|
|
@cacheit
|
|
def compute_quantile(self, **kwargs):
|
|
""" Compute the Quantile from the PDF.
|
|
|
|
Returns a Lambda.
|
|
"""
|
|
x = Dummy('x', integer=True)
|
|
p = Dummy('p', real=True)
|
|
left_bound = self.set.inf
|
|
pdf = self.pdf(x)
|
|
cdf = summation(pdf, (x, left_bound, x), **kwargs)
|
|
set = ((x, p <= cdf), )
|
|
return Lambda(p, Piecewise(*set))
|
|
|
|
def _quantile(self, x):
|
|
return None
|
|
|
|
def quantile(self, x, **kwargs):
|
|
""" Cumulative density function """
|
|
if not kwargs:
|
|
quantile = self._quantile(x)
|
|
if quantile is not None:
|
|
return quantile
|
|
return self.compute_quantile(**kwargs)(x)
|
|
|
|
def expectation(self, expr, var, evaluate=True, **kwargs):
|
|
""" Expectation of expression over distribution """
|
|
# TODO: support discrete sets with non integer stepsizes
|
|
|
|
if evaluate:
|
|
try:
|
|
p = poly(expr, var)
|
|
|
|
t = Dummy('t', real=True)
|
|
|
|
mgf = self.moment_generating_function(t)
|
|
deg = p.degree()
|
|
taylor = poly(series(mgf, t, 0, deg + 1).removeO(), t)
|
|
result = 0
|
|
for k in range(deg+1):
|
|
result += p.coeff_monomial(var ** k) * taylor.coeff_monomial(t ** k) * factorial(k)
|
|
|
|
return result
|
|
|
|
except PolynomialError:
|
|
return summation(expr * self.pdf(var),
|
|
(var, self.set.inf, self.set.sup), **kwargs)
|
|
|
|
else:
|
|
return Sum(expr * self.pdf(var),
|
|
(var, self.set.inf, self.set.sup), **kwargs)
|
|
|
|
def __call__(self, *args):
|
|
return self.pdf(*args)
|
|
|
|
|
|
class DiscreteDomain(RandomDomain):
|
|
"""
|
|
A domain with discrete support with step size one.
|
|
Represented using symbols and Range.
|
|
"""
|
|
is_Discrete = True
|
|
|
|
class SingleDiscreteDomain(DiscreteDomain, SingleDomain):
|
|
def as_boolean(self):
|
|
return Contains(self.symbol, self.set)
|
|
|
|
|
|
class ConditionalDiscreteDomain(DiscreteDomain, ConditionalDomain):
|
|
"""
|
|
Domain with discrete support of step size one, that is restricted by
|
|
some condition.
|
|
"""
|
|
@property
|
|
def set(self):
|
|
rv = self.symbols
|
|
if len(self.symbols) > 1:
|
|
raise NotImplementedError(filldedent('''
|
|
Multivariate conditional domains are not yet implemented.'''))
|
|
rv = list(rv)[0]
|
|
return reduce_rational_inequalities_wrap(self.condition,
|
|
rv).intersect(self.fulldomain.set)
|
|
|
|
|
|
class DiscretePSpace(PSpace):
|
|
is_real = True
|
|
is_Discrete = True
|
|
|
|
@property
|
|
def pdf(self):
|
|
return self.density(*self.symbols)
|
|
|
|
def where(self, condition):
|
|
rvs = random_symbols(condition)
|
|
assert all(r.symbol in self.symbols for r in rvs)
|
|
if len(rvs) > 1:
|
|
raise NotImplementedError(filldedent('''Multivariate discrete
|
|
random variables are not yet supported.'''))
|
|
conditional_domain = reduce_rational_inequalities_wrap(condition,
|
|
rvs[0])
|
|
conditional_domain = conditional_domain.intersect(self.domain.set)
|
|
return SingleDiscreteDomain(rvs[0].symbol, conditional_domain)
|
|
|
|
def probability(self, condition):
|
|
complement = isinstance(condition, Ne)
|
|
if complement:
|
|
condition = Eq(condition.args[0], condition.args[1])
|
|
try:
|
|
_domain = self.where(condition).set
|
|
if condition == False or _domain is S.EmptySet:
|
|
return S.Zero
|
|
if condition == True or _domain == self.domain.set:
|
|
return S.One
|
|
prob = self.eval_prob(_domain)
|
|
except NotImplementedError:
|
|
from sympy.stats.rv import density
|
|
expr = condition.lhs - condition.rhs
|
|
dens = density(expr)
|
|
if not isinstance(dens, DiscreteDistribution):
|
|
from sympy.stats.drv_types import DiscreteDistributionHandmade
|
|
dens = DiscreteDistributionHandmade(dens)
|
|
z = Dummy('z', real=True)
|
|
space = SingleDiscretePSpace(z, dens)
|
|
prob = space.probability(condition.__class__(space.value, 0))
|
|
if prob is None:
|
|
prob = Probability(condition)
|
|
return prob if not complement else S.One - prob
|
|
|
|
def eval_prob(self, _domain):
|
|
sym = list(self.symbols)[0]
|
|
if isinstance(_domain, Range):
|
|
n = symbols('n', integer=True)
|
|
inf, sup, step = (r for r in _domain.args)
|
|
summand = ((self.pdf).replace(
|
|
sym, n*step))
|
|
rv = summation(summand,
|
|
(n, inf/step, (sup)/step - 1)).doit()
|
|
return rv
|
|
elif isinstance(_domain, FiniteSet):
|
|
pdf = Lambda(sym, self.pdf)
|
|
rv = sum(pdf(x) for x in _domain)
|
|
return rv
|
|
elif isinstance(_domain, Union):
|
|
rv = sum(self.eval_prob(x) for x in _domain.args)
|
|
return rv
|
|
|
|
def conditional_space(self, condition):
|
|
# XXX: Converting from set to tuple. The order matters to Lambda
|
|
# though so we should be starting with a set...
|
|
density = Lambda(tuple(self.symbols), self.pdf/self.probability(condition))
|
|
condition = condition.xreplace({rv: rv.symbol for rv in self.values})
|
|
domain = ConditionalDiscreteDomain(self.domain, condition)
|
|
return DiscretePSpace(domain, density)
|
|
|
|
class ProductDiscreteDomain(ProductDomain, DiscreteDomain):
|
|
def as_boolean(self):
|
|
return And(*[domain.as_boolean for domain in self.domains])
|
|
|
|
class SingleDiscretePSpace(DiscretePSpace, SinglePSpace):
|
|
""" Discrete probability space over a single univariate variable """
|
|
is_real = True
|
|
|
|
@property
|
|
def set(self):
|
|
return self.distribution.set
|
|
|
|
@property
|
|
def domain(self):
|
|
return SingleDiscreteDomain(self.symbol, self.set)
|
|
|
|
def sample(self, size=(), library='scipy', seed=None):
|
|
"""
|
|
Internal sample method.
|
|
|
|
Returns dictionary mapping RandomSymbol to realization value.
|
|
"""
|
|
return {self.value: self.distribution.sample(size, library=library, seed=seed)}
|
|
|
|
def compute_expectation(self, expr, rvs=None, evaluate=True, **kwargs):
|
|
rvs = rvs or (self.value,)
|
|
if self.value not in rvs:
|
|
return expr
|
|
|
|
expr = _sympify(expr)
|
|
expr = expr.xreplace({rv: rv.symbol for rv in rvs})
|
|
|
|
x = self.value.symbol
|
|
try:
|
|
return self.distribution.expectation(expr, x, evaluate=evaluate,
|
|
**kwargs)
|
|
except NotImplementedError:
|
|
return Sum(expr * self.pdf, (x, self.set.inf, self.set.sup),
|
|
**kwargs)
|
|
|
|
def compute_cdf(self, expr, **kwargs):
|
|
if expr == self.value:
|
|
x = Dummy("x", real=True)
|
|
return Lambda(x, self.distribution.cdf(x, **kwargs))
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
def compute_density(self, expr, **kwargs):
|
|
if expr == self.value:
|
|
return self.distribution
|
|
raise NotImplementedError()
|
|
|
|
def compute_characteristic_function(self, expr, **kwargs):
|
|
if expr == self.value:
|
|
t = Dummy("t", real=True)
|
|
return Lambda(t, self.distribution.characteristic_function(t, **kwargs))
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
def compute_moment_generating_function(self, expr, **kwargs):
|
|
if expr == self.value:
|
|
t = Dummy("t", real=True)
|
|
return Lambda(t, self.distribution.moment_generating_function(t, **kwargs))
|
|
else:
|
|
raise NotImplementedError()
|
|
|
|
def compute_quantile(self, expr, **kwargs):
|
|
if expr == self.value:
|
|
p = Dummy("p", real=True)
|
|
return Lambda(p, self.distribution.quantile(p, **kwargs))
|
|
else:
|
|
raise NotImplementedError()
|