Source code for retentioneering.data_processors_lib.add_positive_events

from __future__ import annotations

from typing import Callable, List

import pandas as pd

from retentioneering.backend.tracker import track
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.widget.widgets import ListOfString, ReteFunction


def _default_func(eventstream: EventstreamType, targets: list[str]) -> pd.DataFrame:
    """
    Filter rows with target events from the input eventstream.

    Parameters
    ----------
    eventstream : Eventstream
        Source eventstream or output from previous nodes.

    targets : list of str
        Condition for eventstream filtering.
        Each event from that list is associated with a conversion goal of the user behaviour in the product.
        If there are several target events in user path - the event with minimum timestamp is taken.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame with targets and its timestamps.
    """
    user_col = eventstream.schema.user_id
    time_col = eventstream.schema.event_timestamp
    event_col = eventstream.schema.event_name
    df = eventstream.to_dataframe()

    targets_index = df[df[event_col].isin(targets)].groupby(user_col)[time_col].idxmin()  # type: ignore

    return df.loc[targets_index]  # type: ignore


[docs]class AddPositiveEventsParams(ParamsModel): """ A class with parameters for :py:class:`.AddPositiveEvents` class. """ targets: List[str] func: Callable = _default_func _widgets = {"func": ReteFunction(), "targets": ListOfString()}
[docs]class AddPositiveEvents(DataProcessor): """ Create new synthetic events in paths of all users having the specified events: ``positive_target_RAW_EVENT_NAME`` Parameters ---------- targets : list of str Define the list of events we consider positive. If there are several target events in a user path, the event with the minimum timestamp is taken. func : Callable, default _default_func Filter rows with target events from the input eventstream. Returns ------- Eventstream ``Eventstream`` with new synthetic events only added to users who fit the conditions. +--------------------------------+-----------------+-----------------------------+ | **event_name** | **event_type** | **timestamp** | +--------------------------------+-----------------+-----------------------------+ | positive_target_RAW_EVENT_NAME | positive_target | min(targets) | +--------------------------------+-----------------+-----------------------------+ Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: AddPositiveEventsParams @track( # type: ignore tracking_info={"event_name": "init"}, scope="add_positive_events", allowed_params=[], ) def __init__(self, params: AddPositiveEventsParams): super().__init__(params=params) @track( # type: ignore tracking_info={"event_name": "apply"}, scope="add_positive_events", allowed_params=[], ) def apply(self, eventstream: EventstreamType) -> EventstreamType: from retentioneering.eventstream.eventstream import Eventstream type_col = eventstream.schema.event_type event_col = eventstream.schema.event_name func: Callable[[EventstreamType, list[str]], pd.DataFrame] = self.params.func targets = self.params.targets positive_targets = func(eventstream, targets) positive_targets[type_col] = "positive_target" positive_targets[event_col] = "positive_target_" + positive_targets[event_col] positive_targets["ref"] = None eventstream = Eventstream( raw_data_schema=eventstream.schema.to_raw_data_schema(), raw_data=positive_targets, relations=[{"raw_col": "ref", "eventstream": eventstream}], ) return eventstream