Source code for retentioneering.data_processors_lib.add_negative_events
from __future__ import annotations
from typing import Any, Callable, List
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.schema import EventstreamSchema
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfString, ReteFunction
# Type alias for a filter callable: it receives a DataFrame and an EventstreamSchema;
# the return type is left unconstrained (``Any``).
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchema], Any]
def _default_func(eventstream: EventstreamType, targets: List[str]) -> pd.DataFrame:
    """
    Pick, for every user, the earliest row whose event name is one of ``targets``.

    Parameters
    ----------
    eventstream : Eventstream
        Source eventstream or output from previous nodes.
    targets : list of str
        Event names associated with a bad outcome of the user's experience
        in the product. When a user path contains several of them, only the
        row with the minimum timestamp is kept.

    Returns
    -------
    pd.DataFrame
        One row per user that has at least one target event: the target row
        with the smallest timestamp.
    """
    schema = eventstream.schema
    user_col = schema.user_id
    time_col = schema.event_timestamp
    event_col = schema.event_name

    events = eventstream.to_dataframe()

    # Restrict to target events, then find the index label of the earliest one per user.
    is_target = events[event_col].isin(targets)
    target_rows = events[is_target]
    earliest_index = target_rows.groupby(user_col)[time_col].idxmin()  # type: ignore

    return events.loc[earliest_index]  # type: ignore
class AddNegativeEventsParams(ParamsModel):
    """
    Parameter container consumed by the :py:class:`.AddNegativeEvents` data processor.
    """

    # Names of the events that are considered negative targets.
    targets: List[str]
    # @TODO: remove eventstream from the "func" signature in a future major release. Aleksei Avramenko
    # Callable that selects the target rows; ``_default_func`` is used when none is given.
    func: Callable = _default_func

    # Maps each parameter name to the widget object associated with it.
    _widgets = dict(func=ReteFunction(), targets=ListOfString())
@docstrings.get_sections(base="AddNegativeEvents")  # type: ignore
class AddNegativeEvents(DataProcessor):
    """
    Create new synthetic events in paths of all users having the specified event(s):
    ``negative_target_RAW_EVENT_NAME``.

    Parameters
    ----------
    targets : list of str
        Define the list of events that we consider negative.
        If there are several target events in the user path, the event with the minimum timestamp is taken.

    func : Callable, default _default_func
        Filter rows with target events from the input eventstream.
        It is called as ``func(eventstream, targets)`` and must return a ``pd.DataFrame``
        holding the rows to be turned into synthetic events.

    Returns
    -------
    Eventstream
        ``Eventstream`` with new synthetic events only added to the users who fit the conditions.

        +--------------------------------+-----------------+-----------------------------+
        | **event_name**                 | **event_type**  | **timestamp**               |
        +--------------------------------+-----------------+-----------------------------+
        | negative_target_RAW_EVENT_NAME | negative_target | min(targets)                |
        +--------------------------------+-----------------+-----------------------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: AddNegativeEventsParams

    @time_performance(
        scope="add_negative_events",
        event_name="init",
    )
    def __init__(self, params: AddNegativeEventsParams):
        super().__init__(params=params)

    @time_performance(
        scope="add_negative_events",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Append ``negative_target`` synthetic events to the given events.

        An ``Eventstream`` is built from ``df`` and ``schema``, ``params.func`` selects the
        target rows from it, and those rows are relabelled (event type ``negative_target``,
        event name prefixed with ``negative_target_``) and concatenated to the
        eventstream's dataframe.

        Parameters
        ----------
        df : pd.DataFrame
            Raw events used to build the intermediate eventstream.
        schema : EventstreamSchemaType
            Schema providing the event type and event name column names.

        Returns
        -------
        pd.DataFrame
            The eventstream's dataframe followed by the synthetic rows.
        """
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import -- TODO confirm).
        from retentioneering.eventstream.eventstream import Eventstream

        type_col = schema.event_type
        event_col = schema.event_name

        func = self.params.func
        targets = self.params.targets

        eventstream = Eventstream(
            raw_data_schema=schema.to_raw_data_schema(event_index=True),
            raw_data=df,
            add_start_end_events=False,
        )
        # @TODO: remove eventstream from the "func" signature in a future major release. Aleksei Avramenko
        # Work on a copy: ``func`` may be user-supplied and may hand back a filtered slice or an
        # object it still owns; writing columns into it in place would mutate that object and can
        # trigger pandas' SettingWithCopyWarning.
        negative_targets: pd.DataFrame = func(eventstream, targets).copy()
        negative_targets[type_col] = "negative_target"
        negative_targets[event_col] = "negative_target_" + negative_targets[event_col]

        result = pd.concat([eventstream.to_dataframe(), negative_targets])

        # Report call parameters plus shape/hash of the input and output frames to the tracker.
        collect_data_performance(
            scope="add_negative_events",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result