# retentioneering.eventstream.helpers.split_sessions_helper - Eventstream mixin providing the ``split_sessions`` helper method.

from __future__ import annotations

from typing import List, Optional, Tuple

from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.common.constants import DATETIME_UNITS
from retentioneering.utils.doc_substitution import docstrings

from ..types import EventstreamType


class SplitSessionsHelperMixin:
    @docstrings.with_indent(12)
    @time_performance(
        scope="split_sessions",
        event_name="helper",
        event_value="combine",
    )
    def split_sessions(
        self: EventstreamType,
        timeout: Optional[Tuple[float, DATETIME_UNITS]] = None,
        delimiter_events: Optional[List[str]] = None,
        delimiter_col: Optional[str] = None,
        session_col: str = "session_id",
        mark_truncated: Optional[bool] = False,
    ) -> EventstreamType:
        """
        A method of ``Eventstream`` class that splits every user's path into sessions
        by inserting synthetic events: ``session_start`` (or ``session_start_cropped``)
        and ``session_end`` (or ``session_end_cropped``).

        It also adds a column holding the session identifier of each event in the
        input eventstream. The identifier has the form
        ``{user_id}_{session_number through one user path}``.
        Both the new events and the new column are added to the input eventstream.

        Parameters
        ----------
            %(SplitSessions.parameters)s

        Returns
        -------
        Eventstream
            Input ``eventstream`` with new synthetic events and ``session_col``.
        """
        # Arguments reported to the tracker. The key order is kept stable so that
        # whatever the tracker derives from this mapping is unchanged.
        tracked_params = {
            "timeout": timeout,
            "session_col": session_col,
            "mark_truncated": mark_truncated,
            "delimiter_events": delimiter_events,
            "delimiter_col": delimiter_col,
        }

        # Imported here rather than at module level to avoid a circular import.
        from retentioneering.data_processors_lib import (
            SplitSessions,
            SplitSessionsParams,
        )
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode

        # Build a one-node graph: the SplitSessions processor hangs directly off
        # the root (this eventstream), and combining that node yields the result.
        graph = PreprocessingGraph(source_stream=self)  # type: ignore
        processor_params = SplitSessionsParams(  # type: ignore
            timeout=timeout,
            delimiter_events=delimiter_events,
            delimiter_col=delimiter_col,
            session_col=session_col,
            mark_truncated=mark_truncated,
        )
        split_node = EventsNode(processor=SplitSessions(params=processor_params))  # type: ignore
        graph.add_node(node=split_node, parents=[graph.root])
        combined = graph.combine(split_node)
        # Release the local graph reference before reporting telemetry.
        del graph

        # ``timeout`` is listed in ``not_hash_values``; presumably the tracker
        # records it without hashing it (TODO confirm against the tracker).
        collect_data_performance(
            scope="split_sessions",
            event_name="metadata",
            called_params=tracked_params,
            not_hash_values=["timeout"],
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=combined._eventstream_index,
        )
        return combined