Source code for retentioneering.data_processors_lib.drop_paths

from __future__ import annotations

from typing import Optional, Tuple

import numpy as np

from retentioneering.backend.tracker import track
from retentioneering.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.widget.widgets import ReteTimeWidget


class DropPathsParams(ParamsModel):
    """
    Parameter container for the :py:class:`.DropPaths` data processor.

    Both fields are optional at the model level; :py:class:`.DropPaths`
    validates at apply time that exactly one of them is set.
    """

    # Minimum user path length measured in the number of events.
    min_steps: Optional[int]
    # Minimum user path duration as a ``(value, unit)`` pair, where ``unit``
    # is one of ``DATETIME_UNITS``.
    min_time: Optional[Tuple[float, DATETIME_UNITS]]

    # Widget associated with the ``min_time`` parameter.
    _widgets = {
        "min_time": ReteTimeWidget(),
    }
class DropPaths(DataProcessor):
    """
    Filter user paths based on the path length, removing the paths that are
    shorter than the specified number of events or time.

    Parameters
    ----------
    min_steps : int, optional
        Minimum user path length measured in the number of events.
    min_time : Tuple(float, :numpy_link:`DATETIME_UNITS<>`), optional
        Minimum user path length and its unit of measure.

    Returns
    -------
    Eventstream
        ``Eventstream`` with events that should be deleted from input ``eventstream`` marked ``_deleted=True``.

    Raises
    ------
    ValueError
        If both of ``min_steps`` and ``min_time`` are empty or both are given.

    See Also
    --------
    .TimedeltaHist : Plot the distribution of the time deltas between two events.
    .UserLifetimeHist : Plot the distribution of user lifetimes.

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: DropPathsParams

    @track(  # type: ignore
        tracking_info={"event_name": "init"},
        scope="drop_paths",
        allowed_params=[],
    )
    def __init__(self, params: DropPathsParams):
        super().__init__(params=params)

    @track(  # type: ignore
        tracking_info={"event_name": "apply"},
        scope="drop_paths",
        allowed_params=[],
    )
    def apply(self, eventstream: EventstreamType) -> EventstreamType:
        """
        Build an eventstream holding the events of the paths that are too short.

        Parameters
        ----------
        eventstream : EventstreamType
            Input eventstream whose user paths are checked.

        Returns
        -------
        EventstreamType
            A new eventstream built from the events of the dropped users and
            related to the input ``eventstream`` through the ``ref`` column.

        Raises
        ------
        ValueError
            If both of ``min_steps`` and ``min_time`` are empty or both are given.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- confirm before moving).
        from retentioneering.eventstream.eventstream import Eventstream

        user_col = eventstream.schema.user_id
        time_col = eventstream.schema.event_timestamp

        min_time, time_unit = None, None
        min_steps = self.params.min_steps

        if self.params.min_time:
            min_time, time_unit = self.params.min_time

        # The checks below rely on truthiness, so a threshold of 0 is treated
        # the same way as a parameter that was not passed at all.
        if min_steps and min_time:
            raise ValueError("min_steps and min_time parameters cannot be used simultaneously!")

        if not min_steps and not min_time:
            raise ValueError("Either min_steps or min_time must be specified!")

        events = eventstream.to_dataframe(copy=True)

        if min_time and time_unit:
            # Named aggregation fixes the output column names explicitly.
            # Renaming "amin"/"amax" depended on how a particular NumPy/pandas
            # version named the np.min/np.max callables and fails with a
            # KeyError on "end" when they are reported as "min"/"max".
            userpath = events.groupby(user_col)[time_col].agg(start="min", end="max")
            # Path duration expressed in ``time_unit``; the comparison is
            # strict, so paths lasting exactly ``min_time`` are kept.
            mask_ = (userpath["end"] - userpath["start"]) / np.timedelta64(1, time_unit) < min_time  # type: ignore
        else:
            # NOTE(review): the length is the number of *distinct timestamps*
            # per user, so events sharing a timestamp count once -- confirm
            # this matches the documented "number of events".
            userpath = events.groupby([user_col])[[time_col]].nunique().rename(columns={time_col: "length"})
            mask_ = userpath["length"] < min_steps

        users_to_delete = userpath[mask_].index
        # Explicit copy: a column is added below, and writing to a filtered
        # slice may trigger pandas' SettingWithCopyWarning.
        events = events[events[user_col].isin(users_to_delete)].copy()
        events["ref"] = events.loc[:, eventstream.schema.event_id]

        eventstream = Eventstream(
            raw_data_schema=eventstream.schema.to_raw_data_schema(),
            raw_data=events,
            relations=[{"raw_col": "ref", "eventstream": eventstream}],
        )

        # Nothing to mark when no user falls below the threshold.
        if not events.empty:
            eventstream._soft_delete(eventstream.to_dataframe())

        return eventstream