Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion gemact/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,7 @@
'pareto2': 'distributions.Pareto2',
'pareto1': 'distributions.Pareto1',
'uniform': 'distributions.Uniform',
'multinomial': 'distributions.Multinomial'
'multinomial': 'distributions.Multinomial',
'dirichlet multinomial' : 'distributions.Dirichlet_Multinomial',
'negative multinomial' : 'distributions.NegMultinom'
}
343 changes: 342 additions & 1 deletion gemact/distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6261,7 +6261,7 @@ def name():

@staticmethod
def category():
return {'frequency'}
return {''}


def cov(self):
Expand Down Expand Up @@ -6326,4 +6326,345 @@ def logpmf(self, x):
return stats.multinomial.logpmf(n=self.n, p=self.p, x=x)



class Dirichlet_Multinomial(_MultDiscreteDistribution):
"""
Dirichlet Multinomial distribution.
Wrapper to scipy dirichlet multinomial distribution (``scipy.stats.dirichlet_multinomial``).
Refer to :py:class:'~__MultDiscreteDistribution' for additional details.


:param seed: Used to set a specific seed (default=np.random.RandomState).
:type seed: int
:param \\**kwargs:
See below

:Keyword Arguments:
* *alpha* ( ``int`` or ``numpy.ndarray``) --
Concentration parameters.
* *n* (``int``) --
Number of trials.
"""

def __init__(self, seed=None, **kwargs):
_DiscreteDistribution.__init__(self)
self.n = kwargs['n']
self.alpha = kwargs['alpha']
self.seed = seed

@property
def seed(self):
return self.__seed

@seed.setter
def seed(self, value):
if value is None:
value = np.random.randint(1, 1001)

hf.assert_type_value(value, 'seed', logger, (float,int))
value = int(value)
self.__seed = value

@property
def n(self):
return self.__n

@n.setter
def n(self, value):
hf.assert_type_value(value, 'n', logger, (float, int), lower_bound=1, lower_close=True)
value = int(value)
self.__n = value

@property
def alpha(self):
return self.__alpha

@alpha.setter
def alpha(self, value):
for element in value:
hf.assert_type_value(element, 'alpha', logger, (float, int))
value = np.array(value)
self.__alpha = value

@property
def _dist(self):
return stats.dirichlet_multinomial(n=self.n, alpha=self.alpha, seed=self.seed)

@staticmethod
def name():
return 'dirichlet multinomial'

@staticmethod
def category():
return {''}

def cov(self):
"""
Covariance Matrix of a Dirichlet Multinomial Distribution.

:return: Covariance Matrix.
:rtype: ``float``
"""

return stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha)


def var(self):
"""
Variances of a Dirichlet Multinomial Distribution.

:return: Array of Variances.
:rtype: numpy.ndarray
"""

return np.diag(stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha))

def pmf(self, x):
"""
Probability mass function of the Dirichlet Multinomial Distribution.

:param x: quantile where probability mass function is evaluated.
:type x: ``int``

:return: probability mass function.
:rtype: ``numpy.float64`` or ``numpy.ndarray``
"""

if sum(x) != self.n:
raise ValueError("n != sum(x), i.e. one is wrong")
return stats.dirichlet_multinomial.pmf(n=self.n, alpha=self.alpha, x=x)

def logpmf(self, x):
"""
Natural logarithm of the probability mass function of the Dirichlet Multinomial Distribution.

:param x: quantile where the (natural) probability mass function logarithm is evaluated.
:type x: ``int``
:return: natural logarithm of the probability mass function
:rtype: ``numpy.float64`` or ``numpy.ndarray``
"""

if sum(x) != self.n:
raise ValueError("n != sum(x), i.e. one is wrong")
return stats.dirichlet_multinomial.logpmf(n=self.n, alpha=self.alpha, x=x)


def mean(self):
"""
Mean of the Dirichlet Multinomial Distribution.

:param x: quantile where the (natural) probability mass function logarithm is evaluated.
:type x: ``int``
:return: natural logarithm of the probability mass function
:rtype: ``numpy.float64`` or ``numpy.ndarray``
"""
return stats.dirichlet_multinomial.mean(n=self.n, alpha=self.alpha)


def rvs(self, size=1, random_state=None):
"""
Random variates generator function.

:param size: random variates sample size (default is 1).
:type size: ``int``, optional
:param random_state: random state for the random number generator.
:type random_state: ``int``, optional
:return: random variates.
:rtype: ``numpy.int`` or ``numpy.ndarray``

"""

random_state = hf.handle_random_state(random_state, logger)
np.random.seed(random_state)
hf.assert_type_value(size, 'size', logger, (float, int), lower_bound=1)
size = int(size)
alpha = self.alpha
n = self.n
if isinstance(alpha, np.ndarray) and len(alpha.shape) == 1:
alpha = np.tile(alpha, (size, 1))
n = np.full(size, n)
G = np.random.gamma(shape=alpha, scale=1.0)
prob = G / np.sum(G, axis=1, keepdims=True)
ridx = np.sum(G, axis=1) == 0
if np.any(ridx):
for i in np.where(ridx)[0]:
prob[i, :] = np.random.multinomial(1, alpha[i, :] / np.sum(alpha[i, :]), n=1).flatten()
rdm = np.array([np.random.multinomial(n[i], prob[i, :]) for i in range(size)])
return rdm


class NegMultinom(_MultDiscreteDistribution):

"""
Negative Multinomial distribution.

:param loc: Location parameter to shift the support (default=0).
:type loc: ``int``, optional

:param \\**kwargs:
See below

:Keyword Arguments:
* *x0* (``int``) --
Size parameter of the negative multinomial distribution.
* *p* (``float``) --
Probability parameter of the negative multinomial distribution.

"""

def __init__(self, x0, p, loc=0):
_DiscreteDistribution.__init__(self)
self.x0 = x0
self.p = p
self.loc = loc

@property
def x0(self):
return self.__x0

@x0.setter
def x0(self, value):
hf.assert_type_value(value, 'x0', logger, (float, int), lower_bound=0, lower_close=False)
self.__x0 = value

@property
def p(self):
return self.__p

@p.setter
def p(self, value):
value = np.array(value)
for element in value:
hf.assert_type_value(
element, 'p', logger, (float, np.floating),
lower_bound=0, upper_bound=1, lower_close=True, upper_close=True
)

if np.sum(value) >= 1:
raise ValueError("Sum of success probabilities must be less than 1")

self.__p = value
self.__p0 = 1 - np.sum(value) # Failure probability

@property
def p0(self):
"""Probability of failure (computed as 1 - sum(p))"""
return self.__p0

@staticmethod
def name():
return 'negative multinomial'

@staticmethod
def category():
return {''}

def pmf(self, x):
"""
Probability mass function.

PMF formula from reference:
Γ(∑x_i) * p0^x0 / Γ(x0) * ∏(p_i^x_i / x_i!)

:param x: Quantile where PMF is evaluated.
:type x: ``numpy.ndarray``
:return: Probability mass function evaluated at x.
:rtype: ``numpy.float64``
"""
x = np.array(x)
x0 = self.x0

gamma_term = (special.gamma(np.sum(x)+x0) / special.gamma(x0))
prob_term = (self.p0 ** x0) * np.prod((self.p ** x) / special.factorial(x))

return gamma_term * prob_term


def logpmf(self, x):
"""
Natural logarithm of the probability mass function.

:param x: Quantile where log-PMF is evaluated.
:type x: ``numpy.ndarray``
:return: Log of probability mass function evaluated at x.
:rtype: ``numpy.float64``
"""
return np.log(self.pmf(x))

def mean(self):
"""
Mean vector of the distribution.

:return: Mean vector.
:rtype: ``numpy.ndarray``
"""
return (self.x0 / self.p0) * self.p

def var(self):
"""
Variances of a Negative Multinomial Distribution.

:return: Array of Variances.
:rtype: ``numpy.ndarray``
"""
return (self.x0 / self.p0**2) * self.p**2 + (self.x0 / self.p0) * self.p

def cov(self):
"""
Covariance matrix of a Negative Multinomial Distribution.

:return: Covariance matrix.
:rtype: ``numpy.ndarray``
"""
p = self.p
x0 = self.x0
p0 = self.p0

# Diagonal terms
diag = (x0 / p0**2) * p**2 + (x0 / p0) * p

# Off-diagonal terms
off_diag = (x0 / p0**2) * np.outer(p, p)

# Create full covariance matrix
cov_matrix = np.diag(diag) + off_diag - np.diag(np.diag(off_diag))

return cov_matrix

def rvs(self, size=1, random_state=None):
"""
Random variates generator function.

:param size: Number of random variates to generate (default=1).
:type size: ``int``
:param random_state: Random state for reproducibility.
:type random_state: ``int``, optional
:return: Random variates.
:rtype: ``numpy.ndarray``
"""
random_state = hf.handle_random_state(random_state, logger)
np.random.seed(random_state)

samples = []
for _ in range(size):
total = np.random.negative_binomial(self.x0, self.p0)
if total > 0:
counts = np.random.multinomial(total, self.p / (1 - self.p0))
else:
counts = np.zeros_like(self.p)
samples.append(counts)

return np.array(samples)

def mgf(self, t):
"""
Moment generating function.

:param t: Vector where MGF is evaluated.
:type t: ``numpy.ndarray``
:return: Moment generating function evaluated at t.
:rtype: ``numpy.float64``
"""
exponent = np.sum(self.p * np.exp(t))
return (self.p0 / (1 - exponent)) ** self.x0

2 changes: 2 additions & 0 deletions gemact/libraries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
import timeit
from twiggy import quick_setup, log
from itertools import groupby
from scipy.special import gammaln
import math as mt