# Source code for retentioneering.eventstream.helpers.group_events_helper
from __future__ import annotations
from typing import Any, Callable
import pandas as pd
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamSchemaType, EventstreamType
# Type of the ``func`` argument accepted by ``GroupEventsHelperMixin.group_events``:
# a callable taking a ``pd.DataFrame`` and an ``EventstreamSchemaType``.
# NOTE(review): the return type is ``Any`` here; presumably it yields a boolean
# mask selecting the events to group -- confirm against ``GroupEvents``.
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchemaType], Any]
class GroupEventsHelperMixin:
    """
    Mixin providing the ``group_events`` helper method.

    The helper wraps the ``GroupEvents`` data processor in a one-node
    ``PreprocessingGraph`` built on top of the calling eventstream.
    """

    @docstrings.with_indent(12)
    @time_performance(  # type: ignore
        scope="group_events",
        event_name="helper",
        event_value="combine",
    )
    def group_events(
        self: EventstreamType,
        event_name: str,
        func: EventstreamFilter,
        event_type: str | None = "group_alias",
    ) -> EventstreamType:
        """
        A method of ``Eventstream`` class that filters and replaces raw events with new synthetic events,
        having the same ``timestamp`` and ``user_id``, but new ``event_name``.

        Parameters
        ----------
        %(GroupEvents.parameters)s

        Returns
        -------
        Eventstream
            Input ``eventstream`` with replaced events.
        """
        # Snapshot of the arguments; forwarded below as ``called_params``.
        calling_params = {
            "event_name": event_name,
            "func": func,
            "event_type": event_type,
        }

        # avoid circular import
        from retentioneering.data_processors_lib import GroupEvents, GroupEventsParams
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode

        # Temporary graph: the calling eventstream is the source, and a single
        # ``GroupEvents`` node is attached directly to the graph root.
        graph = PreprocessingGraph(source_stream=self)  # type: ignore
        node = EventsNode(
            processor=GroupEvents(
                params=GroupEventsParams(event_name=event_name, func=func, event_type=event_type)  # type: ignore
            )
        )
        graph.add_node(node=node, parents=[graph.root])
        result = graph.combine(node)

        # Drop the local reference to the temporary graph once the result exists.
        del graph

        # The calling eventstream is reported both as the current and as the
        # parent stream; the combined result is reported as the child.
        collect_data_performance(
            scope="group_events",
            event_name="metadata",
            called_params=calling_params,
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=result._eventstream_index,
        )

        return result