Source code for retentioneering.data_processors_lib.label_lost_users
from __future__ import annotations
from typing import List, Optional, Tuple, Union
import numpy as np
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.common.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfIds, ReteTimeWidget
class LabelLostUsersParams(ParamsModel):
    """
    A class with parameters for :py:class:`.LabelLostUsers` class.
    """
    timeout: Optional[Tuple[float, DATETIME_UNITS]]
    lost_users_list: Optional[Union[List[int], List[str]]]
    _widgets = {
        "timeout": ReteTimeWidget(),
        "lost_users_list": ListOfIds(),
    } 
@docstrings.get_sections(base="LabelLostUsers")  # type: ignore
class LabelLostUsers(DataProcessor):
    """
    Create one of two synthetic events in each user's path:
    ``lost_user`` or ``absent_user``.
    Parameters
    ----------
    Only one of the parameters can be used at a time.
    timeout : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional
        Threshold value and its unit of measure.
        The timedelta between the last event in each user's path and the last event
        in the whole eventstream is calculated.
        For users whose timedelta is greater than or equal to the selected ``timeout``,
        a new synthetic event ``lost_user`` is added.
        For all other users, a new synthetic event ``absent_user`` is added.
    lost_users_list : list of int or list of str, optional
        If a list of ``user_ids`` is given, a new synthetic event ``lost_user`` is added
        to the path of each user from the list.
        For all other users, a new synthetic event ``absent_user`` is added.
    Returns
    -------
    Eventstream
        ``Eventstream`` with the new synthetic events only, one for each user:
        +-----------------+-----------------+------------------+
        | **event_name**  | **event_type**  |  **timestamp**   |
        +-----------------+-----------------+------------------+
        | lost_user       | lost_user       | last_event       |
        +-----------------+-----------------+------------------+
        | absent_user     | absent_user     | last_event       |
        +-----------------+-----------------+------------------+
    Raises
    ------
    ValueError
        Raised when both ``timeout`` and ``lost_users_list`` are given, or when neither is given.
    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """
    params: LabelLostUsersParams
    @time_performance(
        scope="label_lost_users",
        event_name="init",
    )
    def __init__(self, params: LabelLostUsersParams):
        super().__init__(params=params)
    @time_performance(  # type: ignore
        scope="label_lost_users",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        user_col = schema.user_id
        time_col = schema.event_timestamp
        type_col = schema.event_type
        event_col = schema.event_name
        timeout, timeout_unit = None, None
        lost_users_list = self.params.lost_users_list
        data_lost = pd.DataFrame()
        if self.params.timeout:
            timeout, timeout_unit = self.params.timeout
        if timeout and lost_users_list:
            raise ValueError("timeout and lost_users_list parameters cannot be used simultaneously!")
        if not timeout and not lost_users_list:
            raise ValueError("Either timeout or lost_users_list must be specified!")
        if timeout and timeout_unit:
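            # Take each user's last event and compare it with the last event of the
            # whole eventstream, expressing the difference in the selected time unit.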
            data_lost = df.groupby(user_col, as_index=False).last()
            data_lost["diff_end_to_end"] = data_lost[time_col].max() - data_lost[time_col]
            data_lost["diff_end_to_end"] /= np.timedelta64(1, timeout_unit)  # type: ignore
            data_lost[type_col] = np.where(data_lost["diff_end_to_end"] < timeout, "absent_user", "lost_user")
            data_lost[event_col] = data_lost[type_col]
            del data_lost["diff_end_to_end"]
        if lost_users_list:
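            # Label users explicitly: users from the list become ``lost_user``,
            # all other users become ``absent_user``.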
            data_lost = df.groupby(user_col, as_index=False).last()
            data_lost[type_col] = np.where(data_lost["user_id"].isin(lost_users_list), "lost_user", "absent_user")
            data_lost[event_col] = data_lost[type_col]
        result = pd.concat([df, data_lost])
        collect_data_performance(
            scope="label_lost_users",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            not_hash_values=["timeout"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )
        return result
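# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the library source).
# It applies the processor directly to a DataFrame; the ``schema`` object is a
# hypothetical stand-in exposing the attribute names read in ``apply``
# (``user_id``, ``event_name``, ``event_type``, ``event_timestamp``).
# In regular usage the processor is typically applied through an Eventstream.
if __name__ == "__main__":
    from types import SimpleNamespace

    events = pd.DataFrame(
        {
            "user_id": [1, 1, 2],
            "event": ["main", "catalog", "main"],
            "event_type": ["raw", "raw", "raw"],
            "timestamp": pd.to_datetime(
                ["2023-01-01 00:00:00", "2023-01-01 00:05:00", "2023-01-10 00:00:00"]
            ),
        }
    )
    schema = SimpleNamespace(
        user_id="user_id",
        event_name="event",
        event_type="event_type",
        event_timestamp="timestamp",
    )
    # With timeout=(1, "D"), users whose last event is one day or more before the
    # eventstream's last event are labeled ``lost_user``; the rest ``absent_user``.
    params = LabelLostUsersParams(timeout=(1, "D"))
    labeled = LabelLostUsers(params=params).apply(events, schema)  # type: ignore[arg-type]
    print(labeled[["user_id", "event", "event_type", "timestamp"]])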