Source code for retentioneering.data_processors_lib.drop_segment
from __future__ import annotations
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.segments import _get_segment_mask
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
class DropSegmentParams(ParamsModel):
name: str
[docs]@docstrings.get_sections(base="DropSegment") # type: ignore
class DropSegment(DataProcessor):
"""
Remove segment synthetic events from eventstream.
Parameters
----------
name : str
Segment name to remove.
Returns
-------
EventstreamType
Eventstream with removed segment.
"""
params: DropSegmentParams
@time_performance(scope="drop_segment", event_name="init")
def __init__(self, params: DropSegmentParams) -> None:
super().__init__(params=params)
@time_performance(scope="drop_segment", event_name="apply")
def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
hash_before = hash_dataframe(df)
shape_before = df.shape
mask = _get_segment_mask(df, schema, self.params.name)
df.drop(df[mask].index, inplace=True)
collect_data_performance(
scope="drop_segment",
event_name="metadata",
called_params=self.to_dict()["values"],
performance_data={
"parent": {
"shape": shape_before,
"hash": hash_before,
},
"child": {
"shape": df.shape,
"hash": hash_dataframe(df),
},
},
)
return df