Source code for retentioneering.data_processors_lib.add_start_end_events
from __future__ import annotations
import pandas as pd
from pandas import DataFrame
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
class AddStartEndEventsParams(ParamsModel):
    # Parameter model for ``AddStartEndEvents``; it declares no fields of
    # its own because the processor takes no configuration.
    ...
@docstrings.get_sections(base="AddStartEndEvents")  # type: ignore
class AddStartEndEvents(DataProcessor):
    """
    Create two synthetic events in each user's path:
    ``path_start`` and ``path_end``.

    Returns
    -------
    Eventstream
        ``Eventstream`` with new synthetic events only - two for each user:

        +----------------+----------------+----------------+
        | **event_name** | **event_type** | **timestamp**  |
        +----------------+----------------+----------------+
        | path_start     | path_start     | first_event    |
        +----------------+----------------+----------------+
        | path_end       | path_end       | last_event     |
        +----------------+----------------+----------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: AddStartEndEventsParams

    @time_performance(scope="add_start_end_events", event_name="init")
    def __init__(self, params: AddStartEndEventsParams) -> None:
        super().__init__(params=params)

    @time_performance(
        scope="add_start_end_events",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Append one ``path_start`` and one ``path_end`` row per user to ``df``.

        Parameters
        ----------
        df : pd.DataFrame
            Event rows. The "first" and "last" event of a user are taken in
            the current row order of ``df`` (presumably the caller passes
            rows already ordered by time - verify against the caller).
        schema : EventstreamSchemaType
            Supplies the column names for user id, event type and event name.

        Returns
        -------
        pd.DataFrame
            The rows of ``df`` followed by the synthetic rows. Each synthetic
            row is a copy of the user's first (or last) row with the event
            type and event name columns overwritten.
        """
        user_col = schema.user_id
        type_col = schema.event_type
        event_col = schema.event_name

        # ``GroupBy.first()`` / ``GroupBy.last()`` skip nulls column by column,
        # so they can stitch together values from several different events
        # whenever the boundary event has a missing value. ``head(1)`` /
        # ``tail(1)`` select whole rows instead, which keeps every synthetic
        # event an exact copy of one real event.
        events_by_user = df.groupby(user_col)

        # ``reset_index(drop=True)`` returns a fresh frame (safe to assign
        # into) labelled 0..k-1, as the aggregated frames were before.
        path_starts: DataFrame = events_by_user.head(1).reset_index(drop=True)
        path_starts[type_col] = "path_start"
        path_starts[event_col] = "path_start"

        path_ends: DataFrame = events_by_user.tail(1).reset_index(drop=True)
        path_ends[type_col] = "path_end"
        path_ends[event_col] = "path_end"

        result = pd.concat([df, path_starts, path_ends])

        collect_data_performance(
            scope="add_start_end_events",
            event_name="metadata",
            called_params={},
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result