Source code for retentioneering.data_processors_lib.label_lost_users

from __future__ import annotations

from typing import List, Optional, Tuple, Union

import numpy as np
import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfIds, ReteTimeWidget


[docs]class LabelLostUsersParams(ParamsModel): """ A class with parameters for :py:class:`.LabelLostUsers` class. """ timeout: Optional[Tuple[float, DATETIME_UNITS]] lost_users_list: Optional[Union[List[int], List[str]]] _widgets = { "timeout": ReteTimeWidget(), "lost_users_list": ListOfIds(), }
[docs]@docstrings.get_sections(base="LabelLostUsers") # type: ignore class LabelLostUsers(DataProcessor): """ Create one of synthetic events in each user's path: ``lost_user`` or ``absent_user``. Parameters ---------- Only one of parameters could be used at the same time timeout : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional Threshold value and its unit of measure. Calculate timedelta between the last event in each user's path and the last event in the whole eventstream. For users with timedelta greater or equal to selected ``timeout``, a new synthetic event - ``lost_user`` will be added. For other users paths a new synthetic event - ``absent_user`` will be added. lost_users_list : list of int or list of str, optional If the `list of user_ids` is given new synthetic event - ``lost_user`` will be added to each user from the list. For other user's paths will be added new synthetic event - ``absent_user``. Returns ------- Eventstream ``Eventstream`` with new synthetic events only - one for each user: +-----------------+-----------------+------------------+ | **event_name** | **event_type** | **timestamp** | +-----------------+-----------------+------------------+ | lost_user | lost_user | last_event | +-----------------+-----------------+------------------+ | absent_user | absent_user | last_event | +-----------------+-----------------+------------------+ Raises ------ ValueError Raised when both ``timeout`` and ``lost_users_list`` are either empty or given. Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: LabelLostUsersParams @time_performance( scope="label_lost_users", event_name="init", ) def __init__(self, params: LabelLostUsersParams): super().__init__(params=params) @time_performance( # type: ignore scope="label_lost_users", event_name="apply", ) def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame: user_col = schema.user_id time_col = schema.event_timestamp type_col = schema.event_type event_col = schema.event_name timeout, timeout_unit = None, None lost_users_list = self.params.lost_users_list data_lost = pd.DataFrame() if self.params.timeout: timeout, timeout_unit = self.params.timeout if timeout and lost_users_list: raise ValueError("timeout and lost_users_list parameters cannot be used simultaneously!") if not timeout and not lost_users_list: raise ValueError("Either timeout or lost_users_list must be specified!") if timeout and timeout_unit: data_lost = df.groupby(user_col, as_index=False).last() data_lost["diff_end_to_end"] = data_lost[time_col].max() - data_lost[time_col] data_lost["diff_end_to_end"] /= np.timedelta64(1, timeout_unit) # type: ignore data_lost[type_col] = np.where(data_lost["diff_end_to_end"] < timeout, "absent_user", "lost_user") data_lost[event_col] = data_lost[type_col] del data_lost["diff_end_to_end"] if lost_users_list: data_lost = df.groupby(user_col, as_index=False).last() data_lost[type_col] = np.where(data_lost["user_id"].isin(lost_users_list), "lost_user", "absent_user") data_lost[event_col] = data_lost[type_col] result = pd.concat([df, data_lost]) collect_data_performance( scope="label_lost_users", event_name="metadata", called_params=self.to_dict()["values"], not_hash_values=["timeout"], performance_data={ "parent": { "shape": df.shape, "hash": hash_dataframe(df), }, "child": { "shape": result.shape, "hash": hash_dataframe(result), }, }, ) return result