import torch from torch import nan from torch.distributions import constraints from torch.distributions.distribution import Distribution from torch.distributions.utils import lazy_property, logits_to_probs, probs_to_logits __all__ = ["Categorical"] class Categorical(Distribution): r""" Creates a categorical distribution parameterized by either :attr:`probs` or :attr:`logits` (but not both). .. note:: It is equivalent to the distribution that :func:`torch.multinomial` samples from. Samples are integers from :math:`\{0, \ldots, K-1\}` where `K` is ``probs.size(-1)``. If `probs` is 1-dimensional with length-`K`, each element is the relative probability of sampling the class at that index. If `probs` is N-dimensional, the first N-1 dimensions are treated as a batch of relative probability vectors. .. note:: The `probs` argument must be non-negative, finite and have a non-zero sum, and it will be normalized to sum to 1 along the last dimension. :attr:`probs` will return this normalized value. The `logits` argument will be interpreted as unnormalized log probabilities and can therefore be any real number. It will likewise be normalized so that the resulting probabilities sum to 1 along the last dimension. :attr:`logits` will return this normalized value. See also: :func:`torch.multinomial` Example:: >>> # xdoctest: +IGNORE_WANT("non-deterministic") >>> m = Categorical(torch.tensor([ 0.25, 0.25, 0.25, 0.25 ])) >>> m.sample() # equal probability of 0, 1, 2, 3 tensor(3) Args: probs (Tensor): event probabilities logits (Tensor): event log probabilities (unnormalized) """ arg_constraints = {"probs": constraints.simplex, "logits": constraints.real_vector} has_enumerate_support = True def __init__(self, probs=None, logits=None, validate_args=None): if (probs is None) == (logits is None): raise ValueError( "Either `probs` or `logits` must be specified, but not both." ) if probs is not None: if probs.dim() < 1: raise ValueError("`probs` parameter must be at least one-dimensional.") self.probs = probs / probs.sum(-1, keepdim=True) else: if logits.dim() < 1: raise ValueError("`logits` parameter must be at least one-dimensional.") # Normalize self.logits = logits - logits.logsumexp(dim=-1, keepdim=True) self._param = self.probs if probs is not None else self.logits self._num_events = self._param.size()[-1] batch_shape = ( self._param.size()[:-1] if self._param.ndimension() > 1 else torch.Size() ) super().__init__(batch_shape, validate_args=validate_args) def expand(self, batch_shape, _instance=None): new = self._get_checked_instance(Categorical, _instance) batch_shape = torch.Size(batch_shape) param_shape = batch_shape + torch.Size((self._num_events,)) if "probs" in self.__dict__: new.probs = self.probs.expand(param_shape) new._param = new.probs if "logits" in self.__dict__: new.logits = self.logits.expand(param_shape) new._param = new.logits new._num_events = self._num_events super(Categorical, new).__init__(batch_shape, validate_args=False) new._validate_args = self._validate_args return new def _new(self, *args, **kwargs): return self._param.new(*args, **kwargs) @constraints.dependent_property(is_discrete=True, event_dim=0) def support(self): return constraints.integer_interval(0, self._num_events - 1) @lazy_property def logits(self): return probs_to_logits(self.probs) @lazy_property def probs(self): return logits_to_probs(self.logits) @property def param_shape(self): return self._param.size() @property def mean(self): return torch.full( self._extended_shape(), nan, dtype=self.probs.dtype, device=self.probs.device, ) @property def mode(self): return self.probs.argmax(axis=-1) @property def variance(self): return torch.full( self._extended_shape(), nan, dtype=self.probs.dtype, device=self.probs.device, ) def sample(self, sample_shape=torch.Size()): if not isinstance(sample_shape, torch.Size): sample_shape = torch.Size(sample_shape) probs_2d = self.probs.reshape(-1, self._num_events) samples_2d = torch.multinomial(probs_2d, sample_shape.numel(), True).T return samples_2d.reshape(self._extended_shape(sample_shape)) def log_prob(self, value): if self._validate_args: self._validate_sample(value) value = value.long().unsqueeze(-1) value, log_pmf = torch.broadcast_tensors(value, self.logits) value = value[..., :1] return log_pmf.gather(-1, value).squeeze(-1) def entropy(self): min_real = torch.finfo(self.logits.dtype).min logits = torch.clamp(self.logits, min=min_real) p_log_p = logits * self.probs return -p_log_p.sum(-1) def enumerate_support(self, expand=True): num_events = self._num_events values = torch.arange(num_events, dtype=torch.long, device=self._param.device) values = values.view((-1,) + (1,) * len(self._batch_shape)) if expand: values = values.expand((-1,) + self._batch_shape) return values