Source code for retentioneering.eventstream.helpers.collapse_loops_helper
from __future__ import annotations
from typing import Literal, Union
from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamType
class CollapseLoopsHelperMixin:
[docs]    @docstrings.with_indent(12)
    @time_performance(
        scope="collapse_loops",
        event_name="helper",
        event_value="combine",
    )
    def collapse_loops(
        self: EventstreamType,
        suffix: Union[Literal["loop", "count"], None] = None,
        time_agg: Literal["max", "min", "mean"] = "min",
    ) -> EventstreamType:
        """
        A method of ``Eventstream`` class that finds ``loops`` and creates new synthetic events
        in paths of all users having such sequences.
        A ``loop`` - is a sequence of repetitive events.
        For example *"event1 -> event1"*
        Parameters
        ----------
            %(CollapseLoops.parameters)s
        Returns
        -------
        Eventstream
             Input ``eventstream`` with ``loops`` replaced by new synthetic events.
        """
        calling_params = {
            "suffix": suffix,
            "time_agg": time_agg,
        }
        not_hash_values = ["suffix", "time_agg"]
        # avoid circular import
        from retentioneering.data_processors_lib import (
            CollapseLoops,
            CollapseLoopsParams,
        )
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode
        p = PreprocessingGraph(source_stream=self)  # type: ignore
        node = EventsNode(
            processor=CollapseLoops(params=CollapseLoopsParams(suffix=suffix, time_agg=time_agg))  # type: ignore
        )
        p.add_node(node=node, parents=[p.root])
        result = p.combine(node)
        del p
        collect_data_performance(
            scope="collapse_loops",
            event_name="metadata",
            called_params=calling_params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=result._eventstream_index,
        )
        return result