# Source code for retentioneering.data_processors_lib.pipe
from inspect import signature
from typing import Callable, Optional
import pandas as pd
from pandas import DataFrame, Series
from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.schema import EventstreamSchema
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ReteFunction
class PipeParams(ParamsModel):
    # Parameter container for the ``Pipe`` data processor (see ``Pipe.params``).
    #
    # ``func`` is the user-supplied callable that ``Pipe.apply`` invokes. It is
    # annotated as receiving the eventstream DataFrame plus an optional
    # ``EventstreamSchema`` and returning a DataFrame.
    func: Callable[[DataFrame, Optional[EventstreamSchema]], DataFrame]
    # Maps the ``func`` field to a ``ReteFunction`` widget instance.
    # NOTE(review): presumably read by ``ParamsModel`` to render/serialize the
    # field - confirm against ``ParamsModel``.
    _widgets = {
        "func": ReteFunction(),
    }
@docstrings.get_sections(base="Pipe")  # type: ignore
class Pipe(DataProcessor):
    """
    Modify an input eventstream in an arbitrary way by applying given function.

    The function must accept a DataFrame associated with the input eventstream
    and return a new state of the modified eventstream. It may additionally
    accept the eventstream schema as a second positional argument.

    Parameters
    ----------
    func : Callable[[DataFrame], DataFrame] or Callable[[DataFrame, EventstreamSchema], DataFrame]
        A function that is applied to the DataFrame underlying the eventstream.
        Must accept DataFrame as input and return DataFrame as output. If the
        function is able to take a second positional argument, the eventstream
        schema is passed to it as that argument.

    Returns
    -------
    Eventstream
        Resulting eventstream
    """

    params: PipeParams

    @time_performance(
        scope="pipe",
        event_name="init",
    )
    def __init__(self, params: PipeParams):
        """
        Initialise the processor by forwarding ``params`` to ``DataProcessor``.

        Parameters
        ----------
        params : PipeParams
            Parameters holding the user function to apply.
        """
        super().__init__(params=params)

    @time_performance(
        scope="pipe",
        event_name="apply",
    )
    def apply(self, df: DataFrame, schema: EventstreamSchemaType) -> DataFrame:
        """
        Run the user function on ``df`` and report performance metadata.

        The function is called as ``func(df, schema)`` when its signature can
        bind two positional arguments, and as ``func(df)`` otherwise.

        Parameters
        ----------
        df : DataFrame
            DataFrame underlying the input eventstream.
        schema : EventstreamSchemaType
            Schema of the input eventstream, handed to ``func`` on request.

        Returns
        -------
        DataFrame
            Whatever ``func`` returned.
        """
        func: Callable[[DataFrame, Optional[EventstreamSchemaType]], DataFrame] = self.params.func  # type: ignore

        parameters = list(signature(func).parameters.values())
        # Only these kinds can receive ``schema`` positionally. Keyword-only and
        # ``**kwargs`` parameters must not be counted: e.g. the signature of
        # ``functools.partial(f, n=3)`` is ``(df, *, n=3)`` and calling it with
        # two positional arguments would raise TypeError.
        positional_slots = [
            p for p in parameters if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
        ]
        takes_varargs = any(p.kind == p.VAR_POSITIONAL for p in parameters)
        # A callable declaring exactly one parameter (of any kind) is always
        # called with ``df`` alone, as before.
        wants_schema = len(parameters) != 1 and (len(positional_slots) >= 2 or takes_varargs)

        if wants_schema:
            result = func(df, schema)
        else:
            result = func(df)  # type: ignore

        # Record shape and hash of the input and output frames.
        collect_data_performance(
            scope="pipe",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )
        return result