Source code for retentioneering.data_processors_lib.add_positive_events

from __future__ import annotations

from typing import Callable, List

import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfString, ReteFunction


def _default_func(eventstream: EventstreamType, targets: list[str]) -> pd.DataFrame:
    """
    Filter rows with target events from the input eventstream.

    Parameters
    ----------
    eventstream : Eventstream
        Source eventstream or output from previous nodes.
    targets : list of str
        Condition for eventstream filtering.
        Each event from that list is associated with a conversion goal of the user behaviour in the product.
        If there are several target events in user path - the event with minimum timestamp is taken.

    Returns
    -------
    pd.DataFrame
        Filtered DataFrame with targets and its timestamps.
    """
    user_col = eventstream.schema.user_id
    time_col = eventstream.schema.event_timestamp
    event_col = eventstream.schema.event_name
    df = eventstream.to_dataframe()

    targets_index = df[df[event_col].isin(targets)].groupby(user_col)[time_col].idxmin()  # type: ignore

    return df.loc[targets_index]  # type: ignore


[docs]class AddPositiveEventsParams(ParamsModel): """ A class with parameters for :py:class:`.AddPositiveEvents` class. """ targets: List[str] func: Callable = _default_func _widgets = {"func": ReteFunction(), "targets": ListOfString()}
[docs]@docstrings.get_sections(base="AddPositiveEvents") # type: ignore class AddPositiveEvents(DataProcessor): """ Create new synthetic events in paths of all users having the specified events: ``positive_target_RAW_EVENT_NAME`` Parameters ---------- targets : list of str Define the list of events we consider positive. If there are several target events in a user path, the event with the minimum timestamp is taken. func : Callable, default _default_func Filter rows with target events from the input eventstream. Returns ------- Eventstream ``Eventstream`` with new synthetic events only added to users who fit the conditions. +--------------------------------+-----------------+-----------------------------+ | **event_name** | **event_type** | **timestamp** | +--------------------------------+-----------------+-----------------------------+ | positive_target_RAW_EVENT_NAME | positive_target | min(targets) | +--------------------------------+-----------------+-----------------------------+ Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: AddPositiveEventsParams @time_performance( scope="add_positive_events", event_name="init", ) def __init__(self, params: AddPositiveEventsParams): super().__init__(params=params) @time_performance( scope="add_positive_events", event_name="apply", ) def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame: from retentioneering.eventstream.eventstream import Eventstream type_col = schema.event_type event_col = schema.event_name func: Callable[[EventstreamType, list[str]], pd.DataFrame] = self.params.func targets = self.params.targets eventstream = Eventstream( raw_data_schema=schema.to_raw_data_schema(event_index=True), raw_data=df, ) positive_targets = func(eventstream, targets) positive_targets[type_col] = "positive_target" positive_targets[event_col] = "positive_target_" + positive_targets[event_col] result = pd.concat([eventstream.to_dataframe(), positive_targets]) collect_data_performance( scope="add_positive_events", event_name="metadata", called_params=self.to_dict()["values"], performance_data={ "parent": { "shape": df.shape, "hash": hash_dataframe(df), }, "child": { "shape": result.shape, "hash": hash_dataframe(result), }, }, ) return result