Source code for retentioneering.eventstream.helpers.pipe_helper
from __future__ import annotations
from typing import Callable, Optional
from pandas import DataFrame, Series
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.utils.doc_substitution import docstrings
from ..types import EventstreamSchemaType, EventstreamType
class PipeHelperMixin:
    @docstrings.with_indent(12)
    @time_performance(  # type: ignore
        scope="pipe",
        event_name="helper",
        event_value="combine",
    )
    def pipe(
        self: EventstreamType, func: Callable[[DataFrame, Optional[EventstreamSchemaType]], DataFrame]
    ) -> EventstreamType:
        """
        Transform the eventstream with an arbitrary user-supplied function.

        ``func`` is wrapped in a ``Pipe`` data processor and run through a
        single-node preprocessing graph rooted at this eventstream. Per its
        type annotation, ``func`` takes a DataFrame (plus an optional
        eventstream schema) and must return a DataFrame describing the new
        state of the eventstream.

        Parameters
        ----------
        %(Pipe.parameters)s

        Returns
        -------
        Eventstream
            The eventstream produced by combining the ``Pipe`` node.
        """
        # Deferred imports: pulling these in at module level would create a
        # circular import.
        from retentioneering.data_processors_lib import Pipe, PipeParams
        from retentioneering.preprocessing_graph import PreprocessingGraph
        from retentioneering.preprocessing_graph.nodes import EventsNode

        # Temporary graph: root (this eventstream) -> a single Pipe node.
        graph = PreprocessingGraph(source_stream=self)  # type: ignore
        pipe_processor = Pipe(params=PipeParams(func=func))  # type: ignore
        pipe_node = EventsNode(processor=pipe_processor)  # type: ignore
        graph.add_node(node=pipe_node, parents=[graph.root])
        result = graph.combine(pipe_node)
        # Only ``result`` is needed from here on; release the local graph reference.
        del graph

        # Record call metadata, linking the parent (self) and child (result) streams.
        collect_data_performance(
            scope="pipe",
            event_name="metadata",
            called_params={"func": func},
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=result._eventstream_index,
        )
        return result