from __future__ import annotations
import math
from typing import Callable, Tuple
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
from scipy.stats import chi2_contingency, fisher_exact, ks_2samp, mannwhitneyu
from scipy.stats.contingency import crosstab
from statsmodels.stats.power import TTestIndPower
from statsmodels.stats.weightstats import ttest_ind, ztest
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.eventstream.types import EventstreamType
from retentioneering.tooling.stattests.constants import STATTEST_NAMES
def _effect_size_cohend(sample1: list, sample2: list) -> float:
"""
Calculate an effect size value using Cohen’s D measure (difference between two means).
Parameters
----------
sample1 : list
First group's statistics.
sample2 : list
Second group's statistics.
Returns
-------
float
Calculated an effect size value.
"""
sample_size1, sample_size2 = len(sample1), len(sample2)
standard_deviation1, standard_deviation2 = np.var(sample1, ddof=1), np.var(sample2, ddof=1)
pooled_standard_deviation = float(
math.sqrt(
((sample_size1 - 1) * standard_deviation1 + (sample_size2 - 1) * standard_deviation2)
/ (sample_size1 + sample_size2 - 2)
)
)
mean_sample1, mean_sample2 = float(np.mean(sample1)), float(np.mean(sample2))
return (mean_sample1 - mean_sample2) / pooled_standard_deviation
def _effect_size_cohenh(sample1: list, sample2: list) -> float:
"""
Calculate an effect size value using Cohen’s D measure (between proportions)
Parameters
----------
sample1 : list
First group's statistics.
sample2 : list
Second group's statistics.
Returns
-------
float
Calculated an effect size value.
"""
mean_sample1, mean_sample2 = np.mean(sample1), np.mean(sample2)
return 2 * (math.asin(math.sqrt(mean_sample1)) - math.asin(math.sqrt(mean_sample2)))
[docs]class StatTests:
"""
A class for determining statistical difference between two groups of users.
Parameters
----------
eventstream : EventstreamType
See Also
--------
.Eventstream.stattests : Call StatTests tool as an eventstream method.
Notes
-----
See :doc:`StatTests user guide</user_guides/stattests>` for the details.
"""
__eventstream: EventstreamType
test: STATTEST_NAMES
groups: Tuple[list[str | int], list[str | int]]
func: Callable
group_names: Tuple[str, str]
alpha: float
g1_data: list[str | int]
g2_data: list[str | int]
is_fitted: bool
output_template_numerical = "{0} (mean ± SD): {1:.3f} ± {2:.3f}, n = {3}"
output_template_categorical = "{0} (size): n = {1}"
p_val, power, label_min, label_max = 0.0, 0.0, "", ""
@time_performance(
scope="stattests",
event_name="init",
)
def __init__(self, eventstream: EventstreamType) -> None:
self.__eventstream = eventstream
self.user_col = self.__eventstream.schema.user_id
self.event_col = self.__eventstream.schema.event_name
self.time_col = self.__eventstream.schema.event_timestamp
self.g1_data = list()
self.g2_data = list()
self.is_fitted = False
def _get_group_values(self) -> Tuple[list, list]:
data = self.__eventstream.to_dataframe()
# obtain two populations for each group
g1 = data[data[self.user_col].isin(self.groups[0])].copy()
g2 = data[data[self.user_col].isin(self.groups[1])].copy()
# obtain two distributions:
if self.test not in ["chi2_contingency", "fisher_exact"]:
g1_data = list(g1.groupby(self.user_col).apply(self.func).dropna().astype(float).values)
g2_data = list(g2.groupby(self.user_col).apply(self.func).dropna().astype(float).values)
else:
g1_data = list(g1.groupby(self.user_col).apply(self.func).dropna().values)
g2_data = list(g2.groupby(self.user_col).apply(self.func).dropna().values)
return g1_data, g2_data
def _get_freq_table(self, a: list, b: list) -> list:
labels = ["A"] * len(a) + ["B"] * len(b)
values = np.concatenate([a, b])
if self.test == "fisher_exact" and np.unique(values).shape[0] != 2:
raise ValueError("For Fisher exact test, there should be exactly 2 categories of observations in the data")
return crosstab(labels, values)[1]
def _get_test_results(self, data_max: list, data_min: list) -> Tuple[float, float]:
if self.test in ["ztest", "ttest", "mannwhitneyu", "ks_2samp"]:
# calculate effect size
if max(data_max) <= 1 and min(data_max) >= 0 and max(data_min) <= 1 and min(data_min) >= 0:
# if analyze proportions use Cohen's h:
effect_size = _effect_size_cohenh(data_max, data_min)
else:
# for other variables use Cohen's d:
effect_size = _effect_size_cohend(data_max, data_min)
# calculate power
power = TTestIndPower().power(
effect_size=effect_size,
nobs1=len(data_max),
ratio=len(data_min) / len(data_max),
alpha=self.alpha,
alternative="larger",
)
if self.test == "ks_2samp":
p_val = ks_2samp(data_max, data_min, alternative="less")[1]
elif self.test == "mannwhitneyu":
p_val = mannwhitneyu(data_max, data_min, alternative="greater")[1]
elif self.test == "ttest":
p_val = ttest_ind(data_max, data_min, alternative="larger")[1]
elif self.test == "ztest":
p_val = ztest(data_max, data_min, alternative="larger")[1]
elif self.test in ["chi2_contingency", "fisher_exact"]:
power = None
if self.test == "chi2_contingency":
freq_table = self._get_freq_table(data_max, data_min)
p_val = chi2_contingency(freq_table)[1]
elif self.test == "fisher_exact":
freq_table = self._get_freq_table(data_max, data_min)
p_val = fisher_exact(freq_table, alternative="greater")[1]
else:
raise ValueError(f"The argument test is not supported. Supported tests are: {STATTEST_NAMES}")
return p_val, power # type: ignore
def _get_sorted_test_results(self) -> Tuple[float, float, str, str]:
p_val_norm, power_norm = self._get_test_results(self.g1_data, self.g2_data)
p_val_rev, power_rev = self._get_test_results(self.g2_data, self.g1_data)
if p_val_norm < p_val_rev:
p_val = p_val_norm
power = power_norm
label_max = self.group_names[0]
label_min = self.group_names[1]
else:
p_val = p_val_rev
power = power_rev
label_max = self.group_names[1]
label_min = self.group_names[0]
return p_val, power, label_min, label_max
[docs] @time_performance(
scope="stattests",
event_name="fit",
)
def fit(
self,
test: STATTEST_NAMES,
groups: Tuple[list[str | int], list[str | int]],
func: Callable,
group_names: Tuple[str, str] = ("group_1", "group_2"),
alpha: float = 0.05,
) -> None:
"""
Calculates the stattests internal values with the defined parameters.
Applying ``fit`` method is necessary for the following usage
of any visualization or descriptive ``StatTests`` methods.
Parameters
----------
test : {'mannwhitneyu', 'ttest', 'ztest', 'ks_2samp', 'chi2_contingency', 'fisher_exact'}
Test the null hypothesis that 2 independent samples are drawn from the same
distribution. Supported tests are:
- ``mannwhitneyu`` see :mannwhitneyu:`scipy documentation<>`.
- ``ttest`` see :statsmodel_ttest:`statsmodels documentation<>`.
- ``ztest`` see :statsmodel_ztest:`statsmodels documentation<>`.
- ``ks_2samp`` see :scipy_ks:`scipy documentation<>`.
- ``chi2_contingency`` see :scipy_chi2:`scipy documentation<>`.
- ``fisher_exact`` see :scipy_fisher:`scipy documentation<>`.
groups : tuple of list
Must contain a tuple of two elements (g_1, g_2): g_1 and g_2 are collections
of user_id`s.
func : Callable
Selected metrics. Must contain a function that takes a dataset as an argument for
a single user trajectory and returns a single numerical value.
group_names : tuple, default ('group_1', 'group_2')
Names for selected groups g_1 and g_2.
alpha : float, default 0.05
Selected level of significance.
"""
called_params = {
"test": test,
"groups": groups,
"func": func,
"group_names": group_names,
"alpha": alpha,
}
not_hash_values = ["test"]
self.groups = groups
self.func = func
self.test = test
self.group_names = group_names
self.alpha = alpha
self.g1_data, self.g2_data = self._get_group_values()
self.p_val, self.power, self.label_min, self.label_max = self._get_sorted_test_results()
self.is_fitted = True
collect_data_performance(
scope="stattests",
event_name="metadata",
called_params=called_params,
not_hash_values=not_hash_values,
performance_data={},
eventstream_index=self.__eventstream._eventstream_index,
)
[docs] @time_performance(
scope="stattests",
event_name="plot",
)
def plot(self) -> Tuple[go.Figure, str]:
"""
Plots a barplot comparing the metric values between two groups.
Should be used after :py:func:`fit`.
Returns
-------
go.Figure
"""
data1 = pd.DataFrame(data={"data": self.g1_data, "groups": self.group_names[0]})
data2 = pd.DataFrame(data={"data": self.g2_data, "groups": self.group_names[1]})
combined_stats = pd.concat([data1, data2]).reset_index()
compare_plot = sns.displot(data=combined_stats, x="data", hue="groups", multiple="dodge")
compare_plot.set(xlabel=None)
return compare_plot
@property
@time_performance(
scope="stattests",
event_name="values",
)
def values(self) -> dict:
"""
Returns the comprehensive results of the comparison between the two groups.
Should be used after :py:func:`fit`.
Returns
-------
dict
"""
if not self.is_fitted:
raise ValueError("The StatTests instance needs to be fitted before returning values")
if self.test in ["ztest", "ttest", "mannwhitneyu", "ks_2samp"]:
res_dict = {
"group_one_name": self.group_names[0],
"group_one_size": len(self.g1_data),
"group_one_mean": np.array(self.g1_data).mean(),
"group_one_SD": np.array(self.g1_data).std(),
"group_two_name": self.group_names[1],
"group_two_size": len(self.g2_data),
"group_two_mean": np.array(self.g2_data).mean(),
"group_two_SD": np.array(self.g2_data).std(),
"greatest_group_name": self.label_max,
"least_group_name": self.label_min,
"is_group_one_greatest": self.label_max == self.group_names[0],
"p_val": self.p_val,
"power_estimated": self.power,
}
elif self.test in ["chi2_contingency", "fisher_exact"]:
res_dict = {
"group_one_name": self.group_names[0],
"group_one_size": len(self.g1_data),
"group_two_name": self.group_names[1],
"group_two_size": len(self.g2_data),
"p_val": self.p_val,
}
else:
raise ValueError("Wrong test passed")
return res_dict
@time_performance( # type: ignore
scope="stattests",
event_name="display_results",
)
def display_results(self) -> None:
if not self.is_fitted:
raise ValueError("The StatTests instance needs to be fitted before displaying results")
values = self.values
if self.test in ["ztest", "ttest", "mannwhitneyu", "ks_2samp"]:
print(
self.output_template_numerical.format(
values["group_one_name"], values["group_one_mean"], values["group_one_SD"], values["group_one_size"]
)
)
print(
self.output_template_numerical.format(
values["group_two_name"], values["group_two_mean"], values["group_two_SD"], values["group_two_size"]
)
)
print(
"'{0}' is greater than '{1}' with p-value: {2:.5f}".format(
values["greatest_group_name"], values["least_group_name"], values["p_val"]
)
)
print("power of the test: {0:.2f}%".format(100 * values["power_estimated"]))
elif self.test in ["chi2_contingency", "fisher_exact"]:
print(self.output_template_categorical.format(values["group_one_name"], values["group_one_size"]))
print(self.output_template_categorical.format(values["group_two_name"], values["group_two_size"]))
print("Group difference test with p-value: {:.5f}".format(values["p_val"]))
else:
raise ValueError("Wrong test passed")
@property
@time_performance(
scope="stattests",
event_name="params",
)
def params(self) -> dict:
"""
Returns the parameters used for the last fitting.
Should be used after :py:func:`fit`.
"""
return {
"test": self.test,
"groups": self.groups,
"func": self.func,
"group_names": self.group_names,
"alpha": self.alpha,
}