"""
This module contains the workflow for fitting parametric distributions
to features of your own datasets.
"""
from typing import Any, Iterable
from pathlib import Path
from nptyping import Float, NDArray, Shape
from scipy.stats._distn_infrastructure import rv_continuous, rv_discrete
from metrics_as_scores.cli.helpers import get_local_datasets, isint
from metrics_as_scores.cli.Workflow import Workflow
from metrics_as_scores.distribution.distribution import DistTransform, LocalDataset, Dataset
from metrics_as_scores.distribution.fitting import Fitter, FitterPymoo, Continuous_RVs, Discrete_Problems
from metrics_as_scores.data.pregenerate_fit import get_data_tuple
from metrics_as_scores.data.pregenerate_distns import generate_parametric_fits
from metrics_as_scores.tools.funcs import flatten_dict
from questionary import Choice
from pickle import dump
from os import cpu_count
from joblib import Parallel, delayed
from tqdm import tqdm
import pandas as pd
from metrics_as_scores.__init__ import DATASETS_DIR
from scipy.stats._continuous_distns import norminvgauss_gen, gausshyper_gen, genhyperbolic_gen, geninvgauss_gen, invgauss_gen, studentized_range_gen
from scipy.stats._discrete_distns import betabinom_gen, logser_gen, planck_gen, nbinom_gen, nchypergeom_fisher_gen, nchypergeom_wallenius_gen, nhypergeom_gen, hypergeom_gen, yulesimon_gen, zipfian_gen
class FitParametricWorkflow(Workflow):
    # The documentation is assigned explicitly and stripped of surrounding
    # whitespace. NOTE(review): presumably this is the text that
    # ``_print_doc()`` shows to the user -- confirm in ``Workflow``.
    __doc__ = '''
This workflow fits distributions to an existing dataset. For each
feature, and for each group, a large number of random
variables are fit, and a number of statistical tests are carried
out such that the best-fitting distribution may be selected/used.
Regardless of whether a quantity is continuous or discrete, many
continuous random variables are attempted to fit. If a quantity
is discrete, however, an additional set of discrete random variables
is attempted to fit. Especially the latter might be extraordinarily
expensive.
Therefore, you may only select a subset of random variables that
you want to attempt to fit. However, if you intend to share your
dataset and make it available to others, then you should include
and attempt to fit all distributions.
The following process, once begun, will save the result of fitting
a single feature (from within a single group) as a separate file.
If the file already exists, no new fit is attempted. This is so that
this process can be interrupted and resumed.
'''.strip()

    def __init__(self) -> None:
        """
        Create the workflow in its initial state: nothing is selected yet,
        :class:`FitterPymoo` is the default fitter and a single CPU is used.
        All attributes are filled in by :meth:`fit_parametric()`.
        """
        super().__init__()
        # Description of the chosen local dataset; accessed by key
        # (e.g., ``id``, ``colname_type``, ``colname_context``).
        self.use_ds: LocalDataset = None
        # The dataset wrapper built from ``use_ds`` and ``df``.
        self.ds: Dataset = None
        # The original data as read from the dataset's CSV file.
        self.df: pd.DataFrame = None
        # Directory that receives the pickled fits and their CSV summaries.
        self.fits_dir: Path = None
        # Types of the continuous/discrete random variables selected by the user.
        self.selected_rvs_c: list[type[rv_continuous]] = None
        self.selected_rvs_d: list[type[rv_discrete]] = None
        # The fitter class that is forwarded to the fitting routine.
        self.use_fitter: type[Fitter] = FitterPymoo
        # Degree of parallelism used for preparing data and for fitting.
        self.num_cpus = 1

    def _select_continuous_rvs(self) -> Iterable[type[rv_continuous]]:
        """
        Asks the user which continuous random variables shall be fit. Every
        entry of ``Continuous_RVs`` is offered; those listed as recommended
        to ignore are initially unchecked, all others are pre-selected.

        :rtype: ``Iterable[type[rv_continuous]]``

        :return: The types of the selected random variables (whatever the
            checkbox prompt's ``ask()`` returns).
        """
        self.q.print('''
You can now select the continuous random variables that you want to
attempt to fit to the data. Select all in case you intend to re-
distribute and publicize your dataset. It is often not worth to de-
select distributions unless you have a specific reason to do so,
because fitting a continuous distribution is comparatively cheap and
fast.'''.strip())

        recommend_ignore = (
            norminvgauss_gen, gausshyper_gen, genhyperbolic_gen,
            geninvgauss_gen, invgauss_gen, studentized_range_gen)

        choices = [
            Choice(
                title=f'{type(rv).__name__} [{rv.name}]',
                value=type(rv),
                checked=type(rv) not in recommend_ignore)
            for rv in Continuous_RVs]

        return self.q.checkbox(message='Select continuous random variables:', choices=choices).ask()

    def _select_discrete_rvs(self) -> Iterable[type[rv_discrete]]:
        """
        Asks the user which discrete random variables shall additionally be
        fit. One choice is offered per key of ``Discrete_Problems``; those
        whose name is listed as recommended to ignore are initially unchecked.

        :rtype: ``Iterable[type[rv_discrete]]``

        :return: The selected generator types, looked up by name in
            ``scipy.stats._discrete_distns`` (whatever the checkbox prompt's
            ``ask()`` returns).
        """
        self.q.print('''
You can now select the discrete random variables that you want to
additionally attempt to fit to discrete features. Select all in
case you intend to redistribute and publicize your dataset.
While there are much fewer discrete random variables, their fitting
is dramatically more computationally expensive to compute, since a
global search has to be performed.'''.strip())

        # Note how we will use the RV's type, not the problem's!
        from scipy.stats import _discrete_distns

        recommended_ignore = [rv.__name__ for rv in (
            betabinom_gen, logser_gen, planck_gen, nbinom_gen,
            nchypergeom_fisher_gen, nchypergeom_wallenius_gen,
            nhypergeom_gen, hypergeom_gen, yulesimon_gen, zipfian_gen)]

        # Only the keys (names) of the problems are needed here; each name is
        # resolved to the generator type of the same name in scipy.
        choices = [
            Choice(
                title=name,
                value=getattr(_discrete_distns, name),
                checked=name not in recommended_ignore)
            for name in Discrete_Problems]

        return self.q.checkbox(message='Select discrete random variables:', choices=choices).ask()

    def _get_data_tuples(self, dist_transform: DistTransform, continuous: bool) -> tuple[dict[str, float], dict[str, NDArray[Shape["*"], Float]]]:
        """
        Prepares all required datasets for one distribution transform in parallel,
        either for continuous or discrete data. One job is run per quantity type
        of the dataset.

        dist_transform: ``DistTransform``
            The chosen distribution transform.

        continuous: ``bool``
            Passed forward to :meth:`get_data_tuple()`:
            whether the transform is real-valued or must be converted to integer.

        :rtype: ``tuple[dict[str, float], dict[str, NDArray[Shape["*"], Float]]]``

        :return: Two dictionaries, where the keys are the same in either. The values
            in the first are computed ideal values for the selected transform. The
            values in the latter are 1-D arrays of the data (the distances).
        """
        qtypes = self.ds.quantity_types
        # Never use more workers than there are jobs, but at least one:
        # joblib rejects ``n_jobs=0`` (which would occur without quantity types).
        n_jobs = max(1, min(self.num_cpus, len(qtypes)))

        res = Parallel(n_jobs=n_jobs)(
            delayed(get_data_tuple)(
                ds=self.ds, qtype=qtype, dist_transform=dist_transform,
                continuous_transform=continuous)
            for qtype in tqdm(qtypes))

        # Each job returns a list of items; per item, index 0 is used as key,
        # index 1 as the data and index 2 as the transform value.
        data_dict = {item[0]: item[1] for sublist in res for item in sublist}
        transform_values_dict = {item[0]: item[2] for sublist in res for item in sublist}
        return (transform_values_dict, data_dict)

    def _fit_parametric(self, dist_transform: DistTransform, do_print: bool=True) -> list[dict[str, Any]]:
        """
        Fits the selected random variables for a single distribution transform.
        There are two steps to this: first, all required data is gathered (once
        as continuous, once as discrete data); then, the fitting is delegated to
        :meth:`generate_parametric_fits()`.

        dist_transform: ``DistTransform``
            The distribution transform to prepare the data and compute fits for.

        do_print: ``bool``
            Whether to print progress information before each step.

        :rtype: ``list[dict[str, Any]]``

        :return: The result of :meth:`generate_parametric_fits()`.
        """
        s = 'Performing distribution transforms for: '

        # Continuous:
        if do_print:
            self.print_info(text_normal=s, text_vital='continuous')
        transform_values_dict, data_dict = self._get_data_tuples(
            dist_transform=dist_transform, continuous=True)

        # Discrete:
        if do_print:
            self.print_info(text_normal=s, text_vital='discrete')
        transform_values_discrete_dict, data_discrete_dict = self._get_data_tuples(
            dist_transform=dist_transform, continuous=False)

        if do_print:
            self.print_info(text_normal='', text_vital='Starting fitting of distributions, in randomized order.')

        return generate_parametric_fits(
            ds=self.ds,
            num_jobs=self.num_cpus,
            fitter_type=self.use_fitter,
            dist_transform=dist_transform,
            selected_rvs_c=self.selected_rvs_c,
            selected_rvs_d=self.selected_rvs_d,
            data_dict=data_dict,
            data_discrete_dict=data_discrete_dict,
            transform_values_dict=transform_values_dict,
            transform_values_discrete_dict=transform_values_discrete_dict)

    def fit_parametric(self) -> None:
        """
        Main entry point for this workflow. Interactively asks for the local
        dataset, the random variables to fit, the fitter class and the number
        of cores. Then, for each :class:`DistTransform`, the fits are computed
        and written to the dataset's ``fits`` directory, once as pickle and
        once as CSV. A transform whose pickle file already exists is skipped,
        so that the process can be interrupted and resumed.
        """
        self._print_doc()

        datasets = list(get_local_datasets())
        self.use_ds = self.askt(
            prompt='Select the local dataset you want to generate fits for:',
            options=[(f'{ds["name"]} [{ds["id"]}] by {", ".join(ds["author"])}', ds) for ds in datasets])
        self.fits_dir = DATASETS_DIR.joinpath(f'./{self.use_ds["id"]}/fits')

        datafile = DATASETS_DIR.joinpath(f'./{self.use_ds["id"]}/org-data.csv')
        self.print_info(text_normal='Reading original data file: ', text_vital=str(datafile))
        self.df = pd.read_csv(filepath_or_buffer=str(datafile), index_col=False)
        # The type- and context-columns are always treated as strings.
        for colname in (self.use_ds['colname_type'], self.use_ds['colname_context']):
            self.df[colname] = self.df[colname].astype(str)
        self.ds = Dataset(ds=self.use_ds, df=self.df)

        self.selected_rvs_c = list(self._select_continuous_rvs())
        self.print_info(text_normal='Having selected ', text_vital=str(len(self.selected_rvs_c)), end='')
        self.q.print(' continuous random variables.')

        self.selected_rvs_d = list(self._select_discrete_rvs())
        self.print_info(text_normal='Having selected ', text_vital=str(len(self.selected_rvs_d)), end='')
        self.q.print(' discrete random variables.')

        self.q.print(10*'-')
        self.q.print('''
You need to choose how fits should be computed. Metrics As Scores offers
two classes: Fitter and FitterPymoo, where the latter is recommended. As
far as continuous random variables are concerned, it does not make a
difference. For discrete random variables, however, the original Fitter
uses scipy's approach of differential evolution, which does not always
scale well. The class FitterPymoo, on the other hand, uses separate, mixed
variable genetic algorithm problems, tailored to each random variable.
'''.strip())
        self.use_fitter = self.askt(options=[
            (f'{FitterPymoo.__name__} [Recommended]', FitterPymoo),
            (f'{Fitter.__name__}', Fitter)
        ], prompt='Which fitter do you want to use?')

        self.q.print(10*'-')
        self.q.print('''
We are ready now to fit random variables. Between each of the transform types
(expectation/mean, median, mode, supremum, infimum) you get the chance to pause
and resume later, and the process will skip existing computed transforms. It is
recommended to run the following on a resourceful computer and to exploit the
highest possible degree of parallelism available. Next, you will be asked how
many cores you would like to use.
'''.strip())
        # ``cpu_count()`` returns None if the number of CPUs cannot be
        # determined; fall back to one core so that the prompt and its
        # validation keep working.
        max_cpus = cpu_count() or 1
        self.num_cpus = int(self.q.text(
            f'How many cores should I use (1-{max_cpus})?',
            default=str(max_cpus),
            validate=lambda s: isint(s) and 0 < int(s) <= max_cpus).ask())

        # Make sure the target directory exists *before* any (expensive) fit is
        # computed, so that a result cannot be lost to a missing directory.
        self.fits_dir.mkdir(parents=True, exist_ok=True)

        for dist_transform in DistTransform:
            pickle_file = self.fits_dir.joinpath(f'./pregen_distns_{dist_transform.name}.pickle')
            self.q.print(10*'-')
            if pickle_file.exists():
                # Resuming: this transform was completed in an earlier run.
                self.print_info(text_normal='Fits already computed for transform: ', text_vital=f'{dist_transform.value} [{dist_transform.name}]')
                continue

            self.print_info(text_normal='Generating parametric fits for: ', text_vital=f'{dist_transform.value} [{dist_transform.name}]')
            res = self._fit_parametric(dist_transform=dist_transform)
            with open(file=str(pickle_file), mode='wb') as fp:
                dump(obj=res, file=fp)

            # Also, let's dump a CSV with all results.
            csv_file = self.fits_dir.joinpath(f'./pregen_distns_{dist_transform.name}.csv')
            df = pd.DataFrame([flatten_dict(d=d) for d in res])
            df.to_csv(path_or_buf=str(csv_file), index=False)