Source code for retentioneering.data_processors_lib.add_positive_events
from __future__ import annotations
from typing import Callable, List
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfString, ReteFunction
def _default_func(eventstream: EventstreamType, targets: list[str]) -> pd.DataFrame:
    """
    Filter rows with target events from the input eventstream.
    Parameters
    ----------
    eventstream : Eventstream
        Source eventstream or output from previous nodes.
    targets : list of str
        Condition for eventstream filtering.
        Each event from that list is associated with a conversion goal of the user behaviour in the product.
        If there are several target events in user path - the event with minimum timestamp is taken.
    Returns
    -------
    pd.DataFrame
        Filtered DataFrame with targets and its timestamps.
    """
    user_col = eventstream.schema.user_id
    time_col = eventstream.schema.event_timestamp
    event_col = eventstream.schema.event_name
    df = eventstream.to_dataframe()
    targets_index = df[df[event_col].isin(targets)].groupby(user_col)[time_col].idxmin()  # type: ignore
    return df.loc[targets_index]  # type: ignore
[docs]class AddPositiveEventsParams(ParamsModel):
    """
    A class with parameters for :py:class:`.AddPositiveEvents` class.
    """
    targets: List[str]
    func: Callable = _default_func
    _widgets = {"func": ReteFunction(), "targets": ListOfString()} 
[docs]@docstrings.get_sections(base="AddPositiveEvents")  # type: ignore
class AddPositiveEvents(DataProcessor):
    """
    Create new synthetic events in paths of all users having the specified events:
    ``positive_target_RAW_EVENT_NAME``
    Parameters
    ----------
    targets : list of str
        Define the list of events we consider positive.
        If there are several target events in a user path, the event with the minimum timestamp is taken.
    func : Callable, default _default_func
        Filter rows with target events from the input eventstream.
    Returns
    -------
    Eventstream
        ``Eventstream`` with new synthetic events only added to users who fit the conditions.
        +--------------------------------+-----------------+-----------------------------+
        | **event_name**                 | **event_type**  | **timestamp**               |
        +--------------------------------+-----------------+-----------------------------+
        | positive_target_RAW_EVENT_NAME | positive_target | min(targets)                |
        +--------------------------------+-----------------+-----------------------------+
    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """
    params: AddPositiveEventsParams
    @time_performance(
        scope="add_positive_events",
        event_name="init",
    )
    def __init__(self, params: AddPositiveEventsParams):
        super().__init__(params=params)
    @time_performance(
        scope="add_positive_events",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        from retentioneering.eventstream.eventstream import Eventstream
        type_col = schema.event_type
        event_col = schema.event_name
        func: Callable[[EventstreamType, list[str]], pd.DataFrame] = self.params.func
        targets = self.params.targets
        eventstream = Eventstream(
            raw_data_schema=schema.to_raw_data_schema(event_index=True),
            raw_data=df,
        )
        positive_targets = func(eventstream, targets)
        positive_targets[type_col] = "positive_target"
        positive_targets[event_col] = "positive_target_" + positive_targets[event_col]
        result = pd.concat([eventstream.to_dataframe(), positive_targets])
        collect_data_performance(
            scope="add_positive_events",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )
        return result