# Source code for retentioneering.eventstream.helpers.group_events_helper
from __future__ import annotations
from typing import Any, Callable
import pandas as pd
from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamSchemaType, EventstreamType
EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchemaType], Any]
class GroupEventsHelperMixin:
    """
    Mixin that provides the ``group_events`` helper method.

    The method annotates ``self`` as ``EventstreamType`` and reads
    ``self._eventstream_index``, so the host class is expected to supply
    that attribute.
    """

    @docstrings.with_indent(12)
    @time_performance(  # type: ignore
        scope="group_events",
        event_name="helper",
        event_value="combine",
    )
    def group_events(
        self: EventstreamType,
        event_name: str,
        func: EventstreamFilter,
        event_type: str | None = "group_alias",
    ) -> EventstreamType:
        """
        A method of ``Eventstream`` class that filters and replaces raw events with new synthetic events,
        having the same ``timestamp`` and ``user_id``, but new ``event_name``.

        Parameters
        ----------
            %(GroupEvents.parameters)s

        Returns
        -------
        Eventstream
            Input ``eventstream`` with replaced events.
        """
        # Snapshot of the call arguments, reported to the tracker below.
        calling_params = {
            "event_name": event_name,
            "func": func,
            "event_type": event_type,
        }

        # avoid circular import
        from retentioneering.data_processors_lib import GroupEvents, GroupEventsParams
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode

        # Build a one-node preprocessing graph: root (this stream) -> GroupEvents.
        graph = PreprocessingGraph(source_stream=self)  # type: ignore
        node = EventsNode(
            processor=GroupEvents(
                params=GroupEventsParams(event_name=event_name, func=func, event_type=event_type)  # type: ignore
            )
        )
        graph.add_node(node=node, parents=[graph.root])
        result = graph.combine(node)

        # Only the combined stream is needed from here on; drop the local
        # reference to the graph before reporting.
        del graph

        # The source stream is reported as both the current and the parent
        # eventstream; the combined result is reported as the child.
        collect_data_performance(
            scope="group_events",
            event_name="metadata",
            called_params=calling_params,
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=result._eventstream_index,
        )
        return result