Source code for retentioneering.data_processors_lib.materialize_segment
from __future__ import annotations
import numpy as np
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.segments import (
SEGMENT_DELIMITER,
SEGMENT_TYPE,
_calculate_segment_col,
_create_segment_event,
_get_segment_mask,
)
from retentioneering.eventstream.types import (
AddSegmentType,
EventstreamSchemaType,
EventstreamType,
SeriesWithValidator,
)
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
class MaterializeSegmentParams(ParamsModel):
name: str
[docs]@docstrings.get_sections(base="MaterializeSegment") # type: ignore
class MaterializeSegment(DataProcessor):
"""
Propagate segment synthetic events to a column.
Parameters
----------
name : str
Name of a segment to convert
Returns
-------
Eventstream
Eventstream with added column.
"""
params: MaterializeSegmentParams
@time_performance(scope="materialize_segment", event_name="init")
def __init__(self, params: MaterializeSegmentParams) -> None:
super().__init__(params=params)
@time_performance(scope="materialize_segment", event_name="apply")
def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
name = self.params.name
hash_before = hash_dataframe(df)
shape_before = df.shape
segment_col = _calculate_segment_col(df, schema, name)
df[name] = segment_col
collect_data_performance(
scope="rename_segment",
event_name="metadata",
called_params=self.to_dict()["values"],
performance_data={
"parent": {
"shape": shape_before,
"hash": hash_before,
},
"child": {
"shape": df.shape,
"hash": hash_dataframe(df),
},
},
)
return df