# Source code for retentioneering.data_processors_lib.collapse_loops

from __future__ import annotations

import uuid
import warnings
from typing import Any, Callable, Literal, Optional

import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import EnumWidget


def _numeric_values_processing(x: pd.Series) -> int | float:
    """Aggregate the numeric values of one collapsing group into their mean."""
    group_mean = x.mean()
    return group_mean


def _string_values_processing(x: pd.Series) -> str | None:
    """
    Aggregate the string values of one collapsing group.

    Returns the shared value when every non-NaN entry of ``x`` is the same,
    and ``None`` otherwise.
    """
    # ``nunique`` skips NaN by default, so missing entries never break the equality check.
    distinct_count = x.nunique()
    if distinct_count != 1:
        return None
    non_missing = x.dropna()
    return non_missing.max()


class CollapseLoopsParams(ParamsModel):
    """
    A class with parameters for :py:class:`.CollapseLoops` class.
    """

    # Naming rule for the synthetic events: ``None`` keeps the event name,
    # "loop" appends ``_loop``, "count" appends ``_loop_{number of events}``.
    suffix: Optional[Literal["loop", "count"]]
    # Aggregation used to compute the timestamp of each collapsed group.
    time_agg: Literal["max", "min", "mean"] = "min"

    _widgets = {
        "time_agg": EnumWidget(optional=False, params=["max", "min", "mean"], default="min"),
    }
