Source code for retentioneering.eventstream.helpers.split_sessions_helper

from __future__ import annotations

from typing import Optional, Tuple

from retentioneering.backend.tracker import track
from retentioneering.constants import DATETIME_UNITS

from ..types import EventstreamType


class SplitSessionsHelperMixin:
[docs] @track( # type: ignore tracking_info={"event_name": "helper"}, scope="split_sessions", event_value="combine", allowed_params=[ "timeout", "session_col", "mark_truncated", ], ) def split_sessions( self, timeout: Tuple[float, DATETIME_UNITS], session_col: str = "session_id", mark_truncated: Optional[bool] = False, ) -> EventstreamType: """ A method of ``Eventstream`` class that creates new synthetic events in each user's path: ``session_start`` (or ``session_start_cropped``) and ``session_end`` (or ``session_end_cropped``). The created events divide users' paths on sessions. Also creates a new column that contains session number for each event in the input eventstream Session number will take the form: ``{user_id}_{session_number through one user path}``. The created events and column are added to the input eventstream. Parameters ---------- See parameters description :py:class:`.SplitSessions` Returns ------- Eventstream Input ``eventstream`` with new synthetic events and ``session_col``. """ # avoid circular import from retentioneering.data_processors_lib import ( SplitSessions, SplitSessionsParams, ) from retentioneering.preprocessing_graph import PreprocessingGraph from retentioneering.preprocessing_graph.nodes import EventsNode p = PreprocessingGraph(source_stream=self) # type: ignore params = dict(timeout=timeout, session_col=session_col, mark_truncated=mark_truncated) node = EventsNode(processor=SplitSessions(params=SplitSessionsParams(**params))) # type: ignore p.add_node(node=node, parents=[p.root]) result = p.combine(node) del p return result