Source code for retentioneering.data_processors_lib.label_cropped_paths

from __future__ import annotations

from typing import Optional, Tuple

import numpy as np
import pandas as pd
from pandas import DataFrame

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ReteTimeWidget


[docs]class LabelCroppedPathsParams(ParamsModel): """ A class with parameters for :py:class:`.LabelCroppedPaths` class. """ left_cutoff: Optional[Tuple[float, DATETIME_UNITS]] right_cutoff: Optional[Tuple[float, DATETIME_UNITS]] _widgets = { "left_cutoff": ReteTimeWidget(), "right_cutoff": ReteTimeWidget(), }
[docs]@docstrings.get_sections(base="LabelCroppedPaths") # type: ignore class LabelCroppedPaths(DataProcessor): """ Create new synthetic event(s) for each user based on the timeout threshold: ``cropped_left`` or ``cropped_right`` Parameters ---------- left_cutoff : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional Threshold value with its unit of measure. The timedelta between the last event in each user's path and the first event in the whole eventstream is calculated. For users with the timedelta less than the selected ``left_cutoff``, the new ``cropped_left`` event is added. right_cutoff : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional Threshold value with its unit of measure. The timedelta between the first event in each user's path and the last event in the whole eventstream is calculated. For users with timedelta less than the selected ``right_cutoff``, the new ``cropped_right`` event is added. Returns ------- Eventstream Eventstream containing only the generated synthetic events, for users whose paths satisfy the specified cut-offs. +-------------------+-------------------+------------------+ | **event_name** | **event_type** | **timestamp** | +-------------------+-------------------+------------------+ | cropped_left | cropped_left | first_event | +-------------------+-------------------+------------------+ | cropped_right | cropped_right | last_event | +-------------------+-------------------+------------------+ See Also -------- .TimedeltaHist : Plot the distribution of the time deltas between two events. .UserLifetimeHist : Plot the distribution of user lifetimes. Raises ------ ValueError If both of ``left_cutoff`` and ``right_cutoff`` are empty. Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: LabelCroppedPathsParams @time_performance( scope="label_cropped_paths", event_name="init", ) def __init__(self, params: LabelCroppedPathsParams): super().__init__(params=params) @time_performance( # type: ignore scope="label_cropped_paths", event_name="apply", ) def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame: user_col = schema.user_id time_col = schema.event_timestamp type_col = schema.event_type event_col = schema.event_name left_cutoff, left_truncated_unit = None, None right_cutoff, right_truncated_unit = None, None if self.params.left_cutoff: left_cutoff, left_truncated_unit = self.params.left_cutoff if self.params.right_cutoff: right_cutoff, right_truncated_unit = self.params.right_cutoff labeled_events = pd.DataFrame() if not left_cutoff and not right_cutoff: raise ValueError("Either left_cutoff or right_cutoff must be specified!") userpath = df.groupby(user_col)[time_col].agg([np.min, np.max]).rename(columns={"amin": "start", "amax": "end"}) if left_cutoff: timedelta = (userpath["end"] - df[time_col].min()) / np.timedelta64(1, left_truncated_unit) # type: ignore cropped_users_index = userpath[timedelta < left_cutoff].index left_labeled_events = df.groupby(user_col).first().reindex(cropped_users_index).reset_index() left_labeled_events[event_col] = "cropped_left" left_labeled_events[type_col] = "cropped_left" labeled_events = pd.concat([labeled_events, left_labeled_events]) if right_cutoff: timedelta = (df[time_col].max() - userpath["start"]) / np.timedelta64( 1, right_truncated_unit # type: ignore ) cropped_users_index = userpath[timedelta < right_cutoff].index right_labeled_events = df.groupby(user_col).last().reindex(cropped_users_index).reset_index() right_labeled_events[event_col] = "cropped_right" right_labeled_events[type_col] = "cropped_right" labeled_events = pd.concat([labeled_events, right_labeled_events]) result = pd.concat([df, labeled_events]) collect_data_performance( scope="label_cropped_paths", event_name="metadata", called_params=self.to_dict()["values"], not_hash_values=["left_cutoff", "right_cutoff"], performance_data={ "parent": { "shape": df.shape, "hash": hash_dataframe(df), }, "child": { "shape": result.shape, "hash": hash_dataframe(result), }, }, ) return result