Source code for retentioneering.data_processors_lib.add_start_end_events

from __future__ import annotations

import pandas as pd
from pandas import DataFrame

from retentioneering.backend.tracker import track
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamType
from retentioneering.params_model import ParamsModel


class AddStartEndEventsParams(ParamsModel):
    pass


[docs]class AddStartEndEvents(DataProcessor): """ Create two synthetic events in each user's path: ``path_start`` and ``path_end``. Returns ------- Eventstream ``Eventstream`` with new synthetic events only - two for each user: +----------------+----------------+----------------+ | **event_name** | **event_type** | **timestamp** | +----------------+----------------+----------------+ | path_start | path_start | first_event | +----------------+----------------+----------------+ | path_end | path_end | last_event | +----------------+----------------+----------------+ Notes ----- See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details. """ params: AddStartEndEventsParams @track(tracking_info={"event_name": "init"}, scope="add_start_end_events", allowed_params=[]) # type: ignore def __init__(self, params: AddStartEndEventsParams) -> None: super().__init__(params=params) @track( # type: ignore tracking_info={"event_name": "apply"}, scope="add_start_end_events", allowed_params=[], ) def apply(self, eventstream: EventstreamType) -> EventstreamType: from retentioneering.eventstream.eventstream import Eventstream events: DataFrame = eventstream.to_dataframe(copy=True) user_col = eventstream.schema.user_id type_col = eventstream.schema.event_type event_col = eventstream.schema.event_name matched_events_start: DataFrame = events.groupby(user_col, as_index=False).first() # type: ignore matched_events_start[type_col] = "path_start" matched_events_start[event_col] = "path_start" matched_events_start["ref"] = None matched_events_end: DataFrame = events.groupby(user_col, as_index=False).last() # type: ignore matched_events_end[type_col] = "path_end" matched_events_end[event_col] = "path_end" matched_events_end["ref"] = None matched_events = pd.concat([matched_events_start, matched_events_end]) eventstream = Eventstream( raw_data_schema=eventstream.schema.to_raw_data_schema(), raw_data=matched_events, relations=[{"raw_col": "ref", "eventstream": eventstream}], ) return eventstream