"""
This module contains the base class for all densities as used in the web
application, as well as all of its concrete implementations. Also, it
contains enumerations and typings that describe datasets.
"""
import pandas as pd
import numpy as np
from abc import ABC
from typing import Callable, Iterable, Literal, Union, TypedDict
from typing_extensions import Self
from nptyping import NDArray, Shape, Float
from itertools import combinations
from metrics_as_scores.distribution.fitting import StatisticalTest, StatTest_Types
from metrics_as_scores.tools.funcs import cdf_to_ppf
from statsmodels.distributions import ECDF as SMEcdf
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from scipy.stats import gaussian_kde, f_oneway, mode, kruskal, ks_2samp, ttest_ind
from scipy.integrate import quad
from scipy.optimize import direct
from scipy.stats._distn_infrastructure import rv_generic, rv_continuous
from joblib import Parallel, delayed
from tqdm import tqdm
from strenum import StrEnum
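
# Note: ``DistTransform`` is referenced throughout this module (e.g., as the
# default for ``dist_transform``). Its definition is reconstructed here so the
# listing is self-contained; the member values other than ``NONE`` are
# assumptions derived from how the enum is used below.
class DistTransform(StrEnum):
    """
    Enumeration of transforms that can be applied to the data before a density
    is estimated (e.g., subtracting the expectation or the observed infimum).
    """
    NONE = '<none>'
    EXPECTATION = 'E[X] (expectation)'
    MEDIAN = 'Median (50th percentile)'
    MODE = 'Mode (most likely value)'
    INFIMUM = 'Infimum (min. observed value)'
    SUPREMUM = 'Supremum (max. observed value)'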

class JsonDataset(TypedDict):
    """
    This class is the base class for the :py:class:`LocalDataset` and the
    :py:class:`KnownDataset`. Each manifest should have a name, id, description,
    and author.
    """
    name: str
    desc: str
    id: str
    author: list[str] 

class LocalDataset(JsonDataset):
    """
    This dataset extends the :py:class:`JsonDataset` and adds properties that
    are filled out when locally creating a new dataset.
    """
    origin: str
    colname_data: str
    colname_type: str
    colname_context: str
    qtypes: dict[str, Literal['continuous', 'discrete']]
    desc_qtypes: dict[str, str]
    contexts: list[str]
    desc_contexts: dict[str, str]
    ideal_values: dict[str, Union[int, float]] 

class KnownDataset(JsonDataset):
    """
    This dataset extends the :py:class:`JsonDataset` with properties that are
    known about datasets that are available to Metrics As Scores online.
    """
    info_url: str
    download: str
    size: int
    size_extracted: int 
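
# A minimal example manifest (purely illustrative; all names and values below
# are made up) that satisfies the ``LocalDataset`` typing defined above:
_example_manifest: LocalDataset = LocalDataset(
    name='Example Dataset', desc='A synthetic example.', id='example',
    author=['Jane Doe'], origin='synthetic.csv', colname_data='value',
    colname_type='quantity', colname_context='project',
    qtypes={ 'LOC': 'discrete', 'Complexity': 'continuous' },
    desc_qtypes={ 'LOC': 'Lines of code', 'Complexity': 'Cyclomatic complexity' },
    contexts=['proj_a', 'proj_b'],
    desc_contexts={ 'proj_a': 'Project A', 'proj_b': 'Project B' },
    ideal_values={ 'LOC': 0, 'Complexity': 1 })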

class Density(ABC):
    """
    This is the abstract base class for parametric and empirical densities. A
    :py:class:`Density` represents a concrete instance of some random variable
    and its PDF, CDF, and PPF. It also stores information about how this
    concrete instance came to be (e.g., by some concrete transform).
    This class provides a set of common getters and setters and also provides
    some often needed conveniences, such as computing the practical domain.
    As for the PDF, CDF, and PPF, all known sub-classes have a specific way of
    obtaining these, and this class' responsibility lies in vectorizing these
    functions.
    """
    def __init__(
        self,
        range: tuple[float, float],
        pdf: Callable[[float], float],
        cdf: Callable[[float], float],
        ppf: Callable[[float], float]=None,
        ideal_value: float=None,
        dist_transform: DistTransform=DistTransform.NONE,
        transform_value: float=None,
        qtype: str=None,
        context: str=None,
        **kwargs
    ) -> None:
        """
        range: ``tuple[float, float]``
            The range of the data.
        
        pdf: ``Callable[[float], float]``
            The probability density function.
        
        cdf: ``Callable[[float], float]``
            The cumulative distribution function.
        
        ppf: ``Callable[[float], float]``
            The percent point (quantile) function.
        
        ideal_value: ``float``
            Some quantities have an ideal value. It can be provided here.
        
        dist_transform: ``DistTransform``
            The data transform that was applied while obtaining this density.
        
        transform_value: ``float``
            Optional transform value that was applied during transformation.
        
        qtype: ``str``
            The type of quantity for this density.
        
        context: ``str``
            The context of this quantity.
        """
        self.range = range
        self._pdf = pdf
        self._cdf = cdf
        self._ppf = ppf
        self._ideal_value = ideal_value
        self._dist_transform = dist_transform
        self._transform_value: float = None
        self._qtype = qtype
        self._context = context
        self._practical_domain: tuple[float, float] = None
        self._practical_range_pdf: tuple[float, float] = None
        self.transform_value = transform_value
        self.pdf = np.vectorize(self._pdf)
        self.cdf = np.vectorize(self._min_max)
        if ppf is None:
            def _ppf_not_implemented(*args):
                raise NotImplementedError()
            self.ppf = _ppf_not_implemented
        else:
            self.ppf = np.vectorize(self._ppf)
    
    @property
    def qtype(self) -> Union[str, None]:
        """Getter for the quantity type."""
        return self._qtype
    
    @property
    def context(self) -> Union[str, None]:
        """Getter for the context."""
        return self._context
    @property
    def ideal_value(self) -> Union[float, None]:
        """Getter for the ideal value (if any)."""
        return self._ideal_value
    @property
    def dist_transform(self) -> DistTransform:
        """Getter for the data transformation."""
        return self._dist_transform
    @property
    def transform_value(self) -> Union[float, None]:
        """Getter for the used transformation value (if any)."""
        return self._transform_value
    
    @transform_value.setter
    def transform_value(self, value: Union[float, None]) -> Self:
        """Setter for the used transformation value."""
        self._transform_value = value
        return self

    def _min_max(self, x: float) -> float:
        """
        Used to safely vectorize a CDF, such that it returns `0.0` when `x`
        lies before our range, and `1.0` when `x` lies beyond it.
        x: ``float``
            The `x` to obtain the CDF's `y` for.
        
        :return:
            A value in the range :math:`[0,1]`.
        """
        return np.clip(a=self._cdf(x), a_min=0.0, a_max=1.0) 
    
    def compute_practical_domain(self, cutoff: float=0.995) -> tuple[float, float]:
        """
        It is quite common that domains extend into distant regions to accommodate
        even the farthest outliers. This is often counter-productive, especially in
        the web application. There, we often want to show most of the distribution,
        so we compute a practical range that cuts off the most extreme outliers.
        This is useful for showing some default window.
        cutoff: ``float``
            The percentage of values to include. For the upper bound, the CDF is
            optimized to find some `x` at which it reaches the cutoff. For the lower
            bound, we find the `x` at which the CDF reaches `1 - cutoff`.
        
        :rtype: tuple[float, float]
        :return:
            The practical domain, cut off in both directions.
        """
        def obj_lb(x):
            return np.square(self.cdf(x) - (1. - cutoff))
        def obj_ub(x):
            return np.square(self.cdf(x) - cutoff)
        r = self.range
        m_lb = direct(func=obj_lb, bounds=(r,), f_min=0.)
        m_ub = direct(func=obj_ub, bounds=(r,), f_min=0.)
        return (m_lb.x[0], m_ub.x[0]) 
    
    @property
    def practical_domain(self) -> tuple[float, float]:
        """
        Getter for the practical domain. This is a lazy getter that only
        computes the practical domain if it was not done before.
        """
        if self._practical_domain is None:
            self._practical_domain = self.compute_practical_domain()
        return self._practical_domain
    
    def compute_practical_range_pdf(self) -> tuple[float, float]:
        """
        Similar to :py:meth:`compute_practical_domain()`, this method computes a practical
        range for the PDF. It determines the location of the PDF's highest mode.
        :return:
            Returns a tuple where the first element is always `0.0` and the second is
            the `y` of the highest mode (i.e., returns the `y` of the mode, not `x`, its
            location).
        """
        def obj(x):
            return -1. * np.log(1. + self.pdf(x))
        m = direct(func=obj, bounds=(self.range,), locally_biased=False)#, maxiter=15)
        return (0., self.pdf(m.x[0])[0]) 
    
    @property
    def practical_range_pdf(self) -> tuple[float, float]:
        """
        Lazy getter for the practical range of the PDF.
        """
        if self._practical_range_pdf is None:
            self._practical_range_pdf = self.compute_practical_range_pdf()
        return self._practical_range_pdf
    
    def __call__(self, x: Union[float, list[float], NDArray[Shape["*"], Float]]) -> NDArray[Shape["*"], Float]:
        """
        Allow objects of type :py:class:`Density` to be callable. Calls the vectorized
        CDF under the hood.
        """
        if np.isscalar(x) or isinstance(x, list):
            x = np.asarray(x)
        return self.cdf(x)  
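
# A small usage sketch (hypothetical, not part of the module): a Density can be
# constructed directly from plain callables, here for a uniform distribution on
# [0, 1]. Calling the instance evaluates the vectorized (and clipped) CDF.
def _demo_density() -> None:
    d = Density(range=(0., 1.), pdf=lambda x: 1., cdf=lambda x: x, ppf=lambda p: p)
    print(d([0.25, 0.5, 2.0]))  # CDF values, clipped to [0, 1]: [0.25, 0.5, 1.0]
    print(d.practical_domain)   # found by optimizing the CDF against the cutoff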

class KDE_integrate(Density):
    r"""
    The purpose of this class is to use an empirical (typically Gaussian) PDF and to
    also provide a smooth CDF that is obtained by integrating the PDF:
    :math:`F_X(x)=\int_{-\infty}^{x} f_X(t) dt`.
    While this kind of CDF is smooth and precise, evaluating it is obviously slow.
    Therefore, :py:class:`KDE_approx` is used in practice.
    """
    def __init__(self, data: NDArray[Shape["*"], Float], ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
        self._data = data
        self._kde = gaussian_kde(dataset=np.asarray(data))
        lb, ub = np.min(data), np.max(data)
        ext = ub - lb
        def pdf(x):
            return self._kde.evaluate(points=np.asarray(x))
        
        def cdf(x):
            y, _ = quad(func=pdf, a=self.range[0], b=x)
            return y
        
        m_lb = direct(func=pdf, bounds=((lb - ext, lb),), f_min=1e-6)
        m_ub = direct(func=pdf, bounds=((ub, ub + ext),), f_min=1e-6)
        super().__init__(range=(m_lb.x[0], m_ub.x[0]), pdf=pdf, cdf=cdf, ppf=None, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
    
    def init_ppf(self, cdf_samples: int=100) -> 'KDE_integrate':
        """
        Initializes the PPF. We get `x` and `y` from the CDF. Then, we swap the two
        and interpolate a PPF. Since obtaining each `y` from the CDF means we need
        to compute an integral, be careful with setting a high number of ``cdf_samples``.
        cdf_samples: ``int``
            The number of samples to take from the CDF (which is computed by integrating the
            PDF, so be careful).
        """
        self._ppf = cdf_to_ppf(cdf=self.cdf, x=self._data, y_left=np.min(self._data), y_right=np.max(self._data), cdf_samples=cdf_samples)
        self.ppf = np.vectorize(self._ppf)
        return self  
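
# Usage sketch (hypothetical): integrating the KDE's PDF yields a smooth but
# slow CDF; the PPF has to be initialized explicitly before it can be used.
def _demo_kde_integrate() -> None:
    data = np.random.default_rng(seed=1).normal(size=500)
    dens = KDE_integrate(data=data).init_ppf(cdf_samples=50)
    print(dens.cdf(0.))  # approx. 0.5 for standard-normal data
    print(dens.ppf(.5))  # approx. 0.0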

class KDE_approx(Density):
    """
    This kind of density uses Kernel Density Estimation to obtain a PDF, and an empirical
    CDF (ECDF) to provide a cumulative distribution function. The advantage is that both
    the PDF and the CDF are fast.
    The PPF is the inverted and interpolated CDF, so it is fast, too. The data used for the
    PDF is limited to 10_000 samples using deterministic sampling without replacement. The
    data used for the CDF is obtained by sampling a large number (typically 200_000) of data
    points from the Gaussian KDE, in order to make it smooth.
    """
    def __init__(self, data: NDArray[Shape["*"], Float], resample_samples: int=200_000, compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
        """
        For the other parameters, please refer to :py:meth:`Density.__init__()`.
        resample_samples: ``int``
            The number of samples to take from the Gaussian KDE. These samples are then used
            to estimate an as-smooth-as-possible CDF (and PPF thereof).
        
        compute_ranges: ``bool``
            Whether or not to compute the practical domain of the data and the practical range
            of the PDF. Both of these use optimization to find the results.
        """
        # First, we'll fit an extra KDE for an approximate PDF.
        # It is used to also roughly estimate its mode.
        rng = np.random.default_rng(seed=1)
        data_pdf = data if data.shape[0] <= 10_000 else rng.choice(a=data, size=10_000, replace=False)
        self._kde_for_pdf = gaussian_kde(dataset=data_pdf)
        self._range_data = (np.min(data), np.max(data))   
        self._kde = gaussian_kde(dataset=np.asarray(data))
        data = self._kde.resample(size=resample_samples, seed=1).reshape((-1,))
        self._ecdf = SMEcdf(x=data)
        self._ppf_interp = cdf_to_ppf(cdf=np.vectorize(self._ecdf), x=data, y_left=np.min(data), y_right=np.max(data))
        super().__init__(range=(np.min(data), np.max(data)), pdf=self._pdf_from_kde, cdf=self._ecdf, ppf=self._ppf_from_ecdf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
        self.stat_test = StatisticalTest(data1=data, cdf=self._ecdf, ppf_or_data2=self._ppf_from_ecdf)
        if compute_ranges:
            self.practical_domain
            self.practical_range_pdf 
    
    def _pdf_from_kde(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        return self._kde_for_pdf.evaluate(points=np.asarray(x))
    
    def _ppf_from_ecdf(self, q: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        return self._ppf_interp(q)
    
    @property
    def pval(self) -> float:
        """
        Shortcut getter for the jittered, two-sample KS-test's p-value.
        """
        return self.stat_test.tests['ks_2samp_jittered']['pval']
    
    @property
    def stat(self) -> float:
        """
        Shortcut getter for the jittered, two-sample KS-test's test statistic (D-value).
        """
        return self.stat_test.tests['ks_2samp_jittered']['stat'] 
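
# Usage sketch (hypothetical): KDE_approx fits a Gaussian KDE for the PDF and
# estimates a smooth ECDF from a large resample of that KDE; `pval` and `stat`
# expose the jittered two-sample KS-test conducted in the constructor.
def _demo_kde_approx() -> None:
    data = np.random.default_rng(seed=1).normal(size=2_000)
    dens = KDE_approx(data=data, resample_samples=50_000, compute_ranges=True)
    print(dens.cdf(0.))           # approx. 0.5
    print(dens.ppf(0.975))        # approx. 1.96
    print(dens.practical_domain)  # default window covering ~99% of the mass
    print(dens.pval, dens.stat)   # goodness of the KDE approximation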

class Empirical(Density):
    """
    This kind of density does not apply any smoothing for the CDF, but rather uses a
    straightforward ECDF for the data as given. The PDF is determined using Gaussian
    KDE.
    """
    def __init__(self, data: NDArray[Shape["*"], Float], compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
        """
        For the other parameters, please refer to :py:meth:`Density.__init__()`.
        compute_ranges: ``bool``
            Whether or not to compute the practical domain of the data and the practical range
            of the PDF. Both of these use optimization to find the results.
        """
        self._ecdf = SMEcdf(data)
        self._ppf_interp = cdf_to_ppf(cdf=np.vectorize(self._ecdf), x=data, y_left=np.min(data), y_right=np.max(data))
        super().__init__(range=(np.min(data), np.max(data)), pdf=gaussian_kde(dataset=data).pdf, cdf=self._ecdf, ppf=self._ppf_from_ecdf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
        if compute_ranges:
            self.practical_domain 
    
    def _ppf_from_ecdf(self, q: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        return self._ppf_interp(q) 
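
# Usage sketch (hypothetical): unlike KDE_approx, Empirical uses the raw,
# step-wise ECDF of the data as given, without resampling or smoothing.
def _demo_empirical() -> None:
    dens = Empirical(data=np.asarray([1.0, 1.5, 2.0, 2.5, 4.0]))
    print(dens.cdf([1.9, 2.0]))  # step function: [0.4, 0.6]
    print(dens.ppf(0.5))         # interpolated from the inverted ECDF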

class Empirical_discrete(Empirical):
    """
    Inherits from :py:class:`Empirical` and is used when the underlying quantity is
    discrete and not continuous. As its PDF, this class uses a PMF that is
    determined by the frequencies of each discrete datum.
    """
    def __init__(self, data: NDArray[Shape["*"], Float], ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
        self._data_valid = not (data.shape[0] == 1 and np.isnan(data[0]))
        data_int = np.full(shape=data.shape, fill_value=np.nan) if not self._data_valid else np.rint(data).astype(int)
        if self._data_valid and not np.allclose(a=data, b=data_int, rtol=1e-10, atol=1e-12):
            raise Exception('The data does not appear to be integral.')
        self._unique, self._counts = np.unique(data_int, return_counts=True)
        if self._data_valid:
            self._unique = self._unique.astype(int)
        else:
            self._unique = np.full(shape=self._unique.shape, fill_value=np.nan)
        self._counts = self._counts.astype(int)
        self._idx: dict[int, int] = { self._unique[i]: self._counts[i] for i in range(self._unique.shape[0]) }
        if self._data_valid:
            super().__init__(data=data_int, compute_ranges=False, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
        else:
            self._dist_transform = dist_transform
            self._transform_value = transform_value
            self._qtype = qtype
            self._context = context
            self._cdf = self.cdf = self._non_fit_cdf_ppf
            self._ppf = self.ppf = self._non_fit_cdf_ppf
        self._pdf = self._pmf = self._pmf_from_frequencies
        self.pdf = self.pmf = np.vectorize(self._pdf)
        self._practical_domain = (np.min(data_int), np.max(data_int))
        self._practical_range_pdf = (0., float(np.max(self._counts)) / float(np.sum(self._counts))) 
    @property
    def is_fit(self) -> bool:
        """Returns `True` if the given data is valid."""
        return self._data_valid
    
    def _non_fit_cdf_ppf(self, x) -> NDArray[Shape["*"], Float]:
        x = np.asarray([x] if np.isscalar(x) else x)
        return np.asarray([np.nan] * x.shape[0])
    def _pmf_from_frequencies(self, x: int) -> float:
        x = int(x)
        if x not in self._idx:
            return 0.
        return float(self._idx[x]) / float(np.sum(self._counts))

    @staticmethod
    def unfitted(dist_transform: DistTransform) -> 'Empirical_discrete':
        """
        Used to return an explicit unfit instance of :py:class:`Empirical_discrete`.
        This is used when, for example, continuous (real) data is given to the
        constructor. We still need an instance of this density in the web application
        to show an error (e.g., that there are no discrete empirical densities for
        continuous data).
        """
        return Empirical_discrete(data=np.asarray([np.nan]), dist_transform=dist_transform)  
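
# Usage sketch (hypothetical): for integral data, the PMF follows directly from
# the observed frequencies; e.g., the value 1 below makes up half of the data.
def _demo_empirical_discrete() -> None:
    dens = Empirical_discrete(data=np.asarray([1., 1., 2., 3.]))
    print(dens.pmf(1))  # 2/4 = 0.5
    print(dens.pmf(5))  # 0.0, since 5 was never observed
    print(dens.is_fit)  # True; cf. Empirical_discrete.unfitted(...)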

class Parametric(Density):
    """
    This density encapsulates a parameterized and previously fitted random variable.
    Random variables in :py:mod:`scipy.stats` come with PDF/PMF, CDF, PPF, etc. so
    we just use these and forward calls to them.
    """
    def __init__(self, dist: rv_generic, dist_params: tuple, range: tuple[float, float], stat_tests: dict[str, float], use_stat_test: StatTest_Types='ks_2samp_jittered', compute_ranges: bool=False, ideal_value: float=None, dist_transform: DistTransform=DistTransform.NONE, transform_value: float=None, qtype: str=None, context: str=None, **kwargs) -> None:
        """
        For the other parameters, please refer to :py:meth:`Density.__init__()`.
        dist: ``rv_generic``
            An instance of the random variable to use.
        
        dist_params: ``tuple``
            A tuple of parameters for the random variable. The order of the parameters
            is important since it is not a dictionary.
        
        stat_tests: ``dict[str, float]``
            A (flattened) dictionary of previously conducted statistical tests. This is
            used later to choose some best-fitting parametric density by a specific test.
        
        use_stat_test: ``StatTest_Types``
            The name of the chosen statistical test used to determine the goodness of fit.
        
        compute_ranges: ``bool``
            Whether or not to compute the practical domain of the data and the practical range
            of the PDF. Both of these use optimization to find the results.
        """
        if not isinstance(stat_tests, dict) or isinstance(stat_tests, StatisticalTest):
            raise Exception(f'This class requires a dictionary of statistical tests, not an instance of {StatisticalTest.__name__}.')
        for (key, val) in stat_tests.items():
            if not key.endswith('_pval') and not key.endswith('_stat'):
                raise Exception(f'Key not allowed: "{key}".')
            if not isinstance(val, float):
                raise Exception(f'Value for key "{key}" is not numeric.')
        self.dist: Union[rv_generic, rv_continuous] = dist
        self.stat_tests = stat_tests
        self._use_stat_test = use_stat_test
        self.dist_params = dist_params
        super().__init__(range=range, pdf=self.pdf, cdf=self.cdf, ppf=self.ppf, ideal_value=ideal_value, dist_transform=dist_transform, transform_value=transform_value, qtype=qtype, context=context, **kwargs)
        if compute_ranges:
            self.practical_domain
            self.practical_range_pdf 
    
    @staticmethod
    def unfitted(dist_transform: DistTransform) -> 'Parametric':
        """
        Used to return an explicit unfit instance of :py:class:`Parametric`. This is
        used in case when not a single maximum likelihood fit was successful for a
        number of random variables. We still need an instance of this density in the
        web application to show an error (e.g., that it was not possible to fit any
        random variable to the selected quantity).
        """
        from scipy.stats._continuous_distns import norm_gen
        return Parametric(dist=norm_gen(), dist_params=None, range=(np.nan, np.nan), stat_tests={}, dist_transform=dist_transform) 
    @property
    def use_stat_test(self) -> StatTest_Types:
        """Getter for the selected statistical test."""
        return self._use_stat_test
    
    @use_stat_test.setter
    def use_stat_test(self, st: StatTest_Types) -> Self:
        """Setter for the type of statistical test to use."""
        self._use_stat_test = st
        return self
    
    @property
    def pval(self) -> float:
        """Shortcut getter for the p-value of the selected statistical test."""
        if not self.is_fit:
            raise Exception('Cannot return p-value for non-fitted random variable.')
        return self.stat_tests[f'{self.use_stat_test}_pval']
    
    @property
    def stat(self) -> float:
        """Shortcut getter for the test statistic of the selected statistical test."""
        if not self.is_fit:
            raise Exception('Cannot return statistical test statistic for non-fitted random variable.')
        return self.stat_tests[f'{self.use_stat_test}_stat']
    
    @property
    def is_fit(self) -> bool:
        """Returns `True` if this instance is not an explicitly unfit instance."""
        return self.dist_params is not None and not np.any(np.isnan(self.dist_params))
    
    @property
    def practical_domain(self) -> tuple[float, float]:
        """Overridden to return a practical domain of :math:`[0,0]` in case this instance is unfit."""
        if not self.is_fit:
            return (0., 0.)
        return super().practical_domain
    
    @property
    def practical_range_pdf(self) -> tuple[float, float]:
        """Overridden to return a practical PDF range of :math:`[0,0]` in case this instance is unfit."""
        if not self.is_fit:
            return (0., 0.)
        return super().practical_range_pdf
    
    @property
    def dist_name(self) -> str:
        """Shortcut getter for the this density's random variable's class' name."""
        return self.dist.__class__.__name__
    
    def pdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        """
        Overridden to call the encapsulated distribution's PDF. If this density is unfit,
        always returns an array of zeros of same shape as the input.
        """
        x = np.asarray(x)
        if not self.is_fit:
            return np.zeros((x.size,))
        return self.dist.pdf(*(x, *self.dist_params)).reshape((x.size,)) 
    
    def cdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        """
        Overridden to call the encapsulated distribution's CDF. If this density is unfit,
        always returns an array of zeros of same shape as the input.
        """
        x = np.asarray(x)
        if not self.is_fit:
            return np.zeros((x.size,))
        return self.dist.cdf(*(x, *self.dist_params)).reshape((x.size,)) 
    
    def ppf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        """
        Overridden to call the encapsulated distribution's PPF. If this density is unfit,
        always returns an array of zeros of same shape as the input.
        """
        x = np.asarray(x)
        if not self.is_fit:
            return np.zeros((x.size,))
        return self.dist.ppf(*(x, *self.dist_params)).reshape((x.size,)) 
    
    def compute_practical_domain(self, cutoff: float=0.9985) -> tuple[float, float]:
        """
        Overridden to exploit the availability of a fitted random variable's PPF.
        It can be used to find the practical domain instantaneously instead of
        having to solve an optimization problem.
        cutoff: ``float``
            The percentage of values to include. The upper bound is the PPF evaluated
            at the cutoff, and the lower bound is the PPF evaluated at `1 - cutoff`.
            Note that the default value for the cutoff was adjusted here to extend a
            little beyond what is good for other types of densities.
        
        :rtype: tuple[float, float]
        :return:
            The practical domain, cut off in both directions. If this random variable
            is unfit, this density's range is returned instead.
        """
        if not self.is_fit:
            return self.range
        
        return (self.ppf(1.0 - cutoff)[0], self.ppf(cutoff)[0])  
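
# Usage sketch (hypothetical; the statistical-test values below are made up):
# wrap a previously fitted scipy random variable. The order of the parameters
# is scipy's, i.e., shape parameters first, then loc and scale.
def _demo_parametric() -> None:
    from scipy.stats import norm
    data = np.random.default_rng(seed=1).normal(loc=5., scale=2., size=1_000)
    dens = Parametric(dist=norm, dist_params=norm.fit(data), range=(data.min(), data.max()),
        stat_tests={ 'ks_2samp_jittered_stat': .02, 'ks_2samp_jittered_pval': .8 })
    print(dens.dist_name)         # 'norm_gen'
    print(dens.practical_domain)  # obtained directly from the PPF, no optimization
    print(dens.pval)              # 0.8, looked up from the dict supplied above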

class Parametric_discrete(Parametric):
    """
    This type of density inherits from :py:class:`Parametric` and is its counterpart
    for discrete (integral) data. It adds an explicit function for the probability mass
    and makes the inherited PDF return the PMF's result.
    """
    def pmf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        """
        Implemented to call the encapsulated distribution's PMF. If this density is unfit,
        always returns an array of zeros of same shape as the input.
        """
        x = np.asarray(x)
        if not self.is_fit:
            return np.zeros((x.size,))
        return self.dist.pmf(*(x, *self.dist_params)).reshape((x.size,)) 
        
    def pdf(self, x: NDArray[Shape["*"], Float]) -> NDArray[Shape["*"], Float]:
        """
        Overridden to return the result of the :py:meth:`pmf()`. Note that in any case,
        a density's function :py:meth:`pdf()` is called (i.e., callers never call
        the PMF directly). Therefore, it is easier to catch these calls and
        redirect them to the PMF.
        """
        return self.pmf(x=x) 
    
    @staticmethod
    def unfitted(dist_transform: DistTransform) -> 'Parametric_discrete':
        """
        Used to return an explicit unfit instance of :py:class:`Parametric_discrete`. This is
        used in case when not a single maximum likelihood fit was successful for a
        number of random variables. We still need an instance of this density in the
        web application to show an error (e.g., that it was not possible to fit any
        random variable to the selected quantity).
        """
        from scipy.stats._continuous_distns import norm_gen
        return Parametric_discrete(dist=norm_gen(), dist_params=None, range=(np.nan, np.nan), stat_tests={}, dist_transform=dist_transform)  
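
# Usage sketch (hypothetical; test values made up): the discrete counterpart
# forwards pdf() to pmf(), so callers can keep calling pdf() uniformly.
def _demo_parametric_discrete() -> None:
    from scipy.stats import poisson
    dens = Parametric_discrete(dist=poisson, dist_params=(3.,), range=(0., 15.),
        stat_tests={ 'ks_2samp_jittered_stat': .02, 'ks_2samp_jittered_pval': .7 })
    print(dens.pmf(3))  # P[X=3] for Poisson(mu=3), approx. 0.224
    print(dens.pdf(3))  # identical, as pdf() redirects to pmf()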

class Dataset:
    """
    This class encapsulates a local (self created) dataset and provides help with transforming
    it, as well as giving some convenience getters.
    """
    def __init__(self, ds: LocalDataset, df: pd.DataFrame) -> None:
        self.ds = ds
        self.df = df 
    
    @property
    def quantity_types(self) -> list[str]:
        """Shortcut getter for the manifest's quantity types."""
        return list(self.ds['qtypes'].keys())

    def contexts(self, include_all_contexts: bool=False) -> Iterable[str]:
        """
        Returns the manifest's defined contexts as a generator. Sometimes we need to ignore
        the context and aggregate a quantity type across all defined contexts. Then, a virtual
        context called `__ALL__` is used.
        include_all_contexts: ``bool``
            Whether to also yield the virtual `__ALL__`-context.
        """
        yield from self.ds['contexts']
        if include_all_contexts:
            yield '__ALL__' 
    
    @property
    def ideal_values(self) -> dict[str, Union[float, int, None]]:
        """Shortcut getter for the manifest's ideal values."""
        return self.ds['ideal_values']
    
    def is_qtype_discrete(self, qtype: str) -> bool:
        """
        Returns whether a given quantity type is discrete.
        """
        return self.ds['qtypes'][qtype] == 'discrete' 
    
    def qytpe_desc(self, qtype: str) -> str:
        """
        Returns the description associated with a quantity type.
        """
        return self.ds['desc_qtypes'][qtype] 
    
    def context_desc(self, context: str) -> Union[str, None]:
        """
        Returns the description associated with a context (if any).
        """
        return self.ds['desc_contexts'][context] 
    
    @property
    def quantity_types_continuous(self) -> list[str]:
        """
        Returns a list of quantity types that are continuous (real-valued).
        """
        return list(filter(lambda qtype: not self.is_qtype_discrete(qtype=qtype), self.quantity_types))
    @property
    def quantity_types_discrete(self) -> list[str]:
        """
        Returns a list of quantity types that are discrete (integer-valued).
        """
        return list(filter(lambda qtype: self.is_qtype_discrete(qtype=qtype), self.quantity_types))

    def data(self, qtype: str, context: Union[str, None, Literal['__ALL__']]=None, unique_vals: bool=True, sub_sample: int=None) -> NDArray[Shape["*"], Float]:
        """
        This method is used to select a subset of the data, that is specific to at least
        a type of quantity, and optionally to a context, too.
        qtype: ``str``
            The name of the quantity type to get data for.
        context: ``Union[str, None, Literal['__ALL__']]``
            You may specify a context to further filter the data by. Data is always specific
            to a quantity type, and sometimes to a context. If no context-based filtering is
            desired, pass ``None`` or ``__ALL__``.
        
        unique_vals: ``bool``
            If `True`, some small jitter will be added to the data in order to make it unique.
        
        sub_sample: ``int``
            Optional unsigned integer with number of samples to take in case the dataset is
            very large. It is only applied if the number is smaller than the data's size.
        """
        new_df = self.df[self.df[self.ds['colname_type']] == qtype]
        if context is not None and context != '__ALL__':
            # Check if context exists:
            if context not in list(self.contexts(include_all_contexts=False)):
                raise Exception(f'The context "{context}" is not known.')
            new_df = new_df[new_df[self.ds['colname_context']] == context]
        
        vals = new_df[self.ds['colname_data']]
        if unique_vals:
            rng = np.random.default_rng(seed=1_337)
            r = rng.choice(a=np.linspace(1e-8, 1e-6, vals.size), size=vals.size, replace=False)
            # Add small but insignificant perturbations to the data to produce unique
            # values that would otherwise be eliminated by certain methods.
            vals += r
        
        vals = vals.to_numpy()
        
        if sub_sample is not None and sub_sample < vals.size:
            rng = np.random.default_rng(seed=1_338)
            vals = rng.choice(a=vals, size=sub_sample, replace=False)
        
        return vals 
    
    def num_observations(self) -> Iterable[tuple[str, str, int]]:
        """
        Returns the number of observations for each quantity type in each context.
        :rtype: ``Iterable[tuple[str, str, int]]``
        :return: Returns an iterable generator, where the first element is the context,
            the second the quantity type, and the third the number of observations.
        """
        for ctx in self.contexts(include_all_contexts=False):
            for qtype in self.quantity_types:
                yield (ctx, qtype, self.data(qtype=qtype, context=ctx, unique_vals=False).shape[0]) 
    
    def has_sufficient_observations(self, raise_if_not: bool=True) -> bool:
        """
        Helper method to check whether each quantity type in each context has at least
        two observations.
        raise_if_not: ``bool``
            If set to `True`, will raise an exception instead of returning `False` in
            case there are insufficiently many observations. The exception is more
            informative as it includes the context and quantity type.
        
        :rtype: ``bool``
        :return: A boolean indicating whether this Dataset has sufficiently many observations
            for each and every quantity type.
        """
        for ctx, qtype, num_obs in self.num_observations():
            if num_obs < 2:
                if raise_if_not:
                    raise Exception(f'The quantity type "{qtype}" in context "{ctx}" has insufficient ({num_obs}) observation(s).')
                else:
                    return False
        return True 
    
    
    def analyze_groups(self, use: Literal['anova', 'kruskal'], qtypes: Iterable[str], contexts: Iterable[str], unique_vals: bool=True) -> pd.DataFrame:
        """
        For each given type of quantity, this method performs an ANOVA (or a
        Kruskal-Wallis test) across all given contexts.
        use: ``Literal['anova', 'kruskal']``
            Indicates which method for comparing groups to use. We can either conduct
            an ANOVA or a Kruskal-Wallis test.
        qtypes: ``Iterable[str]``
            An iterable of quantity types to conduct the analysis for. For each given
            type, a separate analysis is performed and the result appended to the
            returned data frame.
        
        contexts: ``Iterable[str]``
            An iterable of contexts across which each of the quantity types shall be
            analyzed.
        
        unique_vals: ``bool``
            Passed to :meth:`self.data()`. If `True`, then small, random, and unique
            noise is added to the data before it is analyzed. This will effectively
            deduplicate any samples in the data (if any).
        :rtype: ``pd.DataFrame``
        
        :return: A data frame with the columns ``qtype`` (name of the quantity type),
            ``stat`` (the test statistic), ``pval``, and ``across_contexts``, where
            the latter is a semicolon-separated list of contexts the quantity type was
            compared across.
        """
        use_method: Callable = None
        if use == 'anova':
            use_method = f_oneway
        elif use == 'kruskal':
            use_method = kruskal
        else:
            raise Exception(f'Method "{use}" is not supported.')
        
        # We first have to build the data; f_oneway requires *args, where each
        # arg is a data series.
        qtypes, contexts = list(qtypes), list(contexts)
        if len(qtypes) < 1 or len(contexts) < 2:
            raise Exception('Requires one or more quantity types and two or more contexts.')
        def anova_for_qtype(qtype: str) -> dict[str, Union[str, str, float]]:
            samples = ()
            for ctx in contexts:
                samples += (self.data(qtype=qtype, context=None if ctx == '__ALL__' else ctx, unique_vals=unique_vals),)
            
            stat, pval = use_method(*samples)
            return { 'qtype': qtype, 'stat': stat, 'pval': pval, 'across_contexts': ';'.join(contexts) }
        res_dicts = Parallel(n_jobs=-1)(delayed(anova_for_qtype)(qtype) for qtype in tqdm(qtypes))
        return pd.DataFrame(res_dicts) 
    
    def analyze_TukeyHSD(self, qtypes: Iterable[str]) -> pd.DataFrame:
        r"""
        Calculate all pairwise comparisons for the given quantity types with Tukey's
        Honest Significance Test (HSD) and return the confidence intervals. For each
        type of quantity, this method performs all of its associated contexts pairwise
        comparisons.
        For example, given a quantity :math:`Q` and its contexts :math:`C_1,C_2,C_3`,
        this method will examine the pairs :math:`\left[\{C_1,C_2\},\{C_1,C_3\},\{C_2,C_3\}\right]`.
        For a single type of quantity, for example, this test is useful to understand
        how differently the quantity manifests across contexts. For multiple quantities,
        it also allows understanding how contexts differ from one another, holistically.
        qtypes: ``Iterable[str]``
            An iterable of quantity types to conduct the analysis for. For each given
            type, a separate analysis is performed and the result appended to the
            returned data frame.
        :rtype: ``pd.DataFrame``
        :return: A data frame with the columns ``group1``, ``group2``, ``meandiff``,
            ``p-adj``, ``lower``, ``upper``, and ``reject``. For details see
            :meth:`statsmodels.stats.multicomp.pairwise_tukeyhsd()`.
        """
        qtypes = list(qtypes)
        if len(qtypes) < 1:
            raise Exception('Requires one or more quantity types.')
        
        temp = self.df.copy()
        temp[self.ds['colname_context']] = '__ALL__' # Erase context
        all_data = pd.concat([temp, self.df])
        
        def tukeyHSD_for_qtype(qtype: str) -> pd.DataFrame:
            data = all_data[all_data[self.ds['colname_type']] == qtype]
            tukey = pairwise_tukeyhsd(endog=data[self.ds['colname_data']], groups=data[self.ds['colname_context']])
            temp = tukey.summary().data
            return pd.DataFrame(data=temp[1:], columns=temp[0])
        res_dfs = Parallel(n_jobs=-1)(delayed(tukeyHSD_for_qtype)(qtype) for qtype in tqdm(qtypes))
        return pd.concat(res_dfs) 
    
    def analyze_distr(self, qtypes: Iterable[str], use_ks_2samp: bool=True, ks2_max_samples=40_000) -> pd.DataFrame:
        """
        Performs the two-sample Kolmogorov--Smirnov test or Welch's t-test for each
        given type of quantity. For each quantity type, the test is performed across
        all unique pairs of contexts.
        qtypes: ``Iterable[str]``
            An iterable of quantity types; for each, the contexts are compared pair-wise.
        
        use_ks_2samp: ``bool``
            If `True`, use the two-sample Kolmogorov--Smirnov test; otherwise, Welch's t-test.
        
        ks2_max_samples: ``int``
            Unsigned integer used to limit the number of samples used in the two-sample
            KS-test. For numbers larger than the default, it may not be possible to
            compute the test exactly.
        :rtype: ``pd.DataFrame``
        :return:
            A data frame with columns `qtype`, `stat`, `pval`, `group1`, and `group2`.
        """
        qtypes = list(qtypes)
        if len(qtypes) < 1:
            raise Exception('Requires one or more quantity types.')
        
        unique_context_pairs: list[tuple[str, str]] = list(combinations(iterable=self.contexts(include_all_contexts=True), r=2))
        
        def compare(qtype: str) -> pd.DataFrame:
            dict_list: list[dict[str, Union[str, float]]] = [ ]
            for udp in unique_context_pairs:
                data1 = self.data(qtype=qtype, context=udp[0])
                data2 = self.data(qtype=qtype, context=udp[1])
                stat = pval = None
                if use_ks_2samp:
                    rng = np.random.default_rng(seed=1)
                    data1 = data1 if data1.shape[0] <= ks2_max_samples else rng.choice(a=data1, size=ks2_max_samples, replace=False)
                    data2 = data2 if data2.shape[0] <= ks2_max_samples else rng.choice(a=data2, size=ks2_max_samples, replace=False)
                    stat, pval = ks_2samp(data1=data1, data2=data2, alternative='two-sided', method='exact')
                else:
                    stat, pval = ttest_ind(a=data1, b=data2, equal_var=False, alternative='two-sided')
                dict_list.append({
                    'qtype': qtype, 'stat': stat, 'pval': pval, 'group1': udp[0], 'group2': udp[1]
                })
            
            return pd.DataFrame(dict_list)
        res_dfs = Parallel(n_jobs=-1)(delayed(compare)(qtype) for qtype in tqdm(qtypes))
        return pd.concat(res_dfs)
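
# Usage sketch (hypothetical; column names, contexts, and data are made up and
# match the example manifest above): wire a manifest and a data frame into a
# Dataset, then compare the quantity across contexts.
def _demo_dataset() -> None:
    rng = np.random.default_rng(seed=1)
    df = pd.DataFrame({
        'quantity': ['Complexity'] * 200,
        'project': ['proj_a'] * 100 + ['proj_b'] * 100,
        'value': np.concatenate([rng.normal(5., 1., 100), rng.normal(6., 1., 100)])})
    ds = Dataset(ds=_example_manifest, df=df)
    print(list(ds.num_observations()))  # one (context, qtype, count) triple each
    print(ds.analyze_groups(use='anova', qtypes=['Complexity'], contexts=list(ds.contexts())))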