@docstrings.get_sections(base="CollapseLoops")  # type: ignore
class CollapseLoops(DataProcessor):
    """
    Find ``loops`` and create new synthetic events in the paths of all users having such sequences.

    A ``loop`` - is a sequence of repetitive events.
    For example *"event1 -> event1"*

    Parameters
    ----------
    suffix : {"loop", "count"}, optional

        - If ``None``, event_name will be event_name without any changes.\n
          For example *"event1 - event1 - event1"* --> event1.
        - If ``loop``, event_name will be event_name_loop.\n
          For example *"event1 - event1 - event1"* --> event1_loop.
        - If ``count``, event_name will be event_name_loop_{number of events}.\n
          For example *"event1 - event1 - event1"* --> event1_loop_3.
    time_agg : {"max", "min", "mean"}, default "min"
        Aggregation method to calculate timestamp values for new groups.

    Returns
    -------
    Eventstream
        Eventstream with:

        - raw events: the returned events will be marked ``_deleted=True`` and soft-deleted from input eventstream.
        - new synthetic events: the returned events will be added to the input eventstream with columns below.

        +------------------------+----------------+--------------------------------------------+
        | **event_name**         | **event_type** | **timestamp**                              |
        +------------------------+----------------+--------------------------------------------+
        | event_name_loop        | group_alias    | (min/max/mean(group of repetitive events)) |
        +------------------------+----------------+--------------------------------------------+
        | event_name_loop_{count}| group_alias    | (min/max/mean(group of repetitive events)) |
        +------------------------+----------------+--------------------------------------------+
        | event_name             | group_alias    | (min/max/mean(group of repetitive events)) |
        +------------------------+----------------+--------------------------------------------+

    See Also
    --------
    .StepMatrix : This class provides methods for step matrix calculation and visualization.
    .RawDataSchema : Define schema for ``raw_data`` columns names.

    Notes
    -----
    If an eventstream contains custom columns they will be aggregated in the following way:

    - for numeric columns the mean value will be calculated for each collapsed group.
      ``None`` values are ignored. Supported numeric types are: ``bool``, ``int``, ``float``.
    - for string columns, if all the values to be aggregated in the collapsing group are equal,
      this single value will be returned, otherwise - ``None``. ``None`` values in the input data will be ignored.

    See :doc:`Data processors user guide</user_guides/dataprocessors>` and
    :ref:`Eventstream custom columns' explanation<eventstream_custom_fields>` for the details.

    """

    params: CollapseLoopsParams
    # ``pd.api.types.infer_dtype`` labels that are aggregated with the mean.
    NUMERIC_DTYPES = ["integer", "floating", "boolean", "mixed-integer-float"]

    @time_performance(
        scope="collapse_loops",
        event_name="init",
    )
    def __init__(self, params: CollapseLoopsParams):
        """Store the processor parameters by delegating to the base class."""
        super().__init__(params=params)

    @time_performance(
        scope="collapse_loops",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Collapse every run of consecutive identical events of a user into one synthetic event.

        Parameters
        ----------
        df : pd.DataFrame
            Events to process. The frame is not modified; all helper columns
            are built on an internal copy.
        schema : EventstreamSchemaType
            Provides the column names (user id, timestamp, event type,
            event name, event index, event id and custom columns) used in ``df``.

        Returns
        -------
        pd.DataFrame
            The rows of ``df`` that do not belong to any loop, followed by one
            ``group_alias`` row per loop.

        Raises
        ------
        TypeError
            If a custom column has an inferred dtype that is neither numeric
            (see ``NUMERIC_DTYPES``) nor ``"string"``.
        """
        user_col = schema.user_id
        time_col = schema.event_timestamp
        type_col = schema.event_type
        event_col = schema.event_name
        event_index_col = schema.event_index
        custom_cols: list = schema.custom_cols
        event_id = schema.event_id
        suffix = self.params.suffix
        time_agg = self.params.time_agg

        # Choose an aggregation function for every custom column from its inferred dtype.
        full_agg: dict[str, Any] = {}
        for col in custom_cols:
            inferred_dtype = pd.api.types.infer_dtype(df[col])
            if inferred_dtype in self.NUMERIC_DTYPES:
                full_agg[col] = _numeric_values_processing
            elif inferred_dtype == "string":
                full_agg[col] = _string_values_processing
            else:
                doc_link = "https://pandas.pydata.org/docs/reference/api/pandas.api.types.infer_dtype.html"
                message = (
                    f"Column '{col}' with "
                    f"'{inferred_dtype}'"
                    f" data type is not supported for collapsing. See {doc_link}"
                )
                raise TypeError(message)

        # Helper columns go on a private copy so they never leak into the caller's frame.
        work = df.copy()
        # True on every row whose event name differs from the user's previous row.
        work["grp"] = work[event_col] != work.groupby(user_col)[event_col].shift(1)
        # Running number of the run of identical events within each user.
        work["cumgroup"] = work.groupby(user_col)["grp"].cumsum()
        # 1-based position of the row inside its run.
        work["count"] = work.groupby([user_col, "cumgroup"]).cumcount() + 1
        # A run is a loop when it holds more than one event. The string alias "max"
        # avoids the pandas deprecation of passing the builtin ``max`` to ``transform``.
        run_length = work.groupby([user_col, "cumgroup", event_col])["count"].transform("max")
        work["collapsed"] = run_length != 1

        df_to_del = work[work["collapsed"]]
        loops = (
            df_to_del.groupby([user_col, "cumgroup", event_col])
            .agg({time_col: time_agg, "count": "max", **full_agg, event_index_col: time_agg})
            .reset_index()
        )
        # "mean" may produce a float index; the event index is stored as int64.
        loops[event_index_col] = loops[event_index_col].astype("int64")

        if suffix == "loop":
            loops[event_col] = loops[event_col].map(str) + "_loop"
        elif suffix == "count":
            loops[event_col] = loops[event_col].map(str) + "_loop_" + loops["count"].map(str)
        loops[type_col] = "group_alias"

        if len(custom_cols) > 0:
            cols_to_show = [user_col, time_col, type_col, event_col] + custom_cols
            link_url = (
                "https://doc.retentioneering.com/stable/doc/api/preprocessing/data_processors/collapse_loops.html"
            )
            message_template = """
            \nThere are NaN values in the {where_nans} columns!
            \nThe total amount of NaN values in each column:
            \n{total_nan_amount}
            \nAs a reference, here are some rows where NaNs occurred:
            \n{nan_example}{input_data_add_info}
            \nFor more information, see collapse_loops documentation {link}\n
            """
            # Warn about NaNs that were already present in the events being collapsed.
            if df_to_del[custom_cols].isna().values.sum() > 0:
                cols_with_na = df_to_del[custom_cols].isna().sum()
                rows_to_show = df_to_del[df_to_del[custom_cols].isnull().any(axis=1)].head(3)
                warning_message = message_template.format(
                    where_nans="input",
                    total_nan_amount=cols_with_na,
                    nan_example=rows_to_show[cols_to_show],
                    input_data_add_info="\n\nThese NaN values will be ignored in the further calculation",
                    link=link_url,
                )
                warnings.warn(warning_message)
            # Warn about NaNs in the aggregated custom columns of the synthetic events.
            if loops[custom_cols].isna().values.sum() > 0:
                cols_with_na = loops[custom_cols].isna().sum()
                rows_to_show = loops[loops[custom_cols].isnull().any(axis=1)].head(3)
                warning_message = message_template.format(
                    where_nans="custom",
                    total_nan_amount=cols_with_na,
                    nan_example=rows_to_show[cols_to_show],
                    input_data_add_info="",
                    link=link_url,
                )
                warnings.warn(warning_message)

        # Keep only the events that are not part of any loop.
        updated = df
        if not df_to_del.empty:
            updated = df[~df[event_id].isin(df_to_del[event_id])]

        # Every synthetic event gets a fresh identifier.
        loops[event_id] = [uuid.uuid4() for _ in range(len(loops))]
        result = pd.concat([updated, loops])
        # ``loops`` still carries the helper columns used for grouping.
        result = result.drop(["cumgroup", "count"], axis=1, errors="ignore")

        collect_data_performance(
            scope="collapse_loops",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            not_hash_values=["suffix", "time_agg"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result