Source code for retentioneering.eventstream.helpers.truncate_paths_helper
from __future__ import annotations
from typing import Literal, Optional
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamType
class TruncatePathsHelperMixin:
    """Mixin that provides the ``truncate_paths`` helper method."""

    @docstrings.with_indent(12)
    @time_performance(  # type: ignore
        scope="truncate_paths",
        event_name="helper",
        event_value="combine",
    )
    def truncate_paths(
        self: EventstreamType,
        drop_before: Optional[str] = None,
        drop_after: Optional[str] = None,
        occurrence_before: Literal["first", "last"] = "first",
        occurrence_after: Literal["first", "last"] = "first",
        shift_before: int = 0,
        shift_after: int = 0,
    ) -> EventstreamType:
        """
        Truncate each user's path in the ``Eventstream`` according to the
        specified event(s) and the selected parameters.

        The work is delegated to a ``TruncatePaths`` data processor that is
        attached as the only node of a temporary ``PreprocessingGraph`` built
        on top of this eventstream.

        Parameters
        ----------
        %(TruncatePath.parameters)s

        Returns
        -------
        Eventstream
            Input ``eventstream`` with truncated paths.
        """
        # Imported lazily to avoid a circular import.
        from retentioneering.data_processors_lib import (
            TruncatePaths,
            TruncatePathsParams,
        )
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode

        # The same mapping configures the processor and is reported to the tracker.
        called_params = {
            "drop_before": drop_before,
            "drop_after": drop_after,
            "occurrence_before": occurrence_before,
            "occurrence_after": occurrence_after,
            "shift_before": shift_before,
            "shift_after": shift_after,
        }

        # Build a graph with one processing node hanging off the root.
        graph = PreprocessingGraph(source_stream=self)  # type: ignore
        processor_params = TruncatePathsParams(**called_params)  # type: ignore
        processor = TruncatePaths(params=processor_params)
        truncate_node = EventsNode(processor=processor)  # type: ignore
        graph.add_node(node=truncate_node, parents=[graph.root])

        truncated_stream = graph.combine(truncate_node)
        # The graph is only a vehicle for the computation; drop the reference.
        del graph

        collect_data_performance(
            scope="truncate_paths",
            event_name="metadata",
            called_params=called_params,
            # These two parameters are passed to the tracker as "not hash" values.
            not_hash_values=["occurrence_before", "occurrence_after"],
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=truncated_stream._eventstream_index,
        )
        return truncated_stream