Source code for retentioneering.data_processors_lib.add_negative_events
from __future__ import annotations

from typing import Any, Callable, List

import pandas as pd

from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.schema import EventstreamSchema
from retentioneering.eventstream.types import EventstreamSchemaType, EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfString, ReteFunction

EventstreamFilter = Callable[[pd.DataFrame, EventstreamSchema], Any]


def _default_func(eventstream: EventstreamType, targets: List[str]) -> pd.DataFrame:
    """
    Pick, for every user, the earliest row whose event name is one of ``targets``.

    Parameters
    ----------
    eventstream : Eventstream
        Source eventstream or output from previous nodes.
    targets : list of str
        Event names associated with a bad result (scenario) of the user's
        behaviour (experience) in the product. When a user path contains
        several of them, only the one with the minimum timestamp is kept.

    Returns
    -------
    pd.DataFrame
        Rows of the eventstream dataframe holding the selected target events
        (one per user that has at least one target event).
    """
    schema = eventstream.schema
    user_col = schema.user_id
    time_col = schema.event_timestamp
    event_col = schema.event_name

    events = eventstream.to_dataframe()

    # Keep only the rows that are target events ...
    is_target = events[event_col].isin(targets)
    # ... and find, per user, the index label of the earliest one.
    earliest_idx = events[is_target].groupby(user_col)[time_col].idxmin()  # type: ignore

    return events.loc[earliest_idx]  # type: ignore
class AddNegativeEventsParams(ParamsModel):
    """
    Parameter container for the :py:class:`.AddNegativeEvents` data processor.
    """

    # Names of the events that are considered negative targets.
    targets: List[str]
    # @TODO: remove eventstream from the "func" signature in a future major release. Aleksei Avramenko
    # Callable invoked as ``func(eventstream, targets)``; defaults to ``_default_func``.
    func: Callable = _default_func

    # Maps each parameter name to a widget object
    # (presumably used to render the parameter in a UI -- confirm against ParamsModel).
    _widgets = {
        "func": ReteFunction(),
        "targets": ListOfString(),
    }
@docstrings.get_sections(base="AddNegativeEvents")  # type: ignore
class AddNegativeEvents(DataProcessor):
    """
    Create new synthetic events in paths of all users having the specified event(s):
    ``negative_target_RAW_EVENT_NAME``.

    Parameters
    ----------
    targets : list of str
        Define the list of events that we consider negative.
        If there are several target events in the user path, the event with the minimum timestamp is taken.

    func : Callable, default _default_func
        Filter rows with target events from the input eventstream.

    Returns
    -------
    Eventstream
        ``Eventstream`` with new synthetic events only added to the users who fit the conditions.

        +--------------------------------+-----------------+-----------------------------+
        | **event_name**                 | **event_type**  | **timestamp**               |
        +--------------------------------+-----------------+-----------------------------+
        | negative_target_RAW_EVENT_NAME | negative_target | min(targets)                |
        +--------------------------------+-----------------+-----------------------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: AddNegativeEventsParams

    @time_performance(
        scope="add_negative_events",
        event_name="init",
    )
    def __init__(self, params: AddNegativeEventsParams):
        super().__init__(params=params)

    @time_performance(
        scope="add_negative_events",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Append ``negative_target_*`` synthetic events to the given events.

        Parameters
        ----------
        df : pd.DataFrame
            Events of the input eventstream.
        schema : EventstreamSchemaType
            Schema providing the column names used in ``df``.

        Returns
        -------
        pd.DataFrame
            The eventstream dataframe concatenated with the rows returned by
            ``params.func``, the latter re-labelled as ``negative_target`` events.
        """
        # Imported locally, as in the original code
        # (presumably to avoid a circular import -- confirm).
        from retentioneering.eventstream.eventstream import Eventstream

        type_col = schema.event_type
        event_col = schema.event_name

        func = self.params.func
        targets = self.params.targets

        # ``func`` still expects an eventstream, so wrap the raw frame in one.
        eventstream = Eventstream(
            raw_data_schema=schema.to_raw_data_schema(event_index=True),
            raw_data=df,
            add_start_end_events=False,
        )
        # @TODO: remove eventstream from the "func" signature in a future major release. Aleksei Avramenko
        # ``func`` may be user-supplied and may hand back a slice of a larger
        # frame; work on a copy so the column assignments below never write into
        # the caller's data or raise SettingWithCopyWarning.
        negative_targets: pd.DataFrame = func(eventstream, targets).copy()
        negative_targets[type_col] = "negative_target"
        negative_targets[event_col] = "negative_target_" + negative_targets[event_col]

        result = pd.concat([eventstream.to_dataframe(), negative_targets])

        # Report shapes and hashes of the input and output frames to the tracker.
        collect_data_performance(
            scope="add_negative_events",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result