Source code for retentioneering.data_processors_lib.group_events
from inspect import signature
from typing import Any, Callable, Optional
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ReteFunction
EventstreamFilter = Callable[[pd.DataFrame, Optional[EventstreamSchemaType]], Any]
[docs]class GroupEventsParams(ParamsModel):
"""
A class with parameters for :py:class:`.GroupEvents` class.
"""
event_name: str
func: EventstreamFilter
event_type: Optional[str] = "group_alias"
_widgets = {
"func": ReteFunction(),
}
[docs]@docstrings.get_sections(base="GroupEvents") # type: ignore
class GroupEvents(DataProcessor):
"""
Filter the specified events from the input ``eventstream`` and create
new synthetic events, with names based on the old events' names.
Parameters
----------
event_name : str
Name of the created event.
func : Callable[[DataFrame, Optional[EventstreamSchema]], Any]
Custom function that returns boolean mask with the same length as input eventstream.
- If ``True`` - events will be grouped.
- If ``False`` - events will be remained.
event_type : str, default "group_alias"
Event_type name for the grouped events.
If custom event_type is created, it should be added to the ``DEFAULT_INDEX_ORDER``.
Returns
-------
Eventstream
``Eventstream`` with:
- new synthetic events with ``group_alias`` or custom type`
+-----------------+----------------+-------------------+
| **event_name** | **event_type** | **timestamp** |
+-----------------+----------------+-------------------+
| raw_event_name | raw | raw_event |
+-----------------+----------------+-------------------+
| new_event_name | group_alias | raw_event |
+-----------------+----------------+-------------------+
Notes
-----
See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
"""
params: GroupEventsParams
@time_performance(
scope="group_events",
event_name="init",
)
def __init__(self, params: GroupEventsParams) -> None:
super().__init__(params=params)
@time_performance(
scope="group_events",
event_name="apply",
)
def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
event_name = self.params.event_name
func: Callable = self.params.func
event_type = self.params.event_type
parent_info = {
"shape": df.shape,
"hash": hash_dataframe(df),
}
expected_args_count = len(signature(func).parameters)
if expected_args_count == 1:
mask = func(df) # type: ignore
else:
mask = func(df, schema)
with pd.option_context("mode.chained_assignment", None):
if event_type is not None:
df.loc[mask, schema.event_type] = event_type
df.loc[mask, schema.event_name] = event_name
collect_data_performance(
scope="group_events",
event_name="metadata",
called_params=self.to_dict()["values"],
performance_data={
"parent": parent_info,
"child": {
"shape": df.shape,
"hash": hash_dataframe(df),
},
},
)
return df