Source code for retentioneering.data_processors_lib.drop_paths

from __future__ import annotations

from typing import Optional, Tuple

import numpy as np
import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ReteTimeWidget


[docs]class DropPathsParams(ParamsModel): """ A class with parameters for :py:class:`.DropPaths` class. """ min_steps: Optional[int] min_time: Optional[Tuple[float, DATETIME_UNITS]] _widgets = { "min_time": ReteTimeWidget(), }
[docs]@docstrings.get_sections(base="DropPaths") # type: ignore class DropPaths(DataProcessor): """ Filter user paths based on the path length, removing the paths that are shorter than the specified number of events or time. Parameters ---------- min_steps : int, optional Minimum user path length measured in the number of events. min_time : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional Minimum user path length and its unit of measure. Returns ------- Eventstream ``Eventstream`` with events that should be deleted from input ``eventstream`` marked ``_deleted=True``. Raises ------ ValueError If both of ``min_steps`` and ``min_time`` are empty or both are given. See Also -------- .TimedeltaHist : Plot the distribution of the time deltas between two events. .UserLifetimeHist : Plot the distribution of user lifetimes. Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: DropPathsParams @time_performance( scope="drop_paths", event_name="init", ) def __init__(self, params: DropPathsParams): super().__init__(params=params) @time_performance( scope="drop_paths", event_name="apply", ) def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame: user_col = schema.user_id time_col = schema.event_timestamp min_time, time_unit = None, None min_steps = self.params.min_steps if self.params.min_time: min_time, time_unit = self.params.min_time if min_steps and min_time: raise ValueError("min_steps and min_time parameters cannot be used simultaneously!") if not min_steps and not min_time: raise ValueError("Either min_steps or min_time must be specified!") if min_time and time_unit: userpath = ( df.groupby(user_col)[time_col].agg([np.min, np.max]).rename(columns={"amin": "start", "amax": "end"}) ) mask_ = (userpath["end"] - userpath["start"]) / np.timedelta64(1, time_unit) < min_time # type: ignore else: userpath = df.groupby([user_col])[[time_col]].nunique().rename(columns={time_col: "length"}) mask_ = userpath["length"] < min_steps users_to_delete = userpath[mask_].index result = df[~df[user_col].isin(users_to_delete)] collect_data_performance( scope="drop_paths", event_name="metadata", called_params=self.to_dict()["values"], not_hash_values=["min_time"], performance_data={ "parent": { "shape": df.shape, "hash": hash_dataframe(df), }, "child": { "shape": result.shape, "hash": hash_dataframe(result), }, }, ) return result