"""
This module contains the base class for all densities as used in the web
application, as well as all of its concrete implementations. Also, it
contains enumerations and typings that describe datasets.
"""
import pandas as pd
import numpy as np
from abc import ABC
from typing import Callable, Iterable, Literal, Union, TypedDict
from typing_extensions import Self
from nptyping import NDArray, Shape, Float
from itertools import combinations
from metrics_as_scores.distribution.fitting import StatisticalTest, StatTest_Types
from metrics_as_scores.tools.funcs import cdf_to_ppf
from statsmodels.distributions import ECDF as SMEcdf
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from scipy.stats import gaussian_kde, f_oneway, mode, kruskal
from scipy.integrate import quad
from scipy.optimize import direct
from scipy.stats import ks_2samp, ttest_ind
from scipy.stats._distn_infrastructure import rv_generic, rv_continuous
from joblib import Parallel, delayed
from tqdm import tqdm
from strenum import StrEnum
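# NOTE: `DistTransform` is referenced throughout this module (e.g., as the default
# for the `dist_transform` parameters) but its definition was missing from this
# file. The following is a minimal reconstruction based on how it is used here
# (a `StrEnum` with a `NONE` member); the remaining member names and their
# human-readable values are assumptions.
class DistTransform(StrEnum):
    """Enumeration of transforms that can be applied to the data before fitting a density."""
    NONE = '<none>'
    EXPECTATION = 'E[X] (expectation)'
    MEDIAN = 'Median (50th percentile)'
    MODE = 'Mode (most likely value)'
    INFIMUM = 'Infimum (min. observed value)'
    SUPREMUM = 'Supremum (max. observed value)'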
class JsonDataset(TypedDict):
"""
This class is the base class for the :py:class:`LocalDataset` and the
:py:class:`KnownDataset`. Each manifest should have a name, id, description,
and author.
"""
name: str
desc: str
id: str
author: list[str]
class LocalDataset(JsonDataset):
"""
This dataset extends the :py:class:`JsonDataset` and adds properties that
are filled out when locally creating a new dataset.
"""
origin: str
colname_data: str
colname_type: str
colname_context: str
qtypes: dict[str, Literal['continuous', 'discrete']]
desc_qtypes: dict[str, str]
contexts: list[str]
desc_contexts: dict[str, str]
ideal_values: dict[str, Union[int, float]]
class KnownDataset(JsonDataset):
"""
This dataset extends the :py:class:`JsonDataset` with properties that are
known about datasets that are available to Metrics As Scores online.
"""
info_url: str
download: str
size: int
size_extracted: int
class Density(ABC):
"""
This is the abstract base class for parametric and empirical densities. A
:py:class:`Density` represents a concrete instance of some random variable
    and its PDF, CDF, and PPF. It also stores information about how this concrete
    instance came to be (e.g., by some concrete transform).
This class provides a set of common getters and setters and also provides
some often needed conveniences, such as computing the practical domain.
As for the PDF, CDF, and PPF, all known sub-classes have a specific way of
obtaining these, and this class' responsibility lies in vectorizing these
functions.
"""
    def __init__(
self,
range: tuple[float, float],
pdf: Callable[[float], float],
cdf: Callable[[float], float],
ppf: Callable[[float], float]=None,
ideal_value: float=None,
dist_transform: DistTransform=DistTransform.NONE,
transform_value: float=None,
qtype: str=None,
context: str=None,
**kwargs
) -> None:
"""
range: ``tuple[float, float]``
The range of the data.
pdf: ``Callable[[float], float]``
The probability density function.
cdf: ``Callable[[float], float]``
The cumulative distribution function.
ppf: ``Callable[[float], float]``
The percent point (quantile) function.
ideal_value: ``float``
Some quantities have an ideal value. It can be provided here.
dist_transform: ``DistTransform``
The data transform that was applied while obtaining this density.
transform_value: ``float``
Optional transform value that was applied during transformation.
qtype: ``str``
The type of quantity for this density.
context: ``str``
The context of this quantity.
"""
self.range = range
self._pdf = pdf
self._cdf = cdf
self._ppf = ppf
self._ideal_value = ideal_value
self._dist_transform = dist_transform
self._transform_value: float = None
self._qtype = qtype
self._context = context
self._practical_domain: tuple[float, float] = None
self._practical_range_pdf: tuple[float, float] = None
self.transform_value = transform_value
self.pdf = np.vectorize(self._pdf)
self.cdf = np.vectorize(self._min_max)
        if ppf is None:
            def _no_ppf(*args):
                raise NotImplementedError('No PPF was provided for this density.')
            self.ppf = _no_ppf
        else:
            self.ppf = np.vectorize(self._ppf)
@property
def qtype(self) -> Union[str, None]:
"""Getter for the quantity type."""
return self._qtype
@property
def context(self) -> Union[str, None]:
"""Getter for the context."""
return self._context
@property
def ideal_value(self) -> Union[float, None]:
"""Getter for the ideal value (if any)."""
return self._ideal_value
@property
def dist_transform(self) -> DistTransform:
"""Getter for the data transformation."""
return self._dist_transform
@property
def transform_value(self) -> Union[float, None]:
"""Getter for the used transformation value (if any)."""
return self._transform_value
@transform_value.setter
def transform_value(self, value: Union[float, None]) -> Self:
"""Setter for the used transformation value."""
self._transform_value = value
return self
    def _min_max(self, x: float) -> float:
        """
        Used to safely vectorize a CDF, such that it returns `0.0` when `x` lies
        below our range, and `1.0` when `x` lies beyond our range.
x: ``float``
The `x` to obtain the CDF's `y` for.
:return:
A value in the range :math:`[0,1]`.
"""
return np.clip(a=self._cdf(x), a_min=0.0, a_max=1.0)
    def compute_practical_domain(self, cutoff: float=0.995) -> tuple[float, float]:
        """
        It is quite common that domains extend into distant regions to accommodate
        even the farthest outliers. This is often counter-productive, especially in
        the web application. There, we often want to show most of the distribution,
        so we compute a practical range that cuts off the most extreme outliers.
        This is useful for showing some default window.
        cutoff: ``float``
            The percentage of values to include. The CDF is optimized to find an `x`
            at which it reaches the cutoff (for the upper bound) and an `x` at which
            it reaches `1 - cutoff` (for the lower bound).
        :rtype: tuple[float, float]
        :return:
            The practical domain, cut off in both directions.
"""
def obj_lb(x):
return np.square(self.cdf(x) - (1. - cutoff))
def obj_ub(x):
return np.square(self.cdf(x) - cutoff)
r = self.range
m_lb = direct(func=obj_lb, bounds=(r,), f_min=0.)
m_ub = direct(func=obj_ub, bounds=(r,), f_min=0.)
return (m_lb.x[0], m_ub.x[0])
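    # A hedged usage sketch: for a long-tailed sample, the practical domain is a much
    # tighter window than the full range (exact numbers depend on the data):
    #
    #     d = KDE_approx(data=np.random.default_rng(1).exponential(size=2_000))
    #     d.range             # full range of the (resampled) data
    #     d.practical_domain  # window containing ~99% of the probability mass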
@property
def practical_domain(self) -> tuple[float, float]:
"""
Getter for the practical domain. This is a lazy getter that only
computes the practical domain if it was not done before.
"""
if self._practical_domain is None:
self._practical_domain = self.compute_practical_domain()
return self._practical_domain
    def compute_practical_range_pdf(self) -> tuple[float, float]:
        """
        Similar to :py:meth:`compute_practical_domain()`, this method computes a practical
        range for the PDF. It determines the location of the PDF's highest mode.
:return:
Returns a tuple where the first element is always `0.0` and the second is
the `y` of the highest mode (i.e., returns the `y` of the mode, not `x`, its
location).
"""
def obj(x):
return -1. * np.log(1. + self.pdf(x))
m = direct(func=obj, bounds=(self.range,), locally_biased=False)#, maxiter=15)
return (0., self.pdf(m.x[0])[0])
@property
def practical_range_pdf(self) -> tuple[float, float]:
"""
Lazy getter for the practical range of the PDF.
"""
if self._practical_range_pdf is None:
self._practical_range_pdf = self.compute_practical_range_pdf()
return self._practical_range_pdf
    def __call__(self, x: Union[float, list[float], NDArray[Shape["*"], Float]]) -> NDArray[Shape["*"], Float]:
"""
Allow objects of type :py:class:`Density` to be callable. Calls the vectorized
CDF under the hood.
"""
if np.isscalar(x) or isinstance(x, list):
x = np.asarray(x)
return self.cdf(x)
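# A hedged usage sketch: every `Density` is callable and evaluates the vectorized,
# clipped CDF under the hood (here using `Empirical`, defined further below):
#
#     d = Empirical(data=np.asarray([1., 2., 2., 3., 4.]))
#     d(2.5)             # same as d.cdf(2.5)
#     d([-10., 2., 10.]) # values outside the range clip to 0.0 / 1.0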
class KDE_integrate(Density):
r"""
The purpose of this class is to use an empirical (typically Gaussian) PDF and to
also provide a smooth CDF that is obtained by integrating the PDF:
:math:`F_X(x)=\int_{-\infty}^{x} f_X(t) dt`.
While this kind of CDF is smooth and precise, evaluating it is obviously slow.
Therefore, :py:class:`KDE_approx` is used in practice.
"""
    def __init__(self, data: NDArray[Shape["*"], Float], ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
self._data = data
self._kde = gaussian_kde(dataset=np.asarray(data))
lb, ub = np.min(data), np.max(data)
ext = np.max(data) - lb
def pdf(x):
return self._kde.evaluate(points=np.asarray(x))
def cdf(x):
y, _ = quad(func=pdf, a=self.range[0], b=x)
return y
m_lb = direct(func=pdf, bounds=((lb - ext, lb),), f_min=1e-6)
m_ub = direct(func=pdf, bounds=((ub, ub + ext),), f_min=1e-6)
        super().__init__(range=(m_lb.x[0], m_ub.x[0]), pdf=pdf, cdf=cdf, ppf=None, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
    def init_ppf(self, cdf_samples: int=100) -> 'KDE_integrate':
"""
Initializes the PPF. We get `x` and `y` from the CDF. Then, we swap the two
and interpolate a PPF. Since obtaining each `y` from the CDF means we need
to compute an integral, be careful with setting a high number of ``cdf_samples``.
cdf_samples: ``int``
The number of samples to take from the CDF (which is computed by integrating the
PDF, so be careful).
"""
self._ppf = cdf_to_ppf(cdf=self.cdf, x=self._data, y_left=np.min(self._data), y_right=np.max(self._data), cdf_samples=cdf_samples)
self.ppf = np.vectorize(self._ppf)
return self
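# A hedged usage sketch for `KDE_integrate`: each CDF evaluation integrates the KDE's
# PDF with `scipy.integrate.quad`, so it is precise but slow; the PPF must be
# initialized explicitly (keep `cdf_samples` modest, as each sample costs one integral):
#
#     d = KDE_integrate(data=np.random.default_rng(1).normal(size=500)).init_ppf(cdf_samples=50)
#     d.ppf([.25, .5, .75])  # interpolated quantiles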
class KDE_approx(Density):
    """
    This kind of density uses Kernel Density Estimation to obtain a PDF, and an empirical
    CDF (ECDF) to provide a cumulative distribution function. The advantage is that both
    the PDF and the CDF are fast.
    The PPF is the inverted and interpolated CDF, so it is fast, too. The data used for the
    PDF is limited to 10_000 samples using deterministic sampling without replacement. The
    data used for the CDF is obtained by sampling a large number (typically 200_000) of data
    points from the Gaussian KDE, in order to make it smooth.
    """
    def __init__(self, data: NDArray[Shape["*"], Float], resample_samples: int=200_000, compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
"""
For the other parameters, please refer to :py:meth:`Density.__init__()`.
resample_samples: ``int``
The amount of samples to take from the Gaussian KDE. These samples are then used
to estimate an as-smooth-as-possible CDF (and PPF thereof).
compute_ranges: ``bool``
Whether or not to compute the practical domain of the data and the practical range
of the PDF. Both of these use optimization to find the results.
"""
# First, we'll fit an extra KDE for an approximate PDF.
# It is used to also roughly estimate its mode.
rng = np.random.default_rng(seed=1)
data_pdf = data if data.shape[0] <= 10_000 else rng.choice(a=data, size=10_000, replace=False)
self._kde_for_pdf = gaussian_kde(dataset=data_pdf)
self._range_data = (np.min(data), np.max(data))
self._kde = gaussian_kde(dataset=np.asarray(data))
data = self._kde.resample(size=resample_samples, seed=1).reshape((-1,))
self._ecdf = SMEcdf(x=data)
self._ppf_interp = cdf_to_ppf(cdf=np.vectorize(self._ecdf), x=data, y_left=np.min(data), y_right=np.max(data))
        super().__init__(range=(np.min(data), np.max(data)), pdf=self._pdf_from_kde, cdf=self._ecdf, ppf=self._ppf_from_ecdf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
self.stat_test = StatisticalTest(data1=data, cdf=self._ecdf, ppf_or_data2=self._ppf_from_ecdf)
if compute_ranges:
self.practical_domain
self.practical_range_pdf
def _pdf_from_kde(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
return self._kde_for_pdf.evaluate(points=np.asarray(x))
def _ppf_from_ecdf(self, q: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
return self._ppf_interp(q)
@property
def pval(self) -> float:
"""
Shortcut getter for the jittered, two-sample KS-test's p-value.
"""
return self.stat_test.tests['ks_2samp_jittered']['pval']
@property
def stat(self) -> float:
"""
Shortcut getter for the jittered, two-sample KS-test's test statistic (D-value).
"""
return self.stat_test.tests['ks_2samp_jittered']['stat']
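# A hedged usage sketch for `KDE_approx`: the ECDF is built from a large resample of
# the fitted KDE, and `pval`/`stat` expose the jittered two-sample KS-test between
# the data and that resample:
#
#     d = KDE_approx(data=np.random.default_rng(1).normal(size=1_000), compute_ranges=True)
#     d.practical_domain  # optimized window covering ~99% of the mass
#     d.pval, d.stat      # goodness of the KDE approximation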
class Empirical(Density):
    """
    This kind of density does not apply any smoothing for the CDF, but rather uses a
    straightforward ECDF for the data as given. The PDF is determined using Gaussian
    KDE.
"""
    def __init__(self, data: NDArray[Shape["*"], Float], compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
"""
For the other parameters, please refer to :py:meth:`Density.__init__()`.
compute_ranges: ``bool``
Whether or not to compute the practical domain of the data and the practical range
of the PDF. Both of these use optimization to find the results.
"""
self._ecdf = SMEcdf(data)
self._ppf_interp = cdf_to_ppf(cdf=np.vectorize(self._ecdf), x=data, y_left=np.min(data), y_right=np.max(data))
        super().__init__(range=(np.min(data), np.max(data)), pdf=gaussian_kde(dataset=data).pdf, cdf=self._ecdf, ppf=self._ppf_from_ecdf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
if compute_ranges:
self.practical_domain
def _ppf_from_ecdf(self, q: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
return self._ppf_interp(q)
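# A hedged usage sketch for `Empirical`: a step-wise ECDF over the data exactly as given:
#
#     d = Empirical(data=np.asarray([1., 1., 2., 5.]))
#     d.cdf([.5, 1., 5.])  # -> array([0. , 0.5, 1. ])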
class Empirical_discrete(Empirical):
"""
Inherits from :py:class:`Empirical` and is used when the underlying quantity is
discrete and not continuous. As PDF, this function uses a PMF that is determined
by the frequencies of each discrete datum.
"""
    def __init__(self, data: NDArray[Shape["*"], Float], ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
self._data_valid = not (data.shape[0] == 1 and np.isnan(data[0]))
data_int = np.full(shape=data.shape, fill_value=np.nan) if not self._data_valid else np.rint(data).astype(int)
if self._data_valid and not np.allclose(a=data, b=data_int, rtol=1e-10, atol=1e-12):
raise Exception('The data does not appear to be integral.')
self._unique, self._counts = np.unique(data_int, return_counts=True)
self._unique, self._counts = np.full(shape=self._unique.shape, fill_value=np.nan) if not self._data_valid else self._unique.astype(int), self._counts.astype(int)
self._idx: dict[int, int] = { self._unique[i]: self._counts[i] for i in range(self._unique.shape[0]) }
if self._data_valid:
super().__init__(data=data_int, compute_ranges=False, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
else:
self._dist_transform = dist_transform
self._transform_value = transform_value
self._qtype = qtype
self._context = context
self._cdf = self.cdf = self._non_fit_cdf_ppf
self._ppf = self.ppf = self._non_fit_cdf_ppf
self._pdf = self._pmf = self._pmf_from_frequencies
self.pdf = self.pmf = np.vectorize(self._pdf)
self._practical_domain = (np.min(data_int), np.max(data_int))
self._practical_range_pdf = (0., float(np.max(self._counts)) / float(np.sum(self._counts)))
@property
def is_fit(self) -> bool:
"""Returns `True` if the given data is valid."""
return self._data_valid
def _non_fit_cdf_ppf(self, x) -> NDArray[Shape["*"], Float]:
x = np.asarray([x] if np.isscalar(x) else x)
return np.asarray([np.nan] * x.shape[0])
def _pmf_from_frequencies(self, x: int) -> float:
x = int(x)
        if x not in self._idx:
return 0.
return float(self._idx[x]) / float(np.sum(self._counts))
    @staticmethod
def unfitted(dist_transform: DistTransform) -> 'Empirical_discrete':
"""
Used to return an explicit unfit instance of :py:class:`Empirical_discrete`.
This is used when, for example, continuous (real) data is given to the
constructor. We still need an instance of this density in the web application
to show an error (e.g., that there are no discrete empirical densities for
continuous data).
"""
return Empirical_discrete(data=np.asarray([np.nan]), dist_transform=dist_transform)
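# A hedged usage sketch for `Empirical_discrete`: the PMF is the relative frequency
# of each integer value; unseen values have zero mass:
#
#     d = Empirical_discrete(data=np.asarray([1, 1, 2, 2, 2, 5]))
#     d.pmf([1, 2, 3])  # -> array([0.33333333, 0.5, 0. ])
#     Empirical_discrete.unfitted(DistTransform.NONE).is_fit  # -> False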
class Parametric(Density):
"""
This density encapsulates a parameterized and previously fitted random variable.
Random variables in :py:mod:`scipy.stats` come with PDF/PMF, CDF, PPF, etc. so
we just use these and forward calls to them.
"""
    def __init__(self, dist: rv_generic, dist_params: tuple, range: tuple[float, float], stat_tests: dict[str, float], use_stat_test: StatTest_Types='ks_2samp_jittered', compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
"""
For the other parameters, please refer to :py:meth:`Density.__init__()`.
dist: ``rv_generic``
An instance of the random variable to use.
dist_params: ``tuple``
A tuple of parameters for the random variable. The order of the parameters
is important since it is not a dictionary.
stat_tests: ``dict[str, float]``
A (flattened) dictionary of previously conducted statistical tests. This is
used later to choose some best-fitting parametric density by a specific test.
use_stat_test: ``StatTest_Types``
            The name of the chosen statistical test used to determine the goodness of fit.
compute_ranges: ``bool``
Whether or not to compute the practical domain of the data and the practical range
of the PDF. Both of these use optimization to find the results.
"""
if not isinstance(stat_tests, dict) or isinstance(stat_tests, StatisticalTest):
raise Exception(f'This class requires a dictionary of statistical tests, not an instance of {StatisticalTest.__name__}.')
for (key, val) in stat_tests.items():
if not key.endswith('_pval') and not key.endswith('_stat'):
raise Exception(f'Key not allowed: "{key}".')
if not isinstance(val, float):
raise Exception(f'Value for key "{key}" is not numeric.')
self.dist: Union[rv_generic, rv_continuous] = dist
self.stat_tests = stat_tests
self._use_stat_test = use_stat_test
self.dist_params = dist_params
super().__init__(range=range, pdf=self.pdf, cdf=self.cdf, ppf=self.ppf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
if compute_ranges:
self.practical_domain
self.practical_range_pdf
    @staticmethod
def unfitted(dist_transform: DistTransform) -> 'Parametric':
"""
Used to return an explicit unfit instance of :py:class:`Parametric`. This is
used in case when not a single maximum likelihood fit was successful for a
number of random variables. We still need an instance of this density in the
web application to show an error (e.g., that it was not possible to fit any
random variable to the selected quantity).
"""
from scipy.stats._continuous_distns import norm_gen
return Parametric(dist=norm_gen(), dist_params=None, range=(np.nan, np.nan), stat_tests={}, dist_transform=dist_transform)
@property
def use_stat_test(self) -> StatTest_Types:
"""Getter for the selected statistical test."""
return self._use_stat_test
@use_stat_test.setter
def use_stat_test(self, st: StatTest_Types) -> Self:
"""Setter for the type of statistical test to use."""
self._use_stat_test = st
return self
@property
def pval(self) -> float:
"""Shortcut getter for the p-value of the selected statistical test."""
if not self.is_fit:
raise Exception('Cannot return p-value for non-fitted random variable.')
return self.stat_tests[f'{self.use_stat_test}_pval']
@property
def stat(self) -> float:
"""Shortcut getter for the test statistic of the selected statistical test."""
if not self.is_fit:
raise Exception('Cannot return statistical test statistic for non-fitted random variable.')
return self.stat_tests[f'{self.use_stat_test}_stat']
@property
def is_fit(self) -> bool:
"""Returns `True` if this instance is not an explicitly unfit instance."""
        return self.dist_params is not None and not np.any(np.isnan(self.dist_params))
@property
def practical_domain(self) -> tuple[float, float]:
"""Overridden to return a practical domain of :math:`[0,0]` in case this instance is unfit."""
if not self.is_fit:
return (0., 0.)
return super().practical_domain
@property
def practical_range_pdf(self) -> tuple[float, float]:
"""Overridden to return a practical PDF range of :math:`[0,0]` in case this instance is unfit."""
if not self.is_fit:
return (0., 0.)
return super().practical_range_pdf
@property
def dist_name(self) -> str:
"""Shortcut getter for the this density's random variable's class' name."""
return self.dist.__class__.__name__
    def pdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
"""
Overridden to call the encapsulated distribution's PDF. If this density is unfit,
always returns an array of zeros of same shape as the input.
"""
x = np.asarray(x)
if not self.is_fit:
return np.zeros((x.size,))
return self.dist.pdf(*(x, *self.dist_params)).reshape((x.size,))
    def cdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
"""
Overridden to call the encapsulated distribution's CDF. If this density is unfit,
always returns an array of zeros of same shape as the input.
"""
x = np.asarray(x)
if not self.is_fit:
return np.zeros((x.size,))
return self.dist.cdf(*(x, *self.dist_params)).reshape((x.size,))
    def ppf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
"""
Overridden to call the encapsulated distribution's PPF. If this density is unfit,
always returns an array of zeros of same shape as the input.
"""
x = np.asarray(x)
if not self.is_fit:
return np.zeros((x.size,))
return self.dist.ppf(*(x, *self.dist_params)).reshape((x.size,))
    def compute_practical_domain(self, cutoff: float=0.9985) -> tuple[float, float]:
        """
        Overridden to exploit the availability of a fitted random variable's PPF.
        It can be used to find the practical domain instantaneously, instead of
        having to solve an optimization problem.
        cutoff: ``float``
            The percentage of values to include. Since a fitted PPF is available, the
            bounds are obtained directly as ``ppf(1 - cutoff)`` and ``ppf(cutoff)``.
            Note that the default value for the cutoff was adjusted here to extend a
            little beyond what is good for other types of densities.
        :rtype: tuple[float, float]
        :return:
            The practical domain, cut off in both directions. If this random variable
            is unfit, this density's ``range`` is returned instead.
"""
if not self.is_fit:
return self.range
return (self.ppf(1.0 - cutoff)[0], self.ppf(cutoff)[0])
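# A hedged usage sketch for `Parametric`: wrap a previously fitted scipy random
# variable, its parameters, and a flattened dict of test results (the test values
# shown here are made up):
#
#     from scipy.stats import norm
#     params = norm.fit(np.random.default_rng(1).normal(loc=3., size=500))
#     d = Parametric(dist=norm, dist_params=params, range=(0., 6.),
#                    stat_tests={'ks_2samp_jittered_pval': .5, 'ks_2samp_jittered_stat': .05})
#     d.practical_domain  # obtained directly from the PPF, no optimization needed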
class Parametric_discrete(Parametric):
"""
This type of density inherits from :py:class:`Parametric` and is its counterpart
for discrete (integral) data. It adds an explicit function for the probability mass
and makes the inherited PDF return the PMF's result.
"""
    def pmf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
"""
Implemented to call the encapsulated distribution's PMF. If this density is unfit,
always returns an array of zeros of same shape as the input.
"""
x = np.asarray(x)
if not self.is_fit:
return np.zeros((x.size,))
return self.dist.pmf(*(x, *self.dist_params)).reshape((x.size,))
    def pdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
"""
        Overridden to return the result of :py:meth:`pmf()`. Note that in any case,
        a density's :py:meth:`pdf()` is what gets called (i.e., callers never call
        the PMF directly). Therefore, it is easier to catch these calls and redirect them to the PMF.
"""
return self.pmf(x=x)
    @staticmethod
def unfitted(dist_transform: DistTransform) -> 'Parametric_discrete':
"""
        Used to return an explicit unfit instance of :py:class:`Parametric_discrete`. This is
used in case when not a single maximum likelihood fit was successful for a
number of random variables. We still need an instance of this density in the
web application to show an error (e.g., that it was not possible to fit any
random variable to the selected quantity).
"""
from scipy.stats._continuous_distns import norm_gen
return Parametric_discrete(dist=norm_gen(), dist_params=None, range=(np.nan, np.nan), stat_tests={}, dist_transform=dist_transform)
class Dataset:
"""
This class encapsulates a local (self created) dataset and provides help with transforming
it, as well as giving some convenience getters.
"""
    def __init__(self, ds: LocalDataset, df: pd.DataFrame) -> None:
self.ds = ds
self.df = df
@property
def quantity_types(self) -> list[str]:
"""Shortcut getter for the manifest's quantity types."""
return list(self.ds['qtypes'].keys())
    def contexts(self, include_all_contexts: bool=False) -> Iterable[str]:
"""
Returns the manifest's defined contexts as a generator. Sometimes we need to ignore
the context and aggregate a quantity type across all defined contexts. Then, a virtual
context called `__ALL__` is used.
include_all_contexts: ``bool``
Whether to also yield the virtual `__ALL__`-context.
"""
yield from self.ds['contexts']
if include_all_contexts:
yield '__ALL__'
@property
def ideal_values(self) -> dict[str, Union[float, int, None]]:
"""Shortcut getter for the manifest's ideal values."""
return self.ds['ideal_values']
    def is_qtype_discrete(self, qtype: str) -> bool:
"""
Returns whether a given quantity type is discrete.
"""
return self.ds['qtypes'][qtype] == 'discrete'
    def qytpe_desc(self, qtype: str) -> str:
"""
Returns the description associated with a quantity type.
"""
return self.ds['desc_qtypes'][qtype]
    def context_desc(self, context: str) -> Union[str, None]:
"""
Returns the description associated with a context (if any).
"""
return self.ds['desc_contexts'][context]
@property
def quantity_types_continuous(self) -> list[str]:
"""
Returns a list of quantity types that are continuous (real-valued).
"""
return list(filter(lambda qtype: not self.is_qtype_discrete(qtype=qtype), self.quantity_types))
@property
def quantity_types_discrete(self) -> list[str]:
"""
Returns a list of quantity types that are discrete (integer-valued).
"""
return list(filter(lambda qtype: self.is_qtype_discrete(qtype=qtype), self.quantity_types))
    def data(self, qtype: str, context: Union[str, None, Literal['__ALL__']]=None, unique_vals: bool=True, sub_sample: int=None) -> NDArray[Shape["*"], Float]:
"""
This method is used to select a subset of the data, that is specific to at least
a type of quantity, and optionally to a context, too.
qtype: ``str``
The name of the quantity type to get data for.
context: ``Union[str, None, Literal['__ALL__']]``
You may specify a context to further filter the data by. Data is always specific
            to a quantity type, and sometimes to a context. If no context-based filtering is
            desired, pass ``None`` or ``'__ALL__'``.
unique_vals: ``bool``
If `True`, some small jitter will be added to the data in order to make it unique.
sub_sample: ``int``
Optional unsigned integer with number of samples to take in case the dataset is
very large. It is only applied if the number is smaller than the data's size.
"""
new_df = self.df[self.df[self.ds['colname_type']] == qtype]
if context is not None and context != '__ALL__':
# Check if context exists:
            if context not in list(self.contexts(include_all_contexts=False)):
raise Exception(f'The context "{context}" is not known.')
new_df = new_df[new_df[self.ds['colname_context']] == context]
vals = new_df[self.ds['colname_data']]
if unique_vals:
rng = np.random.default_rng(seed=1_337)
r = rng.choice(a=np.linspace(1e-8, 1e-6, vals.size), size=vals.size, replace=False)
# Add small but insignificant perturbations to the data to produce unique
# values that would otherwise be eliminated by certain methods.
vals += r
vals = vals.to_numpy()
if sub_sample is not None and sub_sample < vals.size:
rng = np.random.default_rng(seed=1_338)
vals = rng.choice(a=vals, size=sub_sample, replace=False)
return vals
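    # A hedged usage sketch for `data()`; the manifest and its column names below are
    # purely illustrative:
    #
    #     ds: LocalDataset = dict(name='t', desc='t', id='t', author=[], origin='',
    #         colname_data='value', colname_type='qtype', colname_context='context',
    #         qtypes={'LOC': 'discrete'}, desc_qtypes={}, contexts=['a', 'b'],
    #         desc_contexts={}, ideal_values={})
    #     df = pd.DataFrame({'value': [1, 2, 3], 'qtype': 'LOC', 'context': ['a', 'a', 'b']})
    #     Dataset(ds=ds, df=df).data(qtype='LOC', context='a', unique_vals=False)  # -> array([1, 2])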
    def num_observations(self) -> Iterable[tuple[str, str, int]]:
"""
Returns the number of observations for each quantity type in each context.
        :rtype: ``Iterable[tuple[str, str, int]]``
        :return: Returns an iterable generator of tuples, in which the first element
            is the context, the second the quantity type, and the third the number
            of observations.
"""
for ctx in self.contexts(include_all_contexts=False):
for qtype in self.quantity_types:
yield (ctx, qtype, self.data(qtype=qtype, context=ctx, unique_vals=False).shape[0])
    def has_sufficient_observations(self, raise_if_not: bool=True) -> bool:
"""
Helper method to check whether each quantity type in each context has at least
two observations.
raise_if_not: ``bool``
If set to `True`, will raise an exception instead of returning `False` in
case there are insufficiently many observations. The exception is more
            informative as it includes the context and quantity type.
:rtype: ``bool``
:return: A boolean indicating whether this Dataset has sufficiently many observations
for each and every quantity type.
"""
for ctx, qtype, num_obs in self.num_observations():
if num_obs < 2:
if raise_if_not:
raise Exception(f'The quantity type "{qtype}" in context "{ctx}" has insufficient ({num_obs}) observation(s).')
else:
return False
return True
    def analyze_groups(self, use: Literal['anova', 'kruskal'], qtypes: Iterable[str], contexts: Iterable[str], unique_vals: bool=True) -> pd.DataFrame:
"""
        For each given type of quantity, this method performs an ANOVA or a
        Kruskal-Wallis test across all given contexts.
use: ``Literal['anova', 'kruskal']``
Indicates which method for comparing groups to use. We can either conduct
an ANOVA or a Kruskal-Wallis test.
qtypes: ``Iterable[str]``
An iterable of quantity types to conduct the analysis for. For each given
type, a separate analysis is performed and the result appended to the
returned data frame.
contexts: ``Iterable[str]``
An iterable of contexts across which each of the quantity types shall be
analyzed.
unique_vals: ``bool``
            Passed to :meth:`self.data()`. If `True`, then small, random, and unique
noise is added to the data before it is analyzed. This will effectively
deduplicate any samples in the data (if any).
:rtype: ``pd.DataFrame``
:return: A data frame with the columns ``qtype`` (name of the quantity type),
            ``stat`` (the test statistic), ``pval``, and ``across_contexts``, where
the latter is a semicolon-separated list of contexts the quantity type was
compared across.
"""
use_method: Callable = None
if use == 'anova':
use_method = f_oneway
elif use == 'kruskal':
use_method = kruskal
else:
raise Exception(f'Method "{use}" is not supported.')
# We first have to build the data; f_oneway requires *args, where each
# arg is a data series.
        qtypes, contexts = list(qtypes), list(contexts)
        if len(qtypes) < 1 or len(contexts) < 2:
            raise Exception('Requires one or more quantity types and two or more contexts.')
        def anova_for_qtype(qtype: str) -> dict[str, Union[str, float]]:
samples = ()
for ctx in contexts:
samples += (self.data(qtype=qtype, context=None if ctx == '__ALL__' else ctx, unique_vals=unique_vals),)
stat, pval = use_method(*samples)
return { 'qtype': qtype, 'stat': stat, 'pval': pval, 'across_contexts': ';'.join(contexts) }
res_dicts = Parallel(n_jobs=-1)(delayed(anova_for_qtype)(qtype) for qtype in tqdm(qtypes))
return pd.DataFrame(res_dicts)
    def analyze_TukeyHSD(self, qtypes: Iterable[str]) -> pd.DataFrame:
r"""
        Calculate all pairwise comparisons for the given quantity types with Tukey's
        Honestly Significant Difference (HSD) test and return the confidence intervals.
        For each type of quantity, this method performs pairwise comparisons across
        all of its associated contexts.
For example, given a quantity :math:`Q` and its contexts :math:`C_1,C_2,C_3`,
this method will examine the pairs :math:`\left[\{C_1,C_2\},\{C_1,C_3\},\{C_2,C_3\}\right]`.
        For a single type of quantity, for example, this test is useful for understanding
        how differently the quantity manifests across contexts. For multiple quantities, it
        also allows understanding how contexts distinguish from one another, holistically.
qtypes: ``Iterable[str]``
An iterable of quantity types to conduct the analysis for. For each given
type, a separate analysis is performed and the result appended to the
returned data frame.
:rtype: ``pd.DataFrame``
:return: A data frame with the columns ``group1``, ``group2``, ``meandiff``,
``p-adj``, ``lower``, ``upper``, and ``reject``. For details see
:meth:`statsmodels.stats.multicomp.pairwise_tukeyhsd()`.
"""
        qtypes = list(qtypes)
        if len(qtypes) < 1:
            raise Exception('Requires one or more quantity types.')
temp = self.df.copy()
temp[self.ds['colname_context']] = '__ALL__' # Erase context
all_data = pd.concat([temp, self.df])
def tukeyHSD_for_qtype(qtype: str) -> pd.DataFrame:
data = all_data[all_data[self.ds['colname_type']] == qtype]
tukey = pairwise_tukeyhsd(endog=data[self.ds['colname_data']], groups=data[self.ds['colname_context']])
temp = tukey.summary().data
return pd.DataFrame(data=temp[1:], columns=temp[0])
res_dfs = Parallel(n_jobs=-1)(delayed(tukeyHSD_for_qtype)(qtype) for qtype in tqdm(qtypes))
return pd.concat(res_dfs)
    def analyze_distr(self, qtypes: Iterable[str], use_ks_2samp: bool=True, ks2_max_samples: int=40_000) -> pd.DataFrame:
"""
        Performs the two-sample Kolmogorov--Smirnov test or Welch's t-test for one or
        more types of quantity. For each quantity type, the test is performed across
        all unique pairs of contexts.
qtypes: ``Iterable[str]``
An iterable of quantity types to test in a pair-wise manner.
use_ks_2samp: ``bool``
            If `True`, use the two-sample Kolmogorov--Smirnov test; otherwise, Welch's t-test.
ks2_max_samples: ``int``
            Unsigned integer used to limit the number of samples used in the two-sample
            KS-test. For numbers larger than the default, it may not be possible to
            compute the test exactly.
:rtype: ``pd.DataFrame``
:return:
A data frame with columns `qtype`, `stat`, `pval`, `group1`, and `group2`.
"""
        qtypes = list(qtypes)
        if len(qtypes) < 1:
            raise Exception('Requires one or more quantity types.')
        unique_context_pairs: list[tuple[str, str]] = list(combinations(self.contexts(include_all_contexts=True), r=2))
def compare(qtype: str) -> pd.DataFrame:
dict_list: list[dict[str, Union[str, float]]] = [ ]
for udp in unique_context_pairs:
data1 = self.data(qtype=qtype, context=udp[0])
data2 = self.data(qtype=qtype, context=udp[1])
stat = pval = None
if use_ks_2samp:
rng = np.random.default_rng(seed=1)
data1 = data1 if data1.shape[0] <= ks2_max_samples else rng.choice(a=data1, size=ks2_max_samples, replace=False)
data2 = data2 if data2.shape[0] <= ks2_max_samples else rng.choice(a=data2, size=ks2_max_samples, replace=False)
stat, pval = ks_2samp(data1=data1, data2=data2, alternative='two-sided', method='exact')
else:
stat, pval = ttest_ind(a=data1, b=data2, equal_var=False, alternative='two-sided')
dict_list.append({
'qtype': qtype, 'stat': stat, 'pval': pval, 'group1': udp[0], 'group2': udp[1]
})
return pd.DataFrame(dict_list)
        res_dfs = Parallel(n_jobs=-1)(delayed(compare)(qtype) for qtype in tqdm(qtypes))
return pd.concat(res_dfs)
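# A hedged end-to-end sketch (same illustrative manifest and column names as above):
# comparing one quantity type across two contexts with a Kruskal-Wallis test.
#
# if __name__ == '__main__':
#     rng = np.random.default_rng(1)
#     df = pd.DataFrame({
#         'value': np.concatenate([rng.normal(size=100), rng.normal(loc=1., size=100)]),
#         'qtype': 'Complexity',
#         'context': ['a'] * 100 + ['b'] * 100})
#     ds: LocalDataset = dict(name='t', desc='t', id='t', author=[], origin='',
#         colname_data='value', colname_type='qtype', colname_context='context',
#         qtypes={'Complexity': 'continuous'}, desc_qtypes={}, contexts=['a', 'b'],
#         desc_contexts={}, ideal_values={})
#     print(Dataset(ds=ds, df=df).analyze_groups(use='kruskal', qtypes=['Complexity'], contexts=['a', 'b']))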