# Source code for retentioneering.data_processors_lib.group_events
from typing import Any, Callable, Optional
import pandas as pd
from retentioneering.backend.tracker import track
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.widget.widgets import ReteFunction
# Signature of the user-supplied filter function: it is called with the events
# dataframe and the eventstream schema, and its result is used to index that
# dataframe (a boolean mask is expected).
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchemaType], Any]
class GroupEventsParams(ParamsModel):
    """
    Parameter container for the :py:class:`.GroupEvents` data processor.

    Attributes
    ----------
    event_name : str
        Name assigned to the created events.
    func : EventstreamFilter
        Callable that receives the events dataframe and the eventstream
        schema and returns the mask selecting the events to group.
    event_type : str, optional, default "group_alias"
        Event type assigned to the created events.
    """

    event_name: str
    func: EventstreamFilter
    event_type: Optional[str] = "group_alias"

    # Widget used to edit the ``func`` parameter.
    _widgets = {"func": ReteFunction()}
class GroupEvents(DataProcessor):
    """
    Select events from the input ``eventstream`` with a custom filter and
    create new synthetic events named ``event_name`` from them.

    Parameters
    ----------
    event_name : str
        Name of the created event.
    func : Callable[[DataFrame, EventstreamSchema], Any]
        Custom function that returns a boolean mask with the same length as
        the input eventstream.

        - If ``True`` - the event is grouped.
        - If ``False`` - the event is left unchanged.
    event_type : str, default "group_alias"
        Event type name for the grouped events. If ``None``, the created
        events keep the event type of the events they were built from.
        If a custom event_type is created, it should be added to the
        ``DEFAULT_INDEX_ORDER``.

    Returns
    -------
    Eventstream
        ``Eventstream`` with:

        - new synthetic events with ``group_alias`` or custom type
        - raw events marked ``_deleted=True``

        +-----------------+----------------+---------------+--------------+
        | **event_name**  | **event_type** | **timestamp** | **_deleted** |
        +-----------------+----------------+---------------+--------------+
        | raw_event_name  | raw            | raw_event     | True         |
        +-----------------+----------------+---------------+--------------+
        | new_event_name  | group_alias    | raw_event     | False        |
        +-----------------+----------------+---------------+--------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: GroupEventsParams

    @track(  # type: ignore
        tracking_info={"event_name": "init"},
        scope="group_events",
        allowed_params=[],
    )
    def __init__(self, params: GroupEventsParams) -> None:
        super().__init__(params=params)

    @track(  # type: ignore
        tracking_info={"event_name": "apply"},
        scope="group_events",
        allowed_params=[],
    )
    def apply(self, eventstream: EventstreamType) -> EventstreamType:
        """
        Build an eventstream holding the grouped (renamed) events.

        Parameters
        ----------
        eventstream : EventstreamType
            Source eventstream whose events are filtered with ``params.func``.

        Returns
        -------
        EventstreamType
            New eventstream built from the matched events and related to the
            source eventstream through the ``ref`` column.
        """
        # Function-scope import kept from the original implementation
        # (presumably to avoid a circular import - confirm before moving it).
        from retentioneering.eventstream.eventstream import Eventstream

        schema = eventstream.schema
        func = self.params.func
        event_type = self.params.event_type

        events = eventstream.to_dataframe()
        mask = func(events, schema)

        # Work on an explicit copy of the selection: the columns below are
        # overwritten, and owning the data makes those assignments well
        # defined without silencing pandas' chained-assignment check.
        matched_events = events[mask].copy()

        # ``event_type=None`` means "keep the type of the source event".
        if event_type is not None:
            matched_events[schema.event_type] = event_type
        matched_events[schema.event_name] = self.params.event_name

        # Each new event references the id of the event it was created from.
        matched_events["ref"] = matched_events[schema.event_id]

        return Eventstream(
            raw_data_schema=schema.to_raw_data_schema(),
            raw_data=matched_events,
            relations=[{"raw_col": "ref", "eventstream": eventstream}],
        )