Source code for retentioneering.tooling.step_matrix.step_matrix
from__future__importannotationsimportitertoolsfromcopyimportdeepcopyfromdataclassesimportdataclassfromtypingimportAny,Literalimportmatplotlibimportpandasaspdimportseabornassnsfromretentioneering.backend.trackerimport(collect_data_performance,time_performance,track,tracker,)fromretentioneering.eventstream.segmentsimport_split_segmentfromretentioneering.eventstream.typesimport(EventstreamType,SplitExpr,UserGroupsNamesType,UserGroupsType,UserListType,)fromretentioneering.tooling.mixins.ended_eventsimportEndedEventsMixin@dataclassclassCenteredParams:event:strleft_gap:intoccurrence:intdef__post_init__(self)->None:ifself.occurrence<1:raiseValueError("Occurrence in 'centered' dictionary must be >=1")ifself.left_gap<1:raiseValueError("left_gap in 'centered' dictionary must be >=1")
class StepMatrix(EndedEventsMixin):
    """
    Step matrix is a matrix where its ``(i, j)`` element shows the frequency
    of event ``i`` occurring as ``j``-th step in user trajectories.
    This class provides methods for step matrix calculation and visualization.

    Parameters
    ----------
    eventstream : EventstreamType

    See Also
    --------
    .Eventstream.step_matrix : Call StepMatrix tool as an eventstream method.
    .StepSankey : A class for the visualization of user paths in stepwise manner using Sankey diagram.
    .CollapseLoops : Find loops and create new synthetic events in the paths of all users having such sequences.

    Notes
    -----
    See :doc:`StepMatrix user guide</user_guides/step_matrix>` for the details.
    """

    __eventstream: EventstreamType
    ENDED_EVENT = "ENDED"
    max_steps: int
    weight_col: str
    precision: int
    targets: list[str] | str | None = None
    accumulated: Literal["both", "only"] | None = None
    sorting: list | None = None
    threshold: float
    centered: CenteredParams | None = None
    groups: UserGroupsType | None = None
    group_names: UserGroupsNamesType | None = None

    _result_data: pd.DataFrame
    _result_targets: pd.DataFrame | None
    _title: str | None
    _targets_list: list[list[str]] | None

    @time_performance(
        scope="step_matrix",
        event_name="init",
    )
    def __init__(
        self,
        eventstream: EventstreamType,
    ) -> None:
        super().__init__()
        self.__eventstream = eventstream
        # Cache the schema column names that are used throughout the calculation.
        self.user_col = self.__eventstream.schema.user_id
        self.event_col = self.__eventstream.schema.event_name
        self.time_col = self.__eventstream.schema.event_timestamp
        self.event_index_col = self.__eventstream.schema.event_index

        # Results are empty until ``fit`` is called.
        self._result_data = pd.DataFrame()
        self._result_targets = None
        self._title = None
        self._targets_list = None

    def _pad_to_center(self, df_: pd.DataFrame) -> pd.DataFrame | None:
        """
        Shift ``event_rank`` of a single path so that the centering event
        lands on step ``left_gap + 1``.

        Parameters
        ----------
        df_ : pd.DataFrame
            Events of one path. Must contain ``event_rank`` and
            ``occurrence_counter`` columns. The frame is modified in place.

        Returns
        -------
        pd.DataFrame or None
            The same frame with shifted ``event_rank``; ``None`` if ``centered`` is not set.
        """
        if self.centered is None:
            return None

        center_event = self.centered.event
        occurrence = self.centered.occurrence
        window = self.centered.left_gap

        is_center = (df_[self.event_col] == center_event) & (df_["occurrence_counter"] == occurrence)
        position = df_.loc[is_center]["event_rank"].min()
        shift = position - window - 1
        df_["event_rank"] = df_["event_rank"] - shift
        return df_

    @staticmethod
    def _align_index(df1: pd.DataFrame, df2: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
        """
        Align two frames on the union of their labels, filling the gaps with zeros.
        """
        df1 = df1.align(df2)[0].fillna(0)  # type: ignore
        df2 = df2.align(df1)[0].fillna(0)  # type: ignore
        return df1, df2  # type: ignore

    def _pad_cols(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Parameters
        ----------
        df : pd.DataFrame

        Returns
        -------
        pd.DataFrame
            With columns from 1 to ``max_steps``. Missing steps are filled with zeros.
        """
        # ``reindex`` both adds the missing steps and orders the columns; unlike
        # ``max(df.columns)`` it also copes with a frame that has no columns at all.
        return df.reindex(columns=list(range(1, self.max_steps + 1)), fill_value=0)

    def _step_matrix_values(
        self, data: pd.DataFrame
    ) -> tuple[pd.DataFrame, pd.DataFrame | None, str, list[list[str]] | None]:
        """
        Calculate the step matrix, the targets matrix, the title fragment
        describing the fraction of used paths, and the grouped targets list.
        """
        data = data.copy()

        # Align the paths first if a centered matrix is requested.
        if self.centered is not None:
            data, fraction_title = self._center_matrix(data)
        else:
            fraction_title = ""

        piv = self._generate_step_matrix(data)

        if self.targets:
            piv_targets, targets = self._process_targets(data)
        else:
            targets, piv_targets = None, None

        return piv, piv_targets, fraction_title, targets

    def _generate_step_matrix(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Build the events-by-steps matrix of path shares.

        Each cell is the number of unique ``weight_col`` values having the event
        at the step, divided by the total number of unique ``weight_col`` values.
        """
        agg = data.groupby(["event_rank", self.event_col])[self.weight_col].nunique().reset_index()
        agg[self.weight_col] /= data[self.weight_col].nunique()
        agg = agg[agg["event_rank"] <= self.max_steps]
        agg.columns = ["event_rank", "event_name", "freq"]  # type: ignore

        piv = agg.pivot(index="event_name", columns="event_rank", values="freq").fillna(0)
        # Add missing columns if the number of steps is less than ``max_steps``.
        piv = self._pad_cols(piv)
        piv.columns.name = None
        piv.index.name = None

        # The terminating state is shown accumulated over the steps.
        if self.ENDED_EVENT in piv.index:
            piv.loc[self.ENDED_EVENT] = piv.loc[self.ENDED_EVENT].cumsum().fillna(0)
        return piv

    def _process_targets(self, data: pd.DataFrame) -> tuple[pd.DataFrame | None, list[list[str]] | None]:
        """
        Build the matrix for the target events.

        Returns
        -------
        tuple
            1. Targets-by-steps matrix (with ``ACC_``-prefixed accumulated rows
               when ``accumulated`` is set), or ``None`` if no targets are defined.
            2. Targets grouped as a list of lists (one sub-list per color scale), or ``None``.
        """
        if self.targets is None:
            return None, None

        # Format targets to a list of lists. E.g. [['a', 'b'], 'c'] -> [['a', 'b'], ['c']].
        # Sub-lists are copied so that the caller's ``targets`` are never modified:
        # this method may run twice per ``fit`` (once per group).
        if isinstance(self.targets, list):
            targets = [list(t) if isinstance(t, list) else [t] for t in self.targets]
        else:
            targets = [[self.targets]]

        # Flat list of targets. E.g. [['a', 'b'], ['c']] -> ['a', 'b', 'c'].
        targets_flatten = list(itertools.chain.from_iterable(targets))

        agg_targets = data.groupby(["event_rank", self.event_col])[self.time_col].count().reset_index()
        agg_targets[self.time_col] /= data[self.weight_col].nunique()
        agg_targets.columns = ["event_rank", "event_name", "freq"]  # type: ignore
        agg_targets = agg_targets[agg_targets["event_rank"] <= self.max_steps]

        piv_targets = agg_targets.pivot(index="event_name", columns="event_rank", values="freq").fillna(0)
        piv_targets = self._pad_cols(piv_targets)

        # Keep the target rows only; targets absent from the data get rows of zeros.
        piv_targets = piv_targets.reindex(index=targets_flatten, fill_value=0)
        piv_targets.columns.name = None
        piv_targets.index.name = None

        ACC_INDEX = "ACC_"
        if self.accumulated == "only":
            piv_targets = piv_targets.cumsum(axis=1).fillna(0)
            piv_targets.index = [ACC_INDEX + name for name in piv_targets.index]  # type: ignore
            targets = [[ACC_INDEX + item for item in group] for group in targets]
        elif self.accumulated == "both":
            accumulated_rows = piv_targets.cumsum(axis=1).fillna(0)
            accumulated_rows.index = [ACC_INDEX + name for name in accumulated_rows.index]  # type: ignore
            piv_targets = pd.concat([piv_targets, accumulated_rows])
            # Plain groups go first, the accumulated ones follow.
            targets = targets + [[ACC_INDEX + item for item in group] for group in targets]

        return piv_targets, targets

    def _center_matrix(self, data: pd.DataFrame) -> tuple[pd.DataFrame, str]:
        """
        Keep only the paths containing the required occurrence of the centering
        event and shift their steps so that this occurrence is aligned.

        Returns
        -------
        tuple
            1. Filtered data with shifted ``event_rank`` (non-positive ranks are dropped).
            2. Title fragment with the share of kept paths; empty if all paths are kept.
               The opening parenthesis is closed later in ``_create_title``.

        Raises
        ------
        ValueError
            If the centering event is absent from the data or never reaches
            the requested occurrence.
        """
        if self.centered is None:
            return pd.DataFrame(), ""

        center_event = self.centered.event
        occurrence = self.centered.occurrence
        if center_event not in data[self.event_col].unique():
            error_text = (
                f'Event "{center_event}" from \'centered\' dict not found in the column: "{self.event_col}"'
            )
            raise ValueError(error_text)

        # Keep only users who have center_event at least N = occurrence times.
        data["occurrence"] = data[self.event_col] == center_event
        data["occurrence_counter"] = data.groupby(self.weight_col)["occurrence"].cumsum() * data["occurrence"]
        users_to_keep = data[data["occurrence_counter"] == occurrence][self.weight_col].unique()

        if len(users_to_keep) == 0:
            raise ValueError(f'no records found with event "{center_event}" occurring N={occurrence} times')

        fraction_used = len(users_to_keep) / data[self.weight_col].nunique() * 100
        if fraction_used < 100:
            fraction_title = f"({fraction_used:.1f}% of total records"
        else:
            fraction_title = ""

        data = data[data[self.weight_col].isin(users_to_keep)].copy()
        data = data.groupby(self.weight_col, group_keys=True).apply(self._pad_to_center)  # type: ignore
        data = data[data["event_rank"] > 0].copy()
        return data, fraction_title

    @staticmethod
    def _sort_matrix(step_matrix: pd.DataFrame) -> pd.DataFrame:
        """
        Order the rows greedily: the first row has the maximum at the first step,
        the second row has the maximum at the second step among the rest, etc.
        Rows that are left when the steps are exhausted keep their original order.
        """
        remaining = step_matrix.copy()
        order = []
        for step in remaining.columns:
            top_row = remaining[step].idxmax()  # type: ignore
            order.append(top_row)
            remaining = remaining.drop(top_row)  # type: ignore
            if remaining.shape[0] == 0:
                break
        # ``remaining`` holds exactly the rows that have not been picked yet;
        # appending its index keeps the result deterministic (a set would not).
        order.extend(remaining.index)
        return step_matrix.loc[order]

    def _create_title(self, fraction_title: str) -> str:
        """
        Compose the plot title from the matrix kind, the compared groups,
        the fraction of used paths and the threshold note.
        """
        title_centered = "centered " if self.centered else ""
        title_diff = "differential " if self.groups else ""
        title_threshold = "threshold enabled)" if self.threshold > 0 else ""

        # Glue between the (possibly unclosed) fraction part and the threshold part.
        if self.threshold > 0 and fraction_title == "":
            title_technical = "("
        elif self.threshold > 0 and fraction_title != "":
            title_technical = ", "
        elif self.threshold == 0 and fraction_title != "":
            title_technical = ")"
        else:
            title_technical = ""

        if self.groups:
            if self.group_names:
                title_groups = f", {self.group_names[0]} vs. {self.group_names[1]}"
            else:
                title_groups = ", group 1 vs. group 2"
        else:
            title_groups = ""

        title = f"{title_centered}{title_diff}step matrix{title_groups}\n"
        title += f"{fraction_title}{title_technical}{title_threshold}"
        return title

    def _render_plot(
        self,
        data: pd.DataFrame,
        targets: pd.DataFrame | None,
        targets_list: list[list[str]] | None,
        title: str | None,
    ) -> Any:
        """
        Draw the step matrix heatmap and, below it, one heatmap per targets group.

        Returns
        -------
        Any
            The axes object(s) returned by ``subplots``: a single axes when there
            are no targets, otherwise an array with one axes per heatmap.
        """
        shape = len(data) + len(targets) if targets is not None else len(data), data.shape[1]
        collect_data_performance(
            scope="step_matrix",
            event_name="metadata",
            called_params={},
            performance_data={"unique_nodes": data.shape[0], "shape": shape},
            eventstream_index=self.__eventstream._eventstream_index,
        )

        # Target heatmaps are drawn only when both the values and the grouping are available.
        target_groups: list[list[str]] = targets_list if (targets is not None and targets_list) else []
        n_rows = 1 + len(target_groups)
        n_cols = 1
        grid_specs = (
            {"wspace": 0.08, "hspace": 0.04, "height_ratios": [data.shape[0], *map(len, target_groups)]}
            if target_groups
            else {}
        )

        figure, axs = sns.mpl.pyplot.subplots(
            n_rows,
            n_cols,
            sharex=True,
            figsize=(
                round(data.shape[1] * 0.7),
                round((len(data) + (len(targets) if targets is not None else 0)) * 0.6),
            ),
            gridspec_kw=grid_specs,
        )

        heatmap = sns.heatmap(
            data,
            yticklabels=data.index,
            annot=True,
            fmt=f".{self.precision}f",
            ax=axs[0] if target_groups else axs,
            cmap="RdGy",
            center=0,
            cbar=False,
        )
        heatmap.set_title(title, fontsize=16)

        # Each targets group gets its own color map and its own color scale.
        target_cmaps = itertools.cycle(["BrBG", "PuOr", "PRGn", "RdBu"])
        for n, group in enumerate(target_groups):
            group_values = targets.loc[group]  # type: ignore
            sns.heatmap(
                group_values,
                yticklabels=group_values.index,
                annot=True,
                fmt=f".{self.precision}f",
                ax=axs[1 + n],
                cmap=next(target_cmaps),
                center=0,
                vmin=group_values.values.min(),
                vmax=group_values.values.max() or 1,
                cbar=False,
            )

        # ``subplots`` returns a bare axes for a single row and an array otherwise.
        for ax in axs if target_groups else [axs]:
            sns.mpl.pyplot.sca(ax)
            sns.mpl.pyplot.yticks(rotation=0)
            # Add vertical lines around the central step of a centered matrix.
            if self.centered is not None:
                centered_position = self.centered.left_gap
                ax.vlines(
                    [centered_position - 0.02, centered_position + 0.98],
                    *ax.get_ylim(),
                    colors="Black",
                    linewidth=0.7,
                )

        return axs

    @time_performance(
        scope="step_matrix",
        event_name="fit",
    )
    def fit(
        self,
        max_steps: int = 20,
        weight_col: str | None = None,
        precision: int = 2,
        targets: list[str] | str | None = None,
        accumulated: Literal["both", "only"] | None = None,
        sorting: list | None = None,
        threshold: float = 0.01,
        centered: dict | None = None,
        groups: SplitExpr | None = None,
    ) -> None:
        """
        Calculates the step matrix internal values with the defined parameters.
        Applying ``fit`` method is necessary for the following usage
        of any visualization or descriptive ``StepMatrix`` methods.

        Parameters
        ----------
        max_steps : int, default 20
            Maximum number of steps in ``user path`` to include.
        weight_col : str, optional
            Aggregation column for edge weighting. If ``None``, specified ``user_id``
            from ``eventstream.schema`` will be used. For example, can be specified as
            ``session_id`` if ``eventstream`` has such ``custom_col``.
        precision : int, default 2
            Number of decimal digits after 0 to show as fractions in the ``heatmap``.
        targets : list of str or str, optional
            List of event names to include in the bottom of ``step_matrix`` as individual rows.
            Each specified target will have separate color-coding space for clear visualization.
            `Example: ['product_page', 'cart', 'payment']`

            If multiple targets need to be compared and plotted using the same color-coding scale,
            such targets must be combined in a sub-list.
            `Example: ['product_page', ['cart', 'payment']]`
        accumulated : {"both", "only"}, optional
            Option to include accumulated values for targets.

            - If ``None``, accumulated targets are not shown.
            - If ``both``, show step values and accumulated values.
            - If ``only``, show targets only as accumulated.
        sorting : list of str, optional
            - If list of event names specified - lines in the heatmap will be shown in
              the passed order.
            - If ``None`` - rows will be ordered according to i`th value (first row,
              where 1st element is max; second row, where second element is max; etc)
        threshold : float, default 0.01
            Used to remove rare events. Aggregates all rows where all values are
            less than the specified threshold.
        centered : dict, optional
            Parameter used to align user paths at a specific event at a specific step.
            Has to contain three keys:

            - ``event``: str, name of event to align.
            - ``left_gap``: int, number of events to include before specified event.
            - ``occurrence`` : int which occurrence of event to align (typical 1).

            If not ``None`` - only users who have selected events with the specified
            ``occurrence`` in their paths will be included.
            ``Fraction`` of such remaining users is specified in the title of centered
            step_matrix.
            `Example: {'event': 'cart', 'left_gap': 8, 'occurrence': 1}`
        groups : tuple[list, list], tuple[str, str, str], str, optional
            Specify two groups of paths to plot differential step_matrix.
            Two separate step matrices M1 and M2 will be calculated for these groups.
            Resulting matrix is M = M1 - M2.

            - If ``tuple[list, list]``, each sub-list should contain valid path ids.
            - If ``tuple[str, str, str]``, the first str should refer to a segment name,
              the others should refer to the corresponding segment values.
            - If ``str``, it should refer to a binary (i.e. containing two segment values only)
              segment name.

        Raises
        ------
        IndexError
            If one of the ``groups`` has no paths in the eventstream.
        ValueError
            If ``sorting`` does not match the set of the step matrix rows, or if
            ``centered`` is invalid (see :py:class:`CenteredParams`).

        Notes
        -----
        During step matrix calculation an artificial ``ENDED`` event is created. If a path
        already contains ``path_end`` event (See :py:class:`.AddStartEndEvents`), it will be
        temporarily replaced with ``ENDED`` (within step matrix only). Otherwise, ``ENDED``
        event will be explicitly added to the end of each path.

        Event ``ENDED`` is cumulated so that the values in its row are summed up from
        the first step to the last. ``ENDED`` row is always placed at the last line of
        step matrix. This design guarantees that the sum of any step matrix's column is 1
        (0 for a differential step matrix).
        """
        called_params = {
            "max_steps": max_steps,
            "weight_col": weight_col,
            "precision": precision,
            "targets": targets,
            "accumulated": accumulated,
            "sorting": sorting,
            "threshold": threshold,
            "centered": centered,
            "groups": groups,
        }
        not_hash_values = ["accumulated", "centered"]

        self.max_steps = max_steps
        self.precision = precision
        self.targets = targets
        self.accumulated = accumulated
        self.sorting = sorting
        self.threshold = threshold
        self.centered = CenteredParams(**centered) if centered else None

        if groups:
            groups_, group_names = _split_segment(self.__eventstream, groups)
            self.groups = groups_
            self.group_names = group_names
        else:
            # Reset explicitly: otherwise groups from a previous ``fit`` call would be reused.
            self.groups = None
            self.group_names = None

        self.weight_col = weight_col or self.__eventstream.schema.user_id
        weight_col = self.weight_col or self.user_col

        data = self.__eventstream.to_dataframe()
        data = self._add_ended_events(data=data, schema=self.__eventstream.schema, weight_col=self.weight_col)
        data["event_rank"] = data.groupby(weight_col).cumcount() + 1

        # Obtain the final ``piv`` and ``piv_targets`` (differential if groups are set)
        # before sorting, thresholding and plotting.
        if self.groups:
            data_pos = data[data[weight_col].isin(self.groups[0])].copy()
            if len(data_pos) == 0:
                raise IndexError("Users from positive group are not present in dataset")
            piv_pos, piv_targets_pos, fraction_title, targets_plot = self._step_matrix_values(data=data_pos)

            data_neg = data[data[weight_col].isin(self.groups[1])].copy()
            if len(data_neg) == 0:
                raise IndexError("Users from negative group are not present in dataset")
            # NOTE: ``fraction_title`` and ``targets_plot`` of the second group override the first ones.
            piv_neg, piv_targets_neg, fraction_title, targets_plot = self._step_matrix_values(data=data_neg)

            piv_pos, piv_neg = self._align_index(piv_pos, piv_neg)
            piv = piv_pos - piv_neg

            if self.targets and len(piv_targets_pos) > 0 and len(piv_targets_neg) > 0:  # type: ignore
                piv_targets_pos, piv_targets_neg = self._align_index(
                    piv_targets_pos, piv_targets_neg  # type: ignore
                )
                piv_targets = piv_targets_pos - piv_targets_neg
            else:
                piv_targets = None
        else:
            piv, piv_targets, fraction_title, targets_plot = self._step_matrix_values(data=data)

        threshold_index = "THRESHOLDED_"
        if self.threshold != 0:
            # Rows whose every value is below the threshold are collapsed into a single row.
            is_ended = piv.index == self.ENDED_EVENT
            thresholded = piv.loc[(piv.abs() < self.threshold).all(axis=1) & ~is_ended].copy()
            if len(thresholded) > 0:
                piv = piv.loc[(piv.abs() >= self.threshold).any(axis=1) | is_ended].copy()
                threshold_index = f"{threshold_index}{len(thresholded)}"
                piv.loc[threshold_index] = thresholded.sum()

        if self.sorting is None:
            piv = self._sort_matrix(piv)

            # The ENDED row and the thresholded row are always placed last.
            keep_in_the_end = []
            if self.ENDED_EVENT in piv.index:
                keep_in_the_end.append(self.ENDED_EVENT)
            if threshold_index in piv.index:
                keep_in_the_end.append(threshold_index)

            events_order = [*(i for i in piv.index if i not in keep_in_the_end), *keep_in_the_end]
            piv = piv.loc[events_order]
        else:
            if {*self.sorting} != {*piv.index}:
                raise ValueError(
                    "The sorting list provided does not match the list of events. "
                    "Run with `sorting` = None to get the actual list"
                )
            piv = piv.loc[self.sorting]

        if self.centered:
            # Relabel the steps so that the centering event is at step "0".
            window = self.centered.left_gap
            piv.columns = [f"{int(i) - window - 1}" for i in piv.columns]  # type: ignore
            if self.targets and piv_targets is not None:
                piv_targets.columns = [f"{int(i) - window - 1}" for i in piv_targets.columns]  # type: ignore

        full_title = self._create_title(fraction_title)

        self._result_data = piv
        self._result_targets = piv_targets
        self._title = full_title
        self._targets_list = targets_plot

        shape = (
            (
                len(self._result_data) + len(self._result_targets)
                if self._result_targets is not None
                else len(self._result_data)
            ),
            self._result_data.shape[1],
        )
        collect_data_performance(
            scope="step_matrix",
            event_name="metadata",
            called_params=called_params,
            not_hash_values=not_hash_values,
            performance_data={"unique_nodes": self._result_data.shape[0], "shape": shape},
            eventstream_index=self.__eventstream._eventstream_index,
        )

    @time_performance(
        scope="step_matrix",
        event_name="plot",
    )
    def plot(self) -> Any:
        """
        Create a heatmap plot based on the calculated step matrix values.
        Should be used after :py:func:`fit`.

        Returns
        -------
        matplotlib.axes.Axes
        """
        axes = self._render_plot(self._result_data, self._result_targets, self._targets_list, self._title)
        return axes

    @property
    @time_performance(
        scope="step_matrix",
        event_name="values",
    )
    def values(self) -> tuple[pd.DataFrame, pd.DataFrame | None]:
        """
        Returns the calculated step matrix as a pd.DataFrame.
        Should be used after :py:func:`fit`.

        Returns
        -------
        tuple[pd.DataFrame, pd.DataFrame | None]
            1. Stands for the step matrix.
            2. Stands for a separate step matrix related for target events only.
        """
        return self._result_data, self._result_targets

    @property
    @time_performance(  # type: ignore
        scope="step_matrix",
        event_name="params",
    )
    def params(self) -> dict:
        """
        Returns the parameters used for the last fitting.
        Should be used after :py:func:`fit`.
        """
        return {
            "max_steps": self.max_steps,
            "weight_col": self.weight_col,
            "precision": self.precision,
            "targets": self.targets,
            "accumulated": self.accumulated,
            "sorting": self.sorting,
            "threshold": self.threshold,
            "centered": self.centered,
            "groups": self.groups,
        }