from __future__ import annotations
import itertools
from typing import Callable, Tuple
import matplotlib.colors as mcolors
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import colormaps, colors
from scipy.sparse import csc_matrix
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.common.constants import NGRAM_SEP, SEQUENCES_METRIC_TYPES
from retentioneering.eventstream.segments import _split_segment
from retentioneering.eventstream.types import (
EventstreamType,
SplitExpr,
UserGroupsNamesType,
UserGroupsType,
)
[docs]class Sequences:
"""
A class that provides methods for patterns exploration.
Parameters
----------
eventstream : EventstreamType
See Also
--------
.Eventstream.sequences : Call Sequences tool as an eventstream method.
Notes
-----
See :doc:`Sequences user guide</user_guides/sequences>` for the details.
"""
__eventstream: EventstreamType
ngram_range: Tuple[int, int]
groups: UserGroupsType | None
path_id_col: str
metrics: SEQUENCES_METRIC_TYPES | None
final_columns_level_0: list[str]
group_names: UserGroupsNamesType | None
DEFAULT_GROUP_NAMES = ("group_1", "group_2")
SEQUENCE_TYPE_GROUPS_COLUMN_NAME = ("sequence_type", "")
SEQUENCE_TYPE_COLUMN_NAME = "sequence_type"
RELATIVE_DELTA_COLUMN_NAME = "delta_rel"
ABS_DELTA_COLUMN_NAME = "delta_abs"
ORIGINAL_METRICS_LIST = ["paths", "paths_share", "count", "count_share", "avg_count"]
_vec_data: pd.DataFrame
@time_performance(
scope="sequences",
event_name="init",
)
def __init__(self, eventstream: EventstreamType):
self._default_columns_level_0: list = []
self._sample_column_name: str
self._full_metrics_list: list = []
self._samples_full = None
self.__eventstream = eventstream
self.user_col = self.__eventstream.schema.user_id
self.event_col = self.__eventstream.schema.event_name
self.time_col = self.__eventstream.schema.event_timestamp
self._sequence_df = pd.DataFrame()
self._show_values_df = pd.DataFrame()
self.groups = None
def _calculate_metrics_df(self, vec_data: pd.DataFrame) -> pd.DataFrame:
sequence_df = pd.concat(
[vec_data.sum(), vec_data.astype(bool).sum(axis=0)], axis=1, keys=["count", self.path_id_col]
).astype(int)
sequence_df["count_share"] = sequence_df["count"] / sequence_df["count"].sum()
sequence_df[f"{self.path_id_col}_share"] = sequence_df[self.path_id_col] / len(vec_data)
sequence_df["avg_count"] = sequence_df["count"] / sequence_df[self.path_id_col]
sequence_df = sequence_df[self._full_metrics_list].fillna(0)
return sequence_df
def _add_support_set(self, vec_data: pd.DataFrame) -> pd.DataFrame:
sparse_matrix = csc_matrix(vec_data.values)
idx = vec_data.index
samples_full = []
for j, col in enumerate(vec_data.columns):
non_zero_indices = sparse_matrix.indices[sparse_matrix.indptr[j] : sparse_matrix.indptr[j + 1]]
samples_full.append([idx[non_zero_indices].tolist()])
samples_full = pd.DataFrame(samples_full, index=vec_data.columns, columns=[self._sample_column_name])
return samples_full
@staticmethod
def _sequence_type(x: str) -> str:
temp = x.split(NGRAM_SEP)
n = len(temp)
n_unique = len(set(temp))
if (n_unique > 1) and (temp[0] == temp[-1]):
return "cycle"
if (n_unique == 1) and (n > 1):
return "loop"
return "other"
def _build_groups_df(
self, vec_data: pd.DataFrame, groups: UserGroupsType, group_names: UserGroupsNamesType
) -> pd.DataFrame:
vec_data["group"] = None
vec_data.loc[groups[0], "group"] = group_names[0]
vec_data.loc[groups[1], "group"] = group_names[1]
group_mask = vec_data["group"].copy()
vec_data.drop(columns="group", inplace=True)
group_dfs = []
samples_dfs = []
for group in group_names:
vec_data_group = vec_data[group_mask == group]
sequence_df_group = self._calculate_metrics_df(vec_data_group)
sequence_df_group.columns = pd.MultiIndex.from_product([sequence_df_group.columns, [group]])
group_dfs.append(sequence_df_group)
samples_full_group = self._add_support_set(vec_data=vec_data_group)
samples_dfs.append(samples_full_group)
sequence_df = pd.concat(group_dfs, axis=1)
sequence_type = pd.Series(sequence_df.index).apply(lambda x: self._sequence_type(x))
sequence_df[self.SEQUENCE_TYPE_GROUPS_COLUMN_NAME] = sequence_type.values
samples_full = pd.concat(samples_dfs, axis=1)
samples_columns = [(self._sample_column_name, group_names[0]), (self._sample_column_name, group_names[1])]
samples_full.columns = pd.MultiIndex.from_tuples(samples_columns)
for metric in self._full_metrics_list:
sequence_df[(metric, self.ABS_DELTA_COLUMN_NAME)] = (
sequence_df[(metric, group_names[0])] - sequence_df[(metric, group_names[1])]
)
sequence_df[(metric, self.RELATIVE_DELTA_COLUMN_NAME)] = (
sequence_df[(metric, self.ABS_DELTA_COLUMN_NAME)] / sequence_df[(metric, group_names[1])]
)
final_columns = (
list(itertools.product(self._full_metrics_list, self._default_columns_level_0))
+ [self.SEQUENCE_TYPE_GROUPS_COLUMN_NAME]
+ samples_columns
)
index_cols = pd.MultiIndex.from_tuples(final_columns)
sequence_df = sequence_df.merge(samples_full, left_index=True, right_index=True)
return sequence_df[index_cols]
[docs] @time_performance(
scope="sequences",
event_name="fit",
)
def fit(
self,
ngram_range: Tuple[int, int] = (1, 1),
groups: SplitExpr | None = None,
group_names: UserGroupsNamesType | None = None,
path_id_col: str | None = None,
) -> None:
"""
Calculate statistics on n-grams found in eventstream.
Calculated path_metrics:
- ``paths``: The number of unique paths that contain each particular event sequence
(calculated within the specified ``path_id_col``, or ``user_id`` by default).
- ``paths_share``: The ratio of paths to the sum of paths.
- ``count``: The number of occurrences of a particular sequence.
- ``count_share``: The ratio of count to the sum of counts.
- ``avg_count``: The average number of occurrences per path.
Defined sequences types:
- ``loop`` - if sequence length >= 2 and all the events are equal.
- ``cycle`` - if sequence length >= 3 and start and end events are equal.
- ``other`` - all other sequences.
Parameters
----------
ngram_range : Tuple(int, int)
The lower and upper boundary of the range of n-values for different word n-grams to be
extracted. For example, ngram_range=(1, 1) means only single events, (1, 2) means single events
and bigrams.
groups : SplitExpr, optional
Can be specified to calculate statistics for n-grams group-wise and provide delta values.
Must contain a tuple of two elements (g_1, g_2), where g_1 and g_2 are collections
of path IDs (for the column specified in the ``path_id_col`` parameter).
group_names : UserGroupsNamesType, optional
Names for the selected groups g_1 and g_2, which will be shown in the final plot header.
path_id_col : str, optional
The column used for calculating 'paths' and 'paths_share' path_metrics.
If not specified, the ``user_id`` from ``eventstream.schema``
will be used. For example, it can be specified as ``session_id`` if ``eventstream``
has such a ``custom_col``.
Returns
-------
None
Notes
-----
See the results of calculation using :py:func:`plot` method and the :py:func:`values` attribute.
"""
called_params = {
"ngram_range": ngram_range,
"groups": groups,
"group_names": group_names,
"path_id_col": path_id_col,
}
not_hash_values = ["ngram_range"]
self.ngram_range = ngram_range
self.path_id_col = path_id_col or self.__eventstream.schema.user_id
self._sample_column_name = f"{self.path_id_col}_sample"
self._full_metrics_list = [self.path_id_col, f"{self.path_id_col}_share", "count", "count_share", "avg_count"]
if groups:
self.groups, _ = _split_segment(self.__eventstream, groups)
if group_names:
self.group_names = group_names
else:
self.group_names = self.DEFAULT_GROUP_NAMES
vec_data = self.__eventstream.extract_features(
feature_type="count", ngram_range=self.ngram_range, path_id_col=self.path_id_col
)
if self.groups:
self._default_columns_level_0 = [
self.group_names[0],
self.group_names[1],
self.ABS_DELTA_COLUMN_NAME,
self.RELATIVE_DELTA_COLUMN_NAME,
]
vec_data = vec_data[vec_data.index.isin(list(self.groups[0]) + list(self.groups[1]))] # type: ignore
if self.groups:
self._sequence_df = self._build_groups_df(
vec_data=vec_data, groups=self.groups, group_names=self.group_names
)
else:
sequence_df = self._calculate_metrics_df(vec_data)
sequence_type = pd.Series(sequence_df.index).apply(lambda x: self._sequence_type(x))
sequence_df[self.SEQUENCE_TYPE_COLUMN_NAME] = sequence_type.values
samples_full = self._add_support_set(vec_data=vec_data)
self._sequence_df = sequence_df.join(samples_full[self._sample_column_name])
self._sequence_df.index.name = "Sequence"
self._show_values_df = self._sequence_df.copy()
collect_data_performance(
scope="sequences",
event_name="metadata",
called_params=called_params,
not_hash_values=not_hash_values,
performance_data={"shape": self._show_values_df.shape},
eventstream_index=self.__eventstream._eventstream_index,
)
def _size_sample_ids(
self, data: pd.DataFrame, sample_size: int | None, final_columns_level_0: list
) -> pd.DataFrame:
if sample_size:
displayed_columns = final_columns_level_0 + [self.SEQUENCE_TYPE_COLUMN_NAME] + [self._sample_column_name]
data = data[displayed_columns].copy()
def shuffle_and_slice(x: pd.Series, size: int = sample_size) -> pd.Series: # type: ignore
np.random.seed(42)
np.random.shuffle(x)
return x[:size]
if self.groups:
for group in self.group_names: # type: ignore
data[(self._sample_column_name, group)] = data[(self._sample_column_name, group)].apply(
lambda x: shuffle_and_slice(x)
)
else:
data[self._sample_column_name] = data[self._sample_column_name].apply(lambda x: shuffle_and_slice(x)) # type: ignore
else:
displayed_columns = final_columns_level_0 + [self.SEQUENCE_TYPE_COLUMN_NAME]
data = self._sequence_df[displayed_columns]
return data
@staticmethod
def _create_plot(
data: pd.DataFrame, heatmap_columns: list, precision: int | None
) -> pd.io.formats.style.Styler: # type: ignore
columns_intersection = set(data.columns) & set(heatmap_columns)
if not columns_intersection:
return
heatmap_values = data.copy(deep=True)
numeric_cols = data.select_dtypes(include="number").columns
max_values = data[numeric_cols].replace(+np.inf, -np.inf).max()
min_values = data[numeric_cols].replace(-np.inf, +np.inf).min()
heatmap_values[numeric_cols] = heatmap_values[numeric_cols].replace(+np.inf, max_values) # type: ignore
heatmap_values[numeric_cols] = heatmap_values[numeric_cols].replace(-np.inf, min_values) # type: ignore
cm = sns.diverging_palette(h_neg=246, h_pos=27, s=99, l=68, as_cmap=True)
def add_background(s: pd.Series, cmap: str = "PuBu") -> list[str]: # type: ignore
a = heatmap_values.loc[:, s.name].copy() # type: ignore
if a.min() >= 0:
norm = mcolors.TwoSlopeNorm(vmin=-1, vcenter=0, vmax=a.max())
elif a.max() <= 0:
norm = mcolors.TwoSlopeNorm(vmin=a.min(), vcenter=0, vmax=1)
else:
norm = mcolors.TwoSlopeNorm(vmin=a.min(), vcenter=0.0, vmax=a.max())
normed = norm(a.values)
c = [colors.rgb2hex(x) for x in colormaps.get_cmap(cmap)(normed)]
return ["background-color: %s" % color for color in c]
plot_sequence_df_styler = data.style.apply(add_background, subset=heatmap_columns, cmap=cm).format(
precision=precision, thousands=" "
)
return plot_sequence_df_styler
def _check_params(
self,
plot_sequence_df: pd.DataFrame,
metrics: SEQUENCES_METRIC_TYPES | None = None,
threshold: tuple[str, float | int] | None = None,
sorting: tuple[str | tuple, bool] | tuple[list[str | tuple], list[bool]] | None = None,
heatmap_columns: str | list[str | tuple] | None = None,
) -> Tuple[list, tuple, tuple, list]:
replacements = {"paths": self.path_id_col, "paths_share": f"{self.path_id_col}_share"}
replacer = replacements.get
final_columns_full, final_columns_level_0 = self._check_metrics(
plot_sequence_df=plot_sequence_df, metrics=metrics, replacer=replacer
)
heatmap_columns_list: list
heatmap_columns_list = self._define_heatmap_columns(
heatmap_columns=heatmap_columns,
replacer=replacer,
final_columns_level_0=final_columns_level_0,
final_columns_full=final_columns_full,
)
sorting_checked = self._define_sorting_columns(
sorting=sorting, replacer=replacer, final_columns_full=final_columns_full
)
threshold_checked: Tuple[list | str | None, float | int | None]
if threshold:
threshold_checked = self._define_threshold_columns(
threshold=threshold, replacer=replacer, final_columns_full=final_columns_full
)
else:
threshold_checked = None, None
return heatmap_columns_list, sorting_checked, threshold_checked, final_columns_level_0
def _check_metrics(
self, plot_sequence_df: pd.DataFrame, replacer: Callable, metrics: SEQUENCES_METRIC_TYPES | None = None
) -> Tuple[list, list]:
if self.groups:
final_columns_level_0 = [self.path_id_col]
else:
final_columns_level_0 = self._full_metrics_list
if metrics:
if isinstance(metrics, str): # type: ignore
final_metrics = [metrics]
else:
final_metrics = metrics
if not set(final_metrics).issubset(set(self.ORIGINAL_METRICS_LIST)):
unexpected_metrics = set(final_metrics).difference(set(self.ORIGINAL_METRICS_LIST))
raise ValueError(
f"""Unexpected path_metrics value: {unexpected_metrics}. Should be a metric name or
a list of metric names from {self.ORIGINAL_METRICS_LIST}."""
)
final_columns_level_0 = [replacer(n, n) for n in final_metrics]
final_columns_full = list(plot_sequence_df[final_columns_level_0].columns)
return final_columns_full, final_columns_level_0
def _define_sorting_columns(
self,
sorting: tuple[str | tuple, bool] | tuple[list[str | tuple], list[bool]] | None,
replacer: Callable,
final_columns_full: list,
) -> Tuple[str | list | tuple, bool | list | tuple]:
# sorting
if sorting:
error_message_text_sorting = f"""Should be tuple or list with two elements:
(column_name, bool) or ([column_name], [list of bool]). Note that those two elements should be the
same length. For example: ('count_share', False), ([('count_share', 'delta'),('paths_share', 'delta')],
[False, True])."""
sorting_columns: str | list | tuple
sorting_direction: bool | list | tuple
if not self.groups:
if isinstance(sorting, (list, tuple)): # type: ignore
if isinstance(sorting[0], str) and isinstance(sorting[1], bool):
sorting_columns = [sorting[0]]
sorting_direction = [sorting[1]]
elif len(sorting) == 2 and all(isinstance(elem, (list, tuple)) for elem in sorting):
sorting_columns = sorting[0]
sorting_direction = sorting[1]
if (
not all(isinstance(elem, str) for elem in sorting_columns)
or not all(isinstance(elem, bool) for elem in sorting_direction) # type: ignore
or not len(sorting_columns) == len(sorting_direction) # type: ignore
):
raise TypeError(f"Unexpected sorting parameter type. {error_message_text_sorting}")
else:
raise TypeError(f"Unexpected sorting parameter type. {error_message_text_sorting}")
else:
raise TypeError(f"Unexpected sorting parameter type. {error_message_text_sorting}")
sorting_columns = [(replacer(col, col)) for col in sorting_columns]
else:
if isinstance(sorting, (list, tuple)): # type: ignore
if isinstance(sorting[0], (list, tuple)) and isinstance(sorting[1], bool):
sorting_columns = [sorting[0]]
sorting_direction = [sorting[1]]
elif len(sorting) == 2 and all(isinstance(elem, (list, tuple)) for elem in sorting):
sorting_columns = sorting[0]
sorting_direction = sorting[1]
if len(sorting_columns) != len(sorting_direction): # type: ignore
raise TypeError(f"Unexpected sorting parameter type. {error_message_text_sorting}")
else:
raise TypeError(f"Unexpected sorting parameter type. {error_message_text_sorting}")
sorting_columns = [(replacer(col[0], col[0]), col[1]) for col in sorting_columns]
if not set(sorting_columns).issubset(set(final_columns_full)):
unexpected_sorting_columns = set(sorting_columns).difference(set(final_columns_full))
raise ValueError(
f"""Unexpected sorting_columns name: {unexpected_sorting_columns}.
{error_message_text_sorting}"""
)
else:
if self.groups:
sorting_columns = final_columns_full[3]
sorting_direction = False
else:
sorting_columns = final_columns_full[0]
sorting_direction = False
return sorting_columns, sorting_direction
def _define_threshold_columns(
self, threshold: tuple[str | list, float | int], replacer: Callable, final_columns_full: list
) -> Tuple[list | str | None, float | int | None]:
threshold_column, threshold_value = threshold
error_message_text_thresh = f"""Should be a tuple or list with column_name as the first element and int or
float values as the second.
For example: ('count_share', 0.5), (('count_share', 'delta'), 0.5)."""
if not isinstance(threshold_value, (int, float)): # type: ignore
raise TypeError(f"Unexpected threshold parameter type. {error_message_text_thresh}")
if not self.groups:
if not isinstance(threshold_column, str): # type: ignore
raise TypeError(f"Unexpected threshold parameter type. {error_message_text_thresh}")
else:
threshold_column = replacer(threshold_column, threshold_column)
else:
if not isinstance(threshold_column, (list, tuple)):
raise TypeError(f"Unexpected threshold parameter type. {error_message_text_thresh}")
elif not len(threshold_column) == 2 and all(isinstance(elem, str) for elem in threshold_column): # type: ignore
raise TypeError(f"Unexpected threshold parameter type. {error_message_text_thresh}")
else:
threshold_column = (replacer(threshold_column[0], threshold_column[0]), threshold_column[1]) # type: ignore
if threshold_column not in final_columns_full:
raise ValueError(
f"""Unexpected sorting_columns name: {threshold_column}.
{error_message_text_thresh}"""
)
return threshold_column, threshold_value # type: ignore
def _define_heatmap_columns(
self,
heatmap_columns: str | list[str | tuple] | None,
replacer: Callable,
final_columns_level_0: list,
final_columns_full: list,
) -> list:
error_message_text_heatmap = f"""Should be a column name or a list
of column names. For example: ['count_share', 'delta'],
[('count_share', 'delta'),('paths_share', 'delta')]."""
if heatmap_columns:
if not self.groups:
if isinstance(heatmap_columns, str):
heatmap_columns = [heatmap_columns]
elif isinstance(heatmap_columns, (list, tuple)): # type: ignore
if not all(isinstance(col, str) for col in heatmap_columns):
raise TypeError(f"Unexpected heatmap_columns type. {error_message_text_heatmap}")
else:
raise TypeError(f"Unexpected heatmap_columns type. {error_message_text_heatmap}")
heatmap_columns = [replacer(col, col) for col in heatmap_columns]
else:
if isinstance(heatmap_columns, (list, tuple)):
if len(heatmap_columns) == 2 and all(isinstance(col, str) for col in heatmap_columns):
heatmap_columns = [heatmap_columns] # type: ignore
elif not all(isinstance(col, (list, tuple)) for col in heatmap_columns):
raise TypeError(f"Unexpected heatmap_columns type. {error_message_text_heatmap}")
heatmap_columns = [(replacer(col[0], col[0]), col[1]) for col in heatmap_columns] # type: ignore
if not set(heatmap_columns).issubset(set(final_columns_full)):
unexpected_heatmap_columns = set(heatmap_columns).difference(set(final_columns_full))
raise ValueError(
f"""Unexpected heatmap_columns name: {unexpected_heatmap_columns}.
{error_message_text_heatmap}"""
)
else:
if self.groups:
heatmap_columns = list(itertools.product(final_columns_level_0, [self.RELATIVE_DELTA_COLUMN_NAME]))
else:
heatmap_columns = [f"{self.path_id_col}_share"]
return heatmap_columns # type: ignore
[docs] @time_performance(
scope="sequences",
event_name="plot",
)
def plot(
self,
metrics: SEQUENCES_METRIC_TYPES | None = None,
threshold: tuple[str, float | int] | None = None,
sorting: tuple[str | tuple, bool] | tuple[list[str | tuple], list[bool]] | None = None,
heatmap_cols: str | list[str | tuple] | None = None,
sample_size: int | None = 1,
precision: int = 2,
) -> pd.io.formats.style.Styler: # type: ignore
"""
Parameters
----------
metrics : {'paths', 'paths_share', 'count', 'count_share'}, optional
Specify the path_metrics to be displayed in the plot.
- If groups are specified, by default, only the 'paths' metric will be plotted within each group,
along with both deltas (relative and absolute).
- If ``groups=None``, all four path_metrics will be shown by default.
threshold : tuple[str, float | int], optional
Used to filter out infrequent sequences based on the specified metric.
- Example without groups: ('paths', 1200)
- Example with groups: (('user_id', 'group_1'), 1200)
Only rows with values greater than or equal to 1200 in the specified column will be displayed.
sorting : Tuple(str or list of str, bool or list of bool) or None, default None
- The first element in the tuple: Column name or list of names for sorting.
- The second element: The sorting order (ascending vs. descending). Specify a list for multiple
sorting orders.
If a list of bools is provided, it must match the length of the sorting columns.
heatmap_cols : str or list of str or list of tuples or None
Specifies columns to be represented in the heatmap as follows:
- The heatmap range is calculated column-wise.
- For columns containing negative values, the palette will be divergent (blue - orange with white as zero).
- For columns with only positive values, the palette will be orange.
- For columns with only negative values, the palette will be blue.
Default values
sample_size : int or None, default 1
Number of ID samples to display.
precision : int, default 2
Number of decimal digits to show as fractions in the heatmap.
Returns
-------
pd.io.formats.style.Styler
Styled pd.Dataframe object.
"""
called_params = {
"path_metrics": metrics,
"threshold": threshold,
"sorting": sorting,
"heatmap_cols": heatmap_cols,
"sample_size": sample_size,
"precision": precision,
}
not_hash_values = ["path_metrics"]
plot_sequence_df = self._sequence_df.copy()
heatmap_columns_list, sorting_checked, threshold_checked, final_columns_level_0 = self._check_params(
plot_sequence_df, metrics, threshold, sorting, heatmap_cols
)
sorting_columns, sorting_direction = sorting_checked
threshold_column, threshold_value = threshold_checked
plot_sequence_df = self._size_sample_ids(
data=plot_sequence_df, sample_size=sample_size, final_columns_level_0=final_columns_level_0
)
plot_sequence_df = plot_sequence_df.sort_values(by=sorting_columns, ascending=sorting_direction)
if threshold:
plot_sequence_df = plot_sequence_df[plot_sequence_df[threshold_column] >= abs(threshold_value)]
if precision:
plot_sequence_df = plot_sequence_df.round(precision)
plot_sequence_df_styler = self._create_plot(
data=plot_sequence_df, heatmap_columns=heatmap_columns_list, precision=precision
)
self._show_values_df = plot_sequence_df
collect_data_performance(
scope="sequences",
event_name="metadata",
called_params=called_params,
not_hash_values=not_hash_values,
performance_data={"shape": plot_sequence_df.shape},
eventstream_index=self.__eventstream._eventstream_index,
)
return plot_sequence_df_styler
@property
@time_performance(
scope="sequences",
event_name="values",
)
def values(self) -> pd.DataFrame:
"""
Returns a pd.DataFrame representing the fitted or plotted Sequences table.
Should be used after :py:func:`fit` or :py:func:`plot`.
Returns
-------
pd.DataFrame
"""
return self._show_values_df
@property
@time_performance(
scope="sequences",
event_name="params",
)
def params(self) -> dict[str, tuple | None | str | UserGroupsType | UserGroupsNamesType]:
"""
Returns the parameters used for the last fitting.
Should be used after :py:func:`fit`.
"""
return {
"ngram_range": self.ngram_range,
"groups": self.groups,
"group_names": self.group_names,
"path_id": self.path_id_col,
}