# Source code for retentioneering.data_processors_lib.pipe
from inspect import signature
from typing import Callable, Optional
import pandas as pd
from pandas import DataFrame, Series
from retentioneering.backend.tracker import (
collect_data_performance,
time_performance,
track,
)
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.schema import EventstreamSchema
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ReteFunction
class PipeParams(ParamsModel):
    # User-supplied transformation applied by ``Pipe.apply``. The annotation lists the
    # two-argument form (DataFrame, schema); a single-argument ``func(df)`` is also
    # accepted, since ``Pipe.apply`` picks the call form from the function's signature.
    func: Callable[[DataFrame, Optional[EventstreamSchema]], DataFrame]

    # Widget mapping for the ``func`` field.
    _widgets = dict(func=ReteFunction())
def _should_pass_schema(func: Callable) -> bool:
    """
    Decide whether ``func`` is called as ``func(df, schema)`` rather than ``func(df)``.

    A function declaring exactly one parameter is called with the DataFrame only.
    Otherwise the schema is passed only if ``func`` can accept a second positional
    argument. This avoids a ``TypeError`` for signatures like ``f(df, *, flag=False)``
    or ``f(df, **kwargs)``, whose extra parameters cannot receive a positional value.
    """
    sig = signature(func)
    if len(sig.parameters) == 1:
        return False
    try:
        # bind_partial checks positional arity without requiring every parameter
        # to be supplied; it raises TypeError on "too many positional arguments".
        sig.bind_partial(None, None)
    except TypeError:
        return False
    return True


@docstrings.get_sections(base="Pipe")  # type: ignore
class Pipe(DataProcessor):
    """
    Modify an input eventstream in an arbitrary way by applying a given function.
    The function must accept a DataFrame associated with the input eventstream
    and return a new state of the modified eventstream.

    Parameters
    ----------
    func : Callable[[DataFrame], DataFrame] or Callable[[DataFrame, EventstreamSchema], DataFrame]
        A function that is applied to the DataFrame underlying the eventstream.
        Must accept a DataFrame as its first argument and return a DataFrame.
        If it declares more than one parameter and can accept a second positional
        argument, the eventstream schema is passed as that second argument.

    Returns
    -------
    Eventstream
        Resulting eventstream
    """

    params: PipeParams

    @time_performance(
        scope="pipe",
        event_name="init",
    )
    def __init__(self, params: PipeParams):
        super().__init__(params=params)

    @time_performance(
        scope="pipe",
        event_name="apply",
    )
    def apply(self, df: DataFrame, schema: EventstreamSchemaType) -> DataFrame:
        """
        Apply ``self.params.func`` to ``df`` and report parent/child metadata.

        Parameters
        ----------
        df : DataFrame
            DataFrame of the input eventstream.
        schema : EventstreamSchemaType
            Schema of the input eventstream; forwarded to ``func`` when its
            signature can take it as a second positional argument.

        Returns
        -------
        DataFrame
            The DataFrame returned by ``func``.

        Raises
        ------
        TypeError
            If ``func`` does not return a ``pandas.DataFrame``.
        """
        func: Callable[[DataFrame, Optional[EventstreamSchemaType]], DataFrame] = self.params.func  # type: ignore

        # Snapshot the parent's metadata before ``func`` runs: a function that mutates
        # ``df`` in place would otherwise make the "parent" entry describe the modified frame.
        # NOTE(review): ``func`` receives ``df`` itself, not a copy; confirm callers hand
        # Pipe a frame it is allowed to mutate.
        parent_data = {
            "shape": df.shape,
            "hash": hash_dataframe(df),
        }

        if _should_pass_schema(func):
            result = func(df, schema)
        else:
            result = func(df)  # type: ignore

        # Fail with a clear message instead of an AttributeError on ``.shape`` below
        # (e.g. when the function modifies ``df`` in place and forgets to return it).
        if not isinstance(result, DataFrame):
            raise TypeError(
                f"Pipe function must return a pandas DataFrame, got {type(result).__name__} instead"
            )

        collect_data_performance(
            scope="pipe",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": parent_data,
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )
        return result