Source code for retentioneering.eventstream.helpers.group_events_helper
from __future__ import annotations
from typing import Any, Callable
import pandas as pd
from retentioneering.backend.tracker import track
from ..types import EventstreamSchemaType, EventstreamType
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchemaType], Any]
class GroupEventsHelperMixin:
[docs] @track( # type: ignore
tracking_info={"event_name": "helper"},
scope="group_events",
event_value="combine",
allowed_params=[
"event_name",
"func",
"event_type",
],
)
def group_events(
self,
event_name: str,
func: EventstreamFilter,
event_type: str | None = "group_alias",
) -> EventstreamType:
"""
A method of ``Eventstream`` class that filters and replaces raw events with new synthetic events,
having the same ``timestamp`` and ``user_id``, but new ``event_name``.
Parameters
----------
See parameters description
:py:class:`.GroupEvents`
Returns
-------
Eventstream
Input ``eventstream`` with replaced events.
"""
# avoid circular import
from retentioneering.data_processors_lib import GroupEvents, GroupEventsParams
from retentioneering.preprocessing_graph import PreprocessingGraph
from retentioneering.preprocessing_graph.nodes import EventsNode
p = PreprocessingGraph(source_stream=self) # type: ignore
node = EventsNode(
processor=GroupEvents(
params=GroupEventsParams(event_name=event_name, func=func, event_type=event_type) # type: ignore
)
)
p.add_node(node=node, parents=[p.root])
result = p.combine(node)
del p
return result