Source code for retentioneering.eventstream.helpers.truncate_paths_helper
from __future__ import annotations
from typing import Literal, Optional
from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamType
class TruncatePathsHelperMixin:
[docs]    @docstrings.with_indent(12)
    @time_performance(  # type: ignore
        scope="truncate_paths",
        event_name="helper",
        event_value="combine",
    )
    def truncate_paths(
        self: EventstreamType,
        drop_before: Optional[str] = None,
        drop_after: Optional[str] = None,
        occurrence_before: Literal["first", "last"] = "first",
        occurrence_after: Literal["first", "last"] = "first",
        shift_before: int = 0,
        shift_after: int = 0,
        ignore_before: bool = False,
        ignore_after: bool = False,
        keep_synthetic: bool = False,
    ) -> EventstreamType:
        """
        A method of ``Eventstream`` class that truncates each user's path based on the
        specified event(s) and selected parameters.
        Parameters
        ----------
            %(TruncatePath.parameters)s
        Returns
        -------
        Eventstream
             Input ``eventstream`` with truncated paths.
        """
        # avoid circular import
        from retentioneering.data_processors_lib import (
            TruncatePaths,
            TruncatePathsParams,
        )
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode
        p = PreprocessingGraph(source_stream=self)  # type: ignore
        params = {
            "drop_before": drop_before,
            "drop_after": drop_after,
            "occurrence_before": occurrence_before,
            "occurrence_after": occurrence_after,
            "shift_before": shift_before,
            "shift_after": shift_after,
            "ignore_before": ignore_before,
            "ignore_after": ignore_after,
            "keep_synthetic": keep_synthetic,
        }
        not_hash_values = ["occurrence_before", "occurrence_after"]
        node = EventsNode(processor=TruncatePaths(params=TruncatePathsParams(**params)))  # type: ignore
        p.add_node(node=node, parents=[p.root])
        result = p.combine(node)
        del p
        collect_data_performance(
            scope="truncate_paths",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=result._eventstream_index,
        )
        return result