Source code for retentioneering.data_processors_lib.remap_segment

from __future__ import annotations

from typing import Any

import numpy as np
import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.segments import (
    SEGMENT_DELIMITER,
    SEGMENT_TYPE,
    _create_segment_event,
    _extract_segment_values,
    _get_segment_mask,
)
from retentioneering.eventstream.types import (
    AddSegmentType,
    EventstreamSchemaType,
    EventstreamType,
)
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfString, ReteFunction

ADD_SEGMENT_TIMEDELTA = pd.Timedelta(1, "microseconds")


class RemapSegmentParams(ParamsModel):
    name: str
    mapping: dict


[docs]@docstrings.get_sections(base="RemapSegment") # type: ignore class RemapSegment(DataProcessor): """ Remap segment values for synthetic eventstream events. Parameters ---------- name : str A segment name to remap. mapping : str A pandas.Series containing mapping rules. The series index relates to old segment values, and the values relate to new segment values. Returns ------- EventstreamType Eventstream with remapped segment. """ params: RemapSegmentParams @time_performance(scope="remap_segment", event_name="init") def __init__(self, params: RemapSegmentParams) -> None: super().__init__(params=params) @time_performance(scope="remap_segment", event_name="apply") def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame: def mapper(value: Any) -> Any: return value if not mapping.get(value) else mapping.get(value) name = self.params.name mapping = self.params.mapping hash_before = hash_dataframe(df) shape_before = df.shape mask = _get_segment_mask(df, schema, name) values = _extract_segment_values(df.loc[mask, schema.event_name]) df.loc[mask, schema.event_name] = _create_segment_event(values.map(mapper), name) collect_data_performance( scope="remap_segment", event_name="metadata", called_params=self.to_dict()["values"], performance_data={ "parent": { "shape": shape_before, "hash": hash_before, }, "child": { "shape": df.shape, "hash": hash_dataframe(df), }, }, ) return df