Source code for retentioneering.eventstream.helpers.collapse_loops_helper
from __future__ import annotations
from typing import Literal, Union
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamType
class CollapseLoopsHelperMixin:
[docs] @docstrings.with_indent(12)
@time_performance(
scope="collapse_loops",
event_name="helper",
event_value="combine",
)
def collapse_loops(
self: EventstreamType,
suffix: Union[Literal["loop", "count"], None] = None,
time_agg: Literal["max", "min", "mean"] = "min",
) -> EventstreamType:
"""
A method of ``Eventstream`` class that finds ``loops`` and creates new synthetic events
in paths of all users having such sequences.
A ``loop`` - is a sequence of repetitive events.
For example *"event1 -> event1"*
Parameters
----------
%(CollapseLoops.parameters)s
Returns
-------
Eventstream
Input ``eventstream`` with ``loops`` replaced by new synthetic events.
"""
calling_params = {
"suffix": suffix,
"time_agg": time_agg,
}
not_hash_values = ["suffix", "time_agg"]
# avoid circular import
from retentioneering.data_processors_lib import (
CollapseLoops,
CollapseLoopsParams,
)
from retentioneering.preprocessing_graph import PreprocessingGraph
from retentioneering.preprocessing_graph.nodes import EventsNode
p = PreprocessingGraph(source_stream=self) # type: ignore
node = EventsNode(
processor=CollapseLoops(params=CollapseLoopsParams(suffix=suffix, time_agg=time_agg)) # type: ignore
)
p.add_node(node=node, parents=[p.root])
result = p.combine(node)
del p
collect_data_performance(
scope="collapse_loops",
event_name="metadata",
called_params=calling_params,
not_hash_values=not_hash_values,
performance_data={},
eventstream_index=self._eventstream_index,
parent_eventstream_index=self._eventstream_index,
child_eventstream_index=result._eventstream_index,
)
return result