class Eventstream(
    CollapseLoopsHelperMixin,
    DropPathsHelperMixin,
    FilterEventsHelperMixin,
    GroupEventsHelperMixin,
    GroupEventsBulkHelperMixin,
    LabelLostUsersHelperMixin,
    AddNegativeEventsHelperMixin,
    LabelNewUsersHelperMixin,
    AddPositiveEventsHelperMixin,
    SplitSessionsHelperMixin,
    AddStartEndEventsHelperMixin,
    LabelCroppedPathsHelperMixin,
    TruncatePathsHelperMixin,
    RenameHelperMixin,
    PipeHelperMixin,
    AddSegmentHelperMixin,
    DropSegmentHelperMixin,
    EventstreamType,
    RenameSegmentHelperMixin,
    RemapSegmentHelperMixin,
    MaterializeSegmentHelperMixin,
):
    """
    Collection of tools for storing and processing clickstream data.

    Parameters
    ----------
    raw_data : pd.DataFrame or pd.Series
        Raw clickstream data.
    raw_data_schema : dict or RawDataSchema, optional
        Represents mapping rules connecting important eventstream columns with the raw data
        columns. The keys are defined in :py:class:`.RawDataSchema`. The values are the
        corresponding column names in the raw data. The ``custom_cols`` key defines additional
        columns that can be used in the eventstream.
        See the :ref:`Eventstream user guide <eventstream_raw_data_schema>` for the details.
    schema : dict or EventstreamSchema, optional
        Represents a schema of the created eventstream. The keys are defined in
        :py:class:`.EventstreamSchema`. The values are the names of the corresponding
        eventstream columns.
        See the :ref:`Eventstream user guide <eventstream_field_names>` for the details.
    custom_cols : list of str, optional
        The list of additional columns from the raw data to be included in the eventstream.
        If not defined, all the columns from the raw data are included.
    prepare : bool, default True
        - If ``True``, the input data is transformed in the following way:

            - The ``event_timestamp`` column is converted to pandas datetime format.
            - An ``event_type`` column is added and filled with the ``raw`` value.
              If the column already exists, it remains unchanged.

        - If ``False``, ``raw_data`` is left as is.
    index_order : list of str, default DEFAULT_INDEX_ORDER
        Sorting order for the ``event_type`` column.
    user_sample_size : int or float, optional
        Number (``int``) or share (``float``) of all users' trajectories that will be randomly
        chosen and kept in the final sample (all other trajectories will be removed).
        See :numpy_random_choice:`numpy documentation<>`.
    user_sample_seed : int, optional
        A seed value that is used to generate user samples.
        See :numpy_random_seed:`numpy documentation<>`.
    events_order : list of str, optional
        Sorting order for the ``event_name`` column, applied to events with equal timestamps
        within each user trajectory. The order of raw events is fixed once during eventstream
        initialization.
    add_start_end_events : bool, default True
        If ``True``, ``path_start`` and ``path_end`` synthetic events are explicitly added
        to each path. See also the :py:class:`.AddStartEndEvents` documentation.
    convert_tz : 'local' or 'UTC', optional
        A timestamp column with timezones is not supported in the eventstream and should be
        explicitly converted.

        - If ``UTC``, the timestamp column is converted to UTC time, and the timezone part
          is truncated.
        - If ``local``, the timezone part is truncated.

    Notes
    -----
    See the :doc:`Eventstream user guide</user_guides/eventstream>` for the details.
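
    Examples
    --------
    A minimal construction sketch. The raw column names (``client_id``, ``action``,
    ``ts``) are illustrative assumptions about the input data, not requirements of
    the library:

    .. code:: python

        import pandas as pd
        from retentioneering.eventstream import Eventstream

        df = pd.DataFrame(
            [
                ["user_1", "catalog", "2023-01-01 00:00:00"],
                ["user_1", "cart", "2023-01-01 00:01:30"],
                ["user_2", "catalog", "2023-01-01 00:02:00"],
            ],
            columns=["client_id", "action", "ts"],
        )

        # Map the raw columns to the eventstream schema explicitly.
        stream = Eventstream(
            raw_data=df,
            raw_data_schema={
                "user_id": "client_id",
                "event_name": "action",
                "event_timestamp": "ts",
            },
        )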
"""schema:EventstreamSchemaevents_order:OrderDictindex_order:OrderDict__hash:str=""_preprocessing_graph:PreprocessingGraph|None=None__raw_data_schema:RawDataSchemaType__events:pd.DataFrame|pd.Series[Any]__funnel:Funnel__clusters:Clusters__segment_overview:SegmentOverview__cohorts:Cohorts__step_matrix:StepMatrix__sankey:StepSankey__stattests:StatTests__sequences:Sequences__transition_graph:TransitionGraph__transition_matrix:TransitionMatrix__timedelta_hist:TimedeltaHist__user_lifetime_hist:UserLifetimeHist__event_timestamp_hist:EventTimestampHist__eventstream_index:int@time_performance(scope="eventstream",event_name="init",)def__init__(self,raw_data:pd.DataFrame|pd.Series[Any],raw_data_schema:(RawDataSchema|RawDataSchemaType|dict[str,str|list[RawDataCustomColSchema]]|None)=None,schema:EventstreamSchema|dict[str,str|list[str]]|None=None,prepare:bool=True,index_order:Optional[Union[OrderList,OrderDict]]=None,user_sample_size:Optional[int|float]=None,user_sample_seed:Optional[int]=None,events_order:Optional[Union[OrderList,OrderDict]]=None,custom_cols:List[str]|None=None,add_start_end_events:bool=True,convert_tz:Optional[Literal["UTC","local"]]=None,segment_cols:Optional[List[str]]=None,)->None:tracking_params=dict(raw_data=raw_data,prepare=prepare,index_order=index_order,user_sample_size=user_sample_size,user_sample_seed=user_sample_seed,events_order=events_order,custom_cols=custom_cols,add_start_end_events=add_start_end_events,convert_tz=convert_tz,segment_cols=segment_cols,)not_hash_values=["raw_data_schema","schema","convert_tz","index_order","events_order","segment_cols"]ifnotschema:schema=EventstreamSchema()elifisinstance(schema,dict):schema=EventstreamSchema(**schema)# type: ignoreself.schema=schemaself.__eventstream_index:int=counter.get_eventstream_index()ifnotraw_data_schema:raw_data_schema=RawDataSchema()ifself.schema.event_typeinraw_data.columns:raw_data_schema.event_type=self.schema.event_typeelifisinstance(raw_data_schema,dict):raw_data_schema=RawDataSchema(**raw_data_schema)# type: 
        self.__raw_data_schema = raw_data_schema

        if custom_cols is None and not self.__raw_data_schema.custom_cols and not self.schema.custom_cols:
            custom_cols = self.__define_default_custom_cols(raw_data=raw_data)

        if custom_cols and prepare:
            self.__raw_data_schema.custom_cols = []
            self.schema.custom_cols = []
            for col_name in custom_cols:
                col: RawDataCustomColSchema = {"raw_data_col": col_name, "custom_col": col_name}
                self.__raw_data_schema.custom_cols.append(col)
                self.schema.custom_cols.append(col_name)

        self.convert_tz = convert_tz
        raw_data_schema_default_values = asdict(RawDataSchema())
        schema_default_values = asdict(EventstreamSchema())

        if isinstance(raw_data_schema, RawDataSchema):
            tracking_params["raw_data_schema"] = [
                key for key, value in raw_data_schema_default_values.items() if asdict(raw_data_schema)[key] != value
            ]
        tracking_params["schema"] = [
            key for key, value in schema_default_values.items() if asdict(self.schema)[key] != value
        ]

        self.__track_dataset(
            name="metadata",
            data=raw_data,
            params=tracking_params,
            schema=self.__raw_data_schema,
            not_hash_values=not_hash_values,
        )

        if user_sample_size is not None:
            raw_data = self.__sample_user_paths(raw_data, raw_data_schema, user_sample_size, user_sample_seed)
        if not index_order:
            self.index_order = dictify(DEFAULT_INDEX_ORDER)
        else:
            self.index_order = dictify(index_order)
        if events_order is not None:
            self.events_order = dictify(events_order)
        else:
            self.events_order = dict()

        self.__events = self.__prepare_events(raw_data) if prepare else raw_data
        self.__events = self.__required_cleanup(events=self.__events)
        self.__apply_default_dataprocessors(add_start_end_events=add_start_end_events, segment_cols=segment_cols)
        self.index_events()

        if prepare:
            self.__track_dataset(
                name="metadata",
                data=self.__events,
                params=tracking_params,
                schema=self.schema,
                not_hash_values=not_hash_values,
            )

        self._preprocessing_graph = None

    @property
    def _eventstream_index(self) -> int:
        return self.__eventstream_index

    @property
    def _hash(self) -> str:
        if self.__hash == "":
            self.__hash = hash_dataframe(self.__events)
        return self.__hash

    def __track_dataset(
        self,
        name: str,
        data: pd.DataFrame | pd.Series[Any],
        params: dict[str, Any],
        schema: RawDataSchema | RawDataSchemaType | EventstreamSchema,
        not_hash_values: list[str],
    ) -> None:
        try:
            unique_users = data[schema.user_id].nunique()
        except Exception:
            unique_users = None
        try:
            unique_events = data[schema.event_name].nunique()
        except Exception:
            unique_events = None

        try:
            hist_data = data[schema.user_id].drop_duplicates()
            if len(hist_data) >= 500:
                hist_data = hist_data.sample(500, random_state=42)
            eventstream_hist = (
                data[data[schema.user_id].isin(hist_data)].groupby(schema.user_id).size().value_counts().to_dict()
            )
        except Exception:
            eventstream_hist = {}

        eventstream_hash = hash_dataframe(data=data)
        self.__hash = eventstream_hash

        performance_data: dict[str, Any] = {
            "shape": data.shape,
            "custom_cols": len(self.schema.custom_cols),
            "unique_users": unique_users,
            "unique_events": unique_events,
            "hash": self._hash,
            "eventstream_hist": eventstream_hist,
            "index": self.__eventstream_index,
        }
        collect_data_performance(
            scope="eventstream",
            event_name=name,
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data=performance_data,
            eventstream_index=self._eventstream_index,
        )
    @time_performance(
        scope="eventstream",
        event_name="copy",
    )
    def copy(self) -> Eventstream:
        """
        Make a copy of the current ``eventstream``.

        Returns
        -------
        Eventstream
        """
        copied_eventstream = Eventstream(
            raw_data_schema=self.__raw_data_schema.copy(),
            raw_data=self.__events.copy(),
            schema=self.schema.copy(),
            prepare=False,
            index_order=self.index_order.copy(),
            events_order=self.events_order.copy(),
            add_start_end_events=False,
        )
        collect_data_performance(
            scope="eventstream",
            event_name="metadata",
            called_params={},
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=self._eventstream_index,
            child_eventstream_index=copied_eventstream._eventstream_index,
        )
        return copied_eventstream
    @time_performance(
        scope="eventstream",
        event_name="append_eventstream",
    )
    def append_eventstream(self, eventstream: Eventstream) -> None:  # type: ignore
        """
        Append an ``eventstream`` with the same schema. The current eventstream is
        modified in place.

        Parameters
        ----------
        eventstream : Eventstream

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the ``EventstreamSchema``s of the two eventstreams are not equal.
        """
        if not self.schema.is_equal(eventstream.schema):
            raise ValueError("invalid schema: joined eventstream")

        collect_data_performance(
            scope="eventstream",
            event_name="metadata",
            called_params={},
            performance_data={},
            eventstream_index=self._eventstream_index,
            parent_eventstream_index=eventstream._eventstream_index,
            child_eventstream_index=self._eventstream_index,
        )

        curr_events = self.to_dataframe()
        new_events = eventstream.to_dataframe()

        merged_events = pd.merge(
            curr_events,
            new_events,
            left_on=self.schema.event_id,
            right_on=self.schema.event_id,
            how="outer",
            indicator=True,
        )

        left_events = merged_events[(merged_events["_merge"] == "left_only")]
        both_events = merged_events[(merged_events["_merge"] == "both")]
        right_events = merged_events[(merged_events["_merge"] == "right_only")]
        right_events = pd.concat([right_events, both_events])

        cols = self.schema.get_cols()

        result_left_part = pd.DataFrame()
        result_right_part = pd.DataFrame()
        result_both_part = pd.DataFrame()

        with warnings.catch_warnings():
            # disable warning for pydantic schema Callable type
            warnings.simplefilter(action="ignore", category=FutureWarning)
            for col in cols:
                result_left_part[col] = get_merged_col(df=left_events, colname=col, suffix="_x")
                result_right_part[col] = get_merged_col(df=right_events, colname=col, suffix="_y")

        self.__events = pd.concat([result_left_part, result_both_part, result_right_part])
        self.index_events()
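    # Usage sketch (illustrative, not executed here): given two eventstreams with
    # identical schemas, ``append_eventstream`` merges the second into the first in
    # place; events present in both (matched by event_id) are taken from the
    # appended stream.
    #
    #     stream_a.append_eventstream(stream_b)
    #     combined_df = stream_a.to_dataframe()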
    @time_performance(
        scope="eventstream",
        event_name="to_dataframe",
    )
    def to_dataframe(self, copy: bool = False, drop_segment_events: bool = True) -> pd.DataFrame:
        """
        Convert the ``eventstream`` to a ``pd.DataFrame``.

        Parameters
        ----------
        copy : bool, default False
            If ``True``, copy data from the current eventstream.
            See details in the :pandas_copy:`pandas documentation<>`.
        drop_segment_events : bool, default True
            If ``True``, remove segment synthetic events.

        Returns
        -------
        pd.DataFrame
        """
        params: dict[str, Any] = {
            "copy": copy,
            "drop_segment_events": drop_segment_events,
        }

        schema = self.schema
        events = self.__events
        if drop_segment_events:
            events = events[events[schema.event_type] != SEGMENT_TYPE]
        view = pd.DataFrame(events, columns=self.schema.get_cols(), copy=copy)
        self.__track_dataset(name="metadata", data=view, params=params, schema=self.schema, not_hash_values=[])
        return view
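    # Usage sketch (illustrative): materialize the eventstream as a plain DataFrame.
    # ``copy=True`` returns an independent copy; ``drop_segment_events=False`` keeps
    # segment synthetic events in the output.
    #
    #     df = stream.to_dataframe(copy=True)
    #     df_all = stream.to_dataframe(drop_segment_events=False)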
    @time_performance(
        scope="eventstream",
        event_name="index_events",
    )
    def index_events(self) -> None:
        """
        Sort and index the eventstream using DEFAULT_INDEX_ORDER.

        Returns
        -------
        None
        """
        collect_data_performance(
            scope="eventstream",
            event_name="metadata",
            called_params={},
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        order_temp_col_name = "order"
        indexed = self.__events

        indexed[order_temp_col_name] = np.vectorize(self.__get_events_priority_by_type, otypes=[np.int64])(
            indexed[self.schema.event_type]
        )
        indexed = indexed.sort_values(
            [self.schema.event_timestamp, self.schema.event_index, order_temp_col_name]
        )  # type: ignore
        indexed = indexed.drop([order_temp_col_name], axis=1)
        indexed.reset_index(inplace=True, drop=True)
        self.__events = indexed
    @time_performance(
        scope="eventstream",
        event_name="add_custom_col",
    )
    def add_custom_col(self, name: str, data: pd.Series[Any] | None) -> None:
        """
        Add a custom column to the existing ``eventstream``.

        Parameters
        ----------
        name : str
            New column name.
        data : pd.Series, optional
            - If ``pd.Series``, the new column is filled with the given values.
            - If ``None``, the new column is filled with ``np.nan``.

        Returns
        -------
        None
        """
        collect_data_performance(
            scope="eventstream",
            event_name="metadata",
            called_params={"name": name, "data": data},
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        if name not in self.schema.custom_cols:
            self.__raw_data_schema.custom_cols.extend([{"custom_col": name, "raw_data_col": name}])
            self.schema.custom_cols.extend([name])
        self.__events[name] = data
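    # Usage sketch (illustrative): attach a new custom column in place. Passing
    # ``None`` fills the column with NaN; a Series must align with the internal
    # events index.
    #
    #     stream.add_custom_col("experiment_group", None)  # all-NaN column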
    def __define_default_custom_cols(self, raw_data: pd.DataFrame | pd.Series[Any]) -> List[str]:
        raw_data_cols = self.__raw_data_schema.get_default_cols()
        schema_cols = self.schema.get_default_cols()
        cols_denylist: List[str] = raw_data_cols + schema_cols
        custom_cols: List[str] = []

        for raw_col_name in raw_data.columns:
            if raw_col_name in cols_denylist:
                continue
            custom_cols.append(raw_col_name)

        return custom_cols

    def __required_cleanup(self, events: pd.DataFrame | pd.Series[Any]) -> pd.DataFrame | pd.Series[Any]:
        income_size = len(events)
        events.dropna(  # type: ignore
            subset=[self.schema.event_name, self.schema.event_timestamp, self.schema.user_id], inplace=True
        )
        size_after_cleanup = len(events)
        if (removed_rows := income_size - size_after_cleanup) > 0:
            warnings.warn(
                "Removed %s rows because they have empty %s or %s or %s"
                % (removed_rows, self.schema.event_name, self.schema.event_timestamp, self.schema.user_id)
            )
        return events

    def __prepare_events(self, raw_data: pd.DataFrame | pd.Series[Any]) -> pd.DataFrame | pd.Series[Any]:
        events = pd.DataFrame(index=raw_data.index)

        if self.__raw_data_schema.event_id is not None and self.__raw_data_schema.event_id in raw_data.columns:
            events[self.schema.event_id] = raw_data[self.__raw_data_schema.event_id]
        else:
            events[self.schema.event_id] = [uuid.uuid4() for x in range(len(events))]

        events[self.schema.event_name] = self.__get_col_from_raw_data(
            raw_data=raw_data,
            colname=self.__raw_data_schema.event_name,
        )
        events[self.schema.event_timestamp] = self.__get_col_from_raw_data(
            raw_data=raw_data, colname=self.__raw_data_schema.event_timestamp
        )
        events[self.schema.event_timestamp] = pd.to_datetime(events[self.schema.event_timestamp])

        has_tz = self.convert_tz is not None
        if not has_tz:
            try:
                has_tz = events[self.schema.event_timestamp].dt.tz is not None
            except AttributeError:
                has_tz = any(date.tzinfo for date in events[self.schema.event_timestamp])

        if has_tz:
            if not self.convert_tz:
                error_text = (
                    "Eventstream doesn't support working with the timestamps that contain timezone data.\n"
                    "Use convert_tz='UTC' or convert_tz='local' arguments to convert the timezones explicitly."
                )
                raise TypeError(error_text)
            elif self.convert_tz == "UTC":
                events[self.schema.event_timestamp] = pd.to_datetime(
                    events[self.schema.event_timestamp], utc=True
                ).dt.tz_localize(None)
            elif self.convert_tz == "local":
                # @TODO: regexp is potentially dangerous since it may not cover all possible data formats.
                # It'd be better to find a more native yet vectorized way to do this.
                # Vladimir Kukushkin
                tz_pattern = r"[+-]\d{2}:\d{2}"
                cut_timezones = (
                    events[self.schema.event_timestamp].astype(str).str.replace(tz_pattern, "", regex=True)
                )
                events[self.schema.event_timestamp] = pd.to_datetime(cut_timezones)
            else:
                raise TypeError("convert_tz parameter should be either None, or 'UTC', or 'local'")

        events[self.schema.user_id] = self.__get_col_from_raw_data(
            raw_data=raw_data,
            colname=self.__raw_data_schema.user_id,
        )

        if self.__raw_data_schema.event_type is not None:
            events[self.schema.event_type] = self.__get_col_from_raw_data(
                raw_data=raw_data,
                colname=self.__raw_data_schema.event_type,
            )
        else:
            events[self.schema.event_type] = "raw"

        for custom_col_schema in self.__raw_data_schema.custom_cols:
            raw_data_col = custom_col_schema["raw_data_col"]
            custom_col = custom_col_schema["custom_col"]
            if custom_col not in self.schema.custom_cols:
                self.schema.custom_cols.append(custom_col)

            events[custom_col] = self.__get_col_from_raw_data(
                raw_data=raw_data,
                colname=raw_data_col,
            )

        for custom_col in self.schema.custom_cols:
            if custom_col in events.columns:
                continue
            events[custom_col] = np.nan

        if self.__raw_data_schema.event_index is not None and self.__raw_data_schema.event_index in raw_data.columns:
            events[self.schema.event_index] = raw_data[self.__raw_data_schema.event_index].astype("int64")
        else:
            events = self._create_index(events=events)  # type: ignore

        return events

    def __apply_default_dataprocessors(
        self, add_start_end_events: bool, segment_cols: Optional[List[str]] = None
    ) -> None:
        from retentioneering.data_processors_lib.add_segment import (
            AddSegment,
            AddSegmentParams,
        )
        from retentioneering.data_processors_lib.add_start_end_events import (
            AddStartEndEvents,
            AddStartEndEventsParams,
        )

        if add_start_end_events:
            add_start_end_processor = AddStartEndEvents(AddStartEndEventsParams())
            self.__events = add_start_end_processor.apply(self.__events, self.schema)  # type: ignore

        if segment_cols:
            for segment_col in segment_cols:
                add_segment_processor = AddSegment(AddSegmentParams(segment=segment_col))  # type: ignore
                self.__events = add_segment_processor.apply(self.__events, self.schema)  # type: ignore

    def __get_col_from_raw_data(
        self, raw_data: pd.DataFrame | pd.Series[Any], colname: str, create: bool = False
    ) -> pd.Series | float:
        if colname in raw_data.columns:
            return raw_data[colname]
        else:
            if create:
                return np.nan
            else:
                raise ValueError(f'invalid raw data. Column "{colname}" does not exist!')
Column "{colname}" does not exists!')def__get_events_priority_by_type(self,event_type:str)->int:returnself.index_order.get(event_type,len(self.index_order))def__get_events_priority_by_config(self,event_name:str)->int:returnself.events_order.get(event_name,len(self.events_order))def__sample_user_paths(self,raw_data:pd.DataFrame|pd.Series[Any],raw_data_schema:RawDataSchemaType,user_sample_size:Optional[int|float]=None,user_sample_seed:Optional[int]=None,)->pd.DataFrame|pd.Series[Any]:iftype(user_sample_size)isnotfloatandtype(user_sample_size)isnotint:raiseTypeError('"user_sample_size" has to be a number(float for user share or int for user amount)')ifuser_sample_size<0:raiseValueError("User sample size/share cannot be negative!")iftype(user_sample_size)isfloat:ifuser_sample_size>1:raiseValueError("User sample share cannot exceed 1!")user_col_name=raw_data_schema.user_idunique_users=raw_data[user_col_name].unique()iftype(user_sample_size)isint:sample_size=user_sample_sizeeliftype(user_sample_size)isfloat:sample_size=int(user_sample_size*len(unique_users))else:returnraw_dataifuser_sample_seedisnotNone:np.random.seed(user_sample_seed)sample_users=np.random.choice(unique_users,sample_size,replace=False)raw_data_sampled=raw_data.loc[raw_data[user_col_name].isin(sample_users),:]# type: ignorereturnraw_data_sampled
    @time_performance(
        scope="funnel",
        event_name="helper",
        event_value="plot",
    )
    def funnel(
        self,
        stages: list[str],
        stage_names: list[str] | None = None,
        funnel_type: Literal["open", "closed", "hybrid"] = "closed",
        groups: SplitExpr | None = None,
        group_names: UserGroupsNamesType | None = None,
        show_plot: bool = True,
    ) -> Funnel:
        """
        Show a visualization of the user sequential events represented as a funnel.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, a funnel visualization is shown.

        See the other parameters' description in :py:class:`.Funnel`.

        Returns
        -------
        Funnel
            A ``Funnel`` class instance fitted to the given parameters.
        """
        params = {
            "stages": stages,
            "stage_names": stage_names,
            "funnel_type": funnel_type,
            "groups": groups,
            "group_names": group_names,
            "show_plot": show_plot,
        }
        collect_data_performance(
            scope="funnel",
            event_name="metadata",
            called_params=params,
            not_hash_values=["funnel_type"],
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__funnel = Funnel(eventstream=self)
        self.__funnel.fit(
            stages=stages,
            stage_names=stage_names,
            funnel_type=funnel_type,
            groups=groups,
            group_names=group_names,
        )
        if show_plot:
            figure = self.__funnel.plot()
            figure.show()
        return self.__funnel
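    # Usage sketch (illustrative): the stage names are assumptions about the data,
    # not library defaults.
    #
    #     funnel = stream.funnel(
    #         stages=["catalog", "cart", "payment_done"],
    #         funnel_type="closed",
    #     )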
    @time_performance(
        scope="get_clusters",
        event_name="helper",
        event_value="fit",
    )
    def get_clusters(
        self,
        X: pd.DataFrame,
        method: CLUSTERING_METHODS,
        n_clusters: int | None = None,
        scaler: SCALERS | None = None,
        random_state: int | None = None,
        segment_name: str = "cluster_id",
        **kwargs: Any,
    ) -> EventstreamType | None:
        """
        Split paths into clusters and save their labels as a segment.

        Parameters
        ----------
        X : pd.DataFrame
            The input data to cluster.
        method : {"kmeans", "gmm", "hdbscan"}
            The clustering method to use.
        n_clusters : int, optional
            The number of clusters to form. Relevant for the ``kmeans`` and ``gmm`` methods.
            If ``n_clusters=None`` and ``method="kmeans"``, the elbow curve chart is displayed.
        scaler : {"minmax", "std"}, optional
            The scaling method to apply to the data before clustering.
            If ``None``, no scaling is applied.
        random_state : int, optional
            A seed used by the random number generator for reproducibility.
        segment_name : str, default "cluster_id"
            The name of the segment that will contain the cluster labels.
        **kwargs
            Additional keyword arguments to pass to the clustering methods.

        Returns
        -------
        Eventstream or None
            If ``n_clusters`` is specified, a new Eventstream object with the clusters
            integrated as the ``segment_name`` segment is returned; otherwise, the current
            eventstream is returned unchanged.
        """
        params = {
            "method": method,
            "n_clusters": n_clusters,
            "random_state": random_state,
            "segment_name": segment_name,
        }
        hash_before = hash_dataframe(self.to_dataframe())
        shape_before = self.to_dataframe().shape

        self.__clusters = Clusters(eventstream=self)
        new_stream = self.__clusters.fit(
            X=X,
            method=method,
            n_clusters=n_clusters,
            scaler=scaler,
            random_state=random_state,
            segment_name=segment_name,
            **kwargs,
        )
        collect_data_performance(
            scope="get_clusters",
            event_name="metadata",
            called_params=params,
            not_hash_values=["X", "scaler"],
            performance_data={
                "parent": {
                    "shape": shape_before,
                    "hash": hash_before,
                },
                "child": {
                    "shape": new_stream.to_dataframe().shape,
                    "hash": hash_dataframe(new_stream.to_dataframe()),
                },
            },
            eventstream_index=self._eventstream_index,
        )
        if n_clusters is not None:
            return new_stream
        return self
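    # Usage sketch (illustrative): cluster paths on vectorized features and store
    # the labels in a "cluster_id" segment. The ``features`` DataFrame is assumed
    # to come from ``extract_features`` defined later in this class.
    #
    #     features = stream.extract_features(feature_type="tfidf", ngram_range=(1, 2))
    #     clustered = stream.get_clusters(
    #         X=features, method="kmeans", n_clusters=4, random_state=42
    #     )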
    @time_performance(
        scope="step_matrix",
        event_name="helper",
        event_value="plot",
    )
    def step_matrix(
        self,
        max_steps: int = 20,
        weight_col: str | None = None,
        precision: int = 2,
        targets: list[str] | str | None = None,
        accumulated: Literal["both", "only"] | None = None,
        sorting: list | None = None,
        threshold: float = 0.01,
        centered: dict | None = None,
        groups: SplitExpr | None = None,
        show_plot: bool = True,
    ) -> StepMatrix:
        """
        Show a heatmap visualization of the step matrix.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, a step matrix heatmap is shown.

        See the other parameters' description in :py:class:`.StepMatrix`.

        Returns
        -------
        StepMatrix
            A ``StepMatrix`` class instance fitted to the given parameters.
        """
        params = {
            "max_steps": max_steps,
            "weight_col": weight_col,
            "precision": precision,
            "targets": targets,
            "accumulated": accumulated,
            "sorting": sorting,
            "threshold": threshold,
            "centered": centered,
            "groups": groups,
            "show_plot": show_plot,
        }
        not_hash_values = ["accumulated", "centered"]
        collect_data_performance(
            scope="step_matrix",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__step_matrix = StepMatrix(eventstream=self)
        self.__step_matrix.fit(
            max_steps=max_steps,
            weight_col=weight_col,
            precision=precision,
            targets=targets,
            accumulated=accumulated,
            sorting=sorting,
            threshold=threshold,
            centered=centered,
            groups=groups,
        )
        if show_plot:
            self.__step_matrix.plot()
        return self.__step_matrix
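    # Usage sketch (illustrative): a step matrix for the first 15 steps with an
    # assumed target event name.
    #
    #     sm = stream.step_matrix(max_steps=15, targets=["payment_done"], threshold=0.05)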
    @time_performance(
        scope="step_sankey",
        event_name="helper",
        event_value="plot",
    )
    def step_sankey(
        self,
        max_steps: int = 10,
        threshold: int | float = 0.05,
        sorting: list | None = None,
        targets: list[str] | str | None = None,
        autosize: bool = True,
        width: int | None = None,
        height: int | None = None,
        show_plot: bool = True,
    ) -> StepSankey:
        """
        Show a Sankey diagram visualizing the user paths in a stepwise manner.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, a Sankey diagram is shown.

        See the other parameters' description in :py:class:`.StepSankey`.

        Returns
        -------
        StepSankey
            A ``StepSankey`` class instance fitted to the given parameters.
        """
        params = {
            "max_steps": max_steps,
            "threshold": threshold,
            "sorting": sorting,
            "targets": targets,
            "autosize": autosize,
            "width": width,
            "height": height,
            "show_plot": show_plot,
        }
        collect_data_performance(
            scope="step_sankey",
            event_name="metadata",
            called_params=params,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__sankey = StepSankey(eventstream=self)
        self.__sankey.fit(max_steps=max_steps, threshold=threshold, sorting=sorting, targets=targets)
        if show_plot:
            figure = self.__sankey.plot(autosize=autosize, width=width, height=height)
            figure.show()
        return self.__sankey
    @time_performance(
        scope="cohorts",
        event_name="helper",
        event_value="heatmap",
    )
    def cohorts(
        self,
        cohort_start_unit: DATETIME_UNITS,
        cohort_period: Tuple[int, DATETIME_UNITS],
        average: bool = True,
        cut_bottom: int = 0,
        cut_right: int = 0,
        cut_diagonal: int = 0,
        width: float = 5.0,
        height: float = 5.0,
        show_plot: bool = True,
    ) -> Cohorts:
        """
        Show a heatmap visualization of user appearance grouped by cohorts.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, a cohort matrix heatmap is shown.

        See the other parameters' description in :py:class:`.Cohorts`.

        Returns
        -------
        Cohorts
            A ``Cohorts`` class instance fitted to the given parameters.
        """
        params = {
            "cohort_start_unit": cohort_start_unit,
            "cohort_period": cohort_period,
            "average": average,
            "cut_bottom": cut_bottom,
            "cut_right": cut_right,
            "cut_diagonal": cut_diagonal,
            "width": width,
            "height": height,
            "show_plot": show_plot,
        }
        not_hash_values = ["cohort_start_unit", "cohort_period"]
        collect_data_performance(
            scope="cohorts",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__cohorts = Cohorts(eventstream=self)
        self.__cohorts.fit(
            cohort_start_unit=cohort_start_unit,
            cohort_period=cohort_period,
            average=average,
            cut_bottom=cut_bottom,
            cut_right=cut_right,
            cut_diagonal=cut_diagonal,
        )
        if show_plot:
            self.__cohorts.heatmap(width=width, height=height)
        return self.__cohorts
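    # Usage sketch (illustrative): monthly cohorts tracked in 30-day periods; the
    # unit aliases are assumed to follow the library's DATETIME_UNITS convention.
    #
    #     cohorts = stream.cohorts(cohort_start_unit="M", cohort_period=(30, "D"))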
    @time_performance(
        scope="stattests",
        event_name="helper",
        event_value="display_results",
    )
    def stattests(
        self,
        test: STATTEST_NAMES,
        groups: SplitExpr,
        func: Callable,
        group_names: UserGroupsNamesType,
        alpha: float = 0.05,
    ) -> StatTests:
        """
        Determine the statistical difference between the metric values in two user groups.

        Parameters
        ----------
        See the parameters' description in :py:class:`.Stattests`.

        Returns
        -------
        StatTests
            A ``StatTests`` class instance fitted to the given parameters.
        """
        params = {
            "test": test,
            "groups": groups,
            "func": func,
            "group_names": group_names,
            "alpha": alpha,
        }
        not_hash_values = ["test"]
        collect_data_performance(
            scope="stattests",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__stattests = StatTests(eventstream=self)
        self.__stattests.fit(groups=groups, func=func, test=test, group_names=group_names, alpha=alpha)
        self.__stattests.display_results()
        return self.__stattests
    @time_performance(
        scope="timedelta_hist",
        event_name="helper",
        event_value="plot",
    )
    def timedelta_hist(
        self,
        raw_events_only: bool = False,
        event_pair: list[str | Literal[EVENTSTREAM_GLOBAL_EVENTS]] | None = None,
        adjacent_events_only: bool = True,
        weight_col: str | None = None,
        time_agg: AGGREGATION_NAMES | None = None,
        timedelta_unit: DATETIME_UNITS = "s",
        log_scale: bool | tuple[bool, bool] | None = None,
        lower_cutoff_quantile: float | None = None,
        upper_cutoff_quantile: float | None = None,
        bins: int | Literal[BINS_ESTIMATORS] = 20,
        width: float = 6.0,
        height: float = 4.5,
        show_plot: bool = True,
    ) -> TimedeltaHist:
        """
        Plot the distribution of the time deltas between two events. Supports various
        distribution types, such as the distribution of time between adjacent consecutive
        events, between a pair of pre-defined events, or the median transition time
        from event to event per user/session.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, the histogram is shown.

        See the other parameters' description in :py:class:`.TimedeltaHist`.

        Returns
        -------
        TimedeltaHist
            A ``TimedeltaHist`` class instance fitted with the given parameters.
        """
        params = {
            "raw_events_only": raw_events_only,
            "event_pair": event_pair,
            "adjacent_events_only": adjacent_events_only,
            "weight_col": weight_col,
            "time_agg": time_agg,
            "timedelta_unit": timedelta_unit,
            "log_scale": log_scale,
            "lower_cutoff_quantile": lower_cutoff_quantile,
            "upper_cutoff_quantile": upper_cutoff_quantile,
            "bins": bins,
            "width": width,
            "height": height,
            "show_plot": show_plot,
        }
        not_hash_values = ["time_agg", "timedelta_unit"]
        collect_data_performance(
            scope="timedelta_hist",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__timedelta_hist = TimedeltaHist(
            eventstream=self,
        )
        self.__timedelta_hist.fit(
            raw_events_only=raw_events_only,
            event_pair=event_pair,
            adjacent_events_only=adjacent_events_only,
            time_agg=time_agg,
            weight_col=weight_col,
            timedelta_unit=timedelta_unit,
            log_scale=log_scale,
            lower_cutoff_quantile=lower_cutoff_quantile,
            upper_cutoff_quantile=upper_cutoff_quantile,
            bins=bins,
        )
        if show_plot:
            self.__timedelta_hist.plot(
                width=width,
                height=height,
            )
        return self.__timedelta_hist
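    # Usage sketch (illustrative): the distribution of time between two specific
    # events, in minutes; the event names are assumptions about the data.
    #
    #     stream.timedelta_hist(
    #         event_pair=["cart", "payment_done"],
    #         timedelta_unit="m",
    #         adjacent_events_only=False,
    #     )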
    @time_performance(
        scope="user_lifetime_hist",
        event_name="helper",
        event_value="plot",
    )
    def user_lifetime_hist(
        self,
        timedelta_unit: DATETIME_UNITS = "s",
        log_scale: bool | tuple[bool, bool] | None = None,
        lower_cutoff_quantile: float | None = None,
        upper_cutoff_quantile: float | None = None,
        bins: int | Literal[BINS_ESTIMATORS] = 20,
        width: float = 6.0,
        height: float = 4.5,
        show_plot: bool = True,
    ) -> UserLifetimeHist:
        """
        Plot the distribution of user lifetimes. A user's lifetime is the timedelta
        between the user's first and last events.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, the histogram is shown.

        See the other parameters' description in :py:class:`.UserLifetimeHist`.

        Returns
        -------
        UserLifetimeHist
            A ``UserLifetimeHist`` class instance with the given parameters.
        """
        params = {
            "timedelta_unit": timedelta_unit,
            "log_scale": log_scale,
            "lower_cutoff_quantile": lower_cutoff_quantile,
            "upper_cutoff_quantile": upper_cutoff_quantile,
            "bins": bins,
            "width": width,
            "height": height,
            "show_plot": show_plot,
        }
        not_hash_values = ["timedelta_unit"]
        collect_data_performance(
            scope="user_lifetime_hist",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__user_lifetime_hist = UserLifetimeHist(
            eventstream=self,
        )
        self.__user_lifetime_hist.fit(
            timedelta_unit=timedelta_unit,
            log_scale=log_scale,
            lower_cutoff_quantile=lower_cutoff_quantile,
            upper_cutoff_quantile=upper_cutoff_quantile,
            bins=bins,
        )
        if show_plot:
            self.__user_lifetime_hist.plot(width=width, height=height)
        return self.__user_lifetime_hist
    @time_performance(
        scope="event_timestamp_hist",
        event_name="helper",
        event_value="plot",
    )
    def event_timestamp_hist(
        self,
        event_list: list[str] | None = None,
        raw_events_only: bool = False,
        lower_cutoff_quantile: float | None = None,
        upper_cutoff_quantile: float | None = None,
        bins: int | Literal[BINS_ESTIMATORS] = 20,
        width: float = 6.0,
        height: float = 4.5,
        show_plot: bool = True,
    ) -> EventTimestampHist:
        """
        Plot the distribution of events over time. Can be useful for detecting time-based
        anomalies and visualizing the general timespan of the eventstream.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, the histogram is shown.

        See the other parameters' description in :py:class:`.EventTimestampHist`.

        Returns
        -------
        EventTimestampHist
            An ``EventTimestampHist`` class instance with the given parameters.
        """
        params = {
            "event_list": event_list,
            "raw_events_only": raw_events_only,
            "lower_cutoff_quantile": lower_cutoff_quantile,
            "upper_cutoff_quantile": upper_cutoff_quantile,
            "bins": bins,
            "width": width,
            "height": height,
            "show_plot": show_plot,
        }
        collect_data_performance(
            scope="event_timestamp_hist",
            event_name="metadata",
            called_params=params,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__event_timestamp_hist = EventTimestampHist(
            eventstream=self,
        )
        self.__event_timestamp_hist.fit(
            event_list=event_list,
            raw_events_only=raw_events_only,
            lower_cutoff_quantile=lower_cutoff_quantile,
            upper_cutoff_quantile=upper_cutoff_quantile,
            bins=bins,
        )
        if show_plot:
            self.__event_timestamp_hist.plot(width=width, height=height)
        return self.__event_timestamp_hist
    @time_performance(
        scope="describe",
        event_name="helper",
        event_value="_values",
    )
    def describe(self, session_col: str = "session_id", raw_events_only: bool = False) -> pd.DataFrame:
        """
        Display general eventstream information. If ``session_col`` is present in the
        eventstream, session statistics are included in the output.

        Parameters
        ----------
        session_col : str, default 'session_id'
            The name of the session column. If the column is present in the eventstream,
            session statistics are added to the output.
        raw_events_only : bool, default False
            If ``True``, statistics are shown for raw events only.
            If ``False``, statistics are shown for all events present in your data.

        Returns
        -------
        pd.DataFrame
            A dataframe containing descriptive statistics for the eventstream.

        See Also
        --------
        .EventTimestampHist : Plot the distribution of events over time.
        .TimedeltaHist : Plot the distribution of the time deltas between two events.
        .UserLifetimeHist : Plot the distribution of user lifetimes.
        .Eventstream.describe_events : Show general eventstream events statistics.

        Notes
        -----
        - All ``float`` values are rounded to 2 decimal places.
        - All ``datetime`` values are rounded to seconds.

        See the :ref:`Eventstream user guide<eventstream_describe>` for the details.
        """
        params = {
            "session_col": session_col,
            "raw_events_only": raw_events_only,
        }
        collect_data_performance(
            scope="describe",
            event_name="metadata",
            called_params=params,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        describer = _Describe(eventstream=self, session_col=session_col, raw_events_only=raw_events_only)
        return describer._values()
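    # Usage sketch (illustrative): overall statistics, restricted to raw events.
    #
    #     stats = stream.describe(raw_events_only=True)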
    @time_performance(
        scope="describe_events",
        event_name="helper",
        event_value="_values",
    )
    def describe_events(
        self, session_col: str = "session_id", raw_events_only: bool = False, event_list: list[str] | None = None
    ) -> pd.DataFrame:
        """
        Display general information on the eventstream events. If ``session_col`` is
        present in the eventstream, session statistics are included in the output.

        Parameters
        ----------
        session_col : str, default 'session_id'
            The name of the session column. If the column is present in the eventstream,
            session statistics are added to the output.
        raw_events_only : bool, default False
            If ``True``, statistics are shown for raw events only.
            If ``False``, statistics are shown for all events present in your data.
        event_list : list of str, optional
            The events to be displayed.

        Returns
        -------
        pd.DataFrame
            **Eventstream statistics**:

            The following metrics are calculated for each event present in the eventstream
            (or the narrowed eventstream if the ``event_list`` or ``raw_events_only``
            parameters are used). Let all_events, all_users, all_sessions be the numbers
            of all events, users, and sessions present in the eventstream. Then:

            - *number_of_occurrences* - the number of occurrences of a particular event
              in the eventstream;
            - *unique_users* - the number of unique users who experienced a particular event;
            - *unique_sessions* - the number of unique sessions with each event;
            - *number_of_occurrences_shared* - number_of_occurrences / all_events
              (raw events only, if ``raw_events_only=True``);
            - *unique_users_shared* - unique_users / all_users;
            - *unique_sessions_shared* - unique_sessions / all_sessions;
            - **time_to_FO_user_wise** category - the timedelta between ``path_start`` and
              the first occurrence (FO) of a specified event in each user path;
            - **steps_to_FO_user_wise** category - the number of steps (events) from
              ``path_start`` to the first occurrence (FO) of a specified event in each user
              path. If ``raw_events_only=True``, only raw events are counted;
            - **time_to_FO_session_wise** category - the timedelta between ``session_start``
              and the first occurrence (FO) of a specified event in each session;
            - **steps_to_FO_session_wise** category - the number of steps (events) from
              ``session_start`` to the first occurrence (FO) of a specified event in each
              session. If ``raw_events_only=True``, only raw events are counted.

            The agg functions for each ``first_occurrence*`` category are:
            mean, std, median, min, max.

        See Also
        --------
        .EventTimestampHist : Plot the distribution of events over time.
        .TimedeltaHist : Plot the distribution of the time deltas between two events.
        .UserLifetimeHist : Plot the distribution of user lifetimes.
        .Eventstream.describe : Show general eventstream statistics.

        Notes
        -----
        - All ``float`` values are rounded to 2 decimal places.
        - All ``datetime`` values are rounded to seconds.

        See the :ref:`Eventstream user guide<eventstream_describe_events>` for the details.
        """
        params = {
            "session_col": session_col,
            "raw_events_only": raw_events_only,
            "event_list": event_list,
        }
        collect_data_performance(
            scope="describe_events",
            event_name="metadata",
            called_params=params,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        describer = _DescribeEvents(
            eventstream=self, session_col=session_col, event_list=event_list, raw_events_only=raw_events_only
        )
        return describer._values()
    @time_performance(
        scope="transition_graph",
        event_name="helper",
        event_value="plot",
    )
    def transition_graph(
        self,
        edges_norm_type: NormType = None,
        nodes_norm_type: NormType = None,
        targets: TargetToNodesMap | None = None,
        nodes_threshold: Threshold | None = None,
        edges_threshold: Threshold | None = None,
        nodes_weight_col: str | None = None,
        edges_weight_col: str | None = None,
        custom_weight_cols: list[str] | None = None,
        width: str | int | float | None = None,
        height: str | int | float | None = None,
        show_weights: bool | None = None,
        show_percents: bool | None = None,
        show_nodes_names: bool | None = None,
        show_all_edges_for_targets: bool | None = None,
        show_nodes_without_links: bool | None = None,
        show_edge_info_on_hover: bool | None = None,
        open_sidebar_by_default: bool | None = None,
        nodes_custom_colors: Dict[str, str] | None = None,
        edges_custom_colors: Dict[Tuple[str, str], str] | None = None,
        nodelist: Nodelist | pd.DataFrame | None = None,
        layout_dump: str | None = None,
        import_file: str | None = None,
    ) -> TransitionGraph:
        """
        Parameters
        ----------
        See the parameters' description in :py:meth:`.TransitionGraph.plot`.

        @TODO: maybe load docs with docrep? 2dpanina, Vladimir Makhanov

        Returns
        -------
        TransitionGraph
            Rendered IFrame graph.
        """
        collect_data_performance(
            scope="transition_graph",
            event_name="metadata",
            called_params={
                "edges_norm_type": edges_norm_type,
                "nodes_norm_type": nodes_norm_type,
                "targets": targets,
                "nodes_threshold": nodes_threshold,
                "edges_threshold": edges_threshold,
                "nodes_weight_col": nodes_weight_col,
                "edges_weight_col": edges_weight_col,
                "custom_weight_cols": custom_weight_cols,
                "width": width,
                "height": height,
                "show_weights": show_weights,
                "show_percents": show_percents,
                "show_nodes_names": show_nodes_names,
                "show_all_edges_for_targets": show_all_edges_for_targets,
                "show_nodes_without_links": show_nodes_without_links,
                "show_edge_info_on_hover": show_edge_info_on_hover,
                "open_sidebar_by_default": open_sidebar_by_default,
                "nodes_custom_colors": nodes_custom_colors,
                "edges_custom_colors": edges_custom_colors,
                "nodelist": nodelist,
                "layout_dump": layout_dump,
                "import_file": import_file,
            },
            not_hash_values=["edges_norm_type", "targets", "width", "height", "edges_custom_colors"],
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__transition_graph = TransitionGraph(eventstream=self)
        self.__transition_graph.plot(
            targets=targets,
            edges_norm_type=edges_norm_type,
            nodes_norm_type=nodes_norm_type,
            edges_weight_col=edges_weight_col,
            nodes_threshold=nodes_threshold,
            edges_threshold=edges_threshold,
            nodes_weight_col=nodes_weight_col,
            custom_weight_cols=custom_weight_cols,
            width=width,
            height=height,
            show_weights=show_weights,
            show_percents=show_percents,
            show_nodes_names=show_nodes_names,
            show_all_edges_for_targets=show_all_edges_for_targets,
            show_nodes_without_links=show_nodes_without_links,
            show_edge_info_on_hover=show_edge_info_on_hover,
            open_sidebar_by_default=open_sidebar_by_default,
            nodes_custom_colors=nodes_custom_colors,
            edges_custom_colors=edges_custom_colors,
            nodelist=nodelist,
            layout_dump=layout_dump,
            import_file=import_file,
        )
        return self.__transition_graph
    @time_performance(
        scope="preprocessing_graph",
        event_name="helper",
        event_value="display",
    )
    def preprocessing_graph(self, width: int = 960, height: int = 600) -> PreprocessingGraph:
        """
        Display the preprocessing GUI tool.

        Parameters
        ----------
        width : int, default 960
            Width of the plot in pixels.
        height : int, default 600
            Height of the plot in pixels.

        Returns
        -------
        PreprocessingGraph
            Rendered preprocessing graph.
        """
        params = {
            "width": width,
            "height": height,
        }
        collect_data_performance(
            scope="preprocessing_graph",
            event_name="metadata",
            called_params=params,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        if self._preprocessing_graph is None:
            self._preprocessing_graph = PreprocessingGraph(source_stream=self)
        self._preprocessing_graph.display(width=width, height=height)
        return self._preprocessing_graph
    @time_performance(
        scope="transition_matrix",
        event_name="helper",
        event_value="plot",
    )
    def transition_matrix(
        self,
        norm_type: NormType = None,
        weight_col: Optional[str] = None,
        groups: SplitExpr | None = None,
        heatmap_axis: Union[Literal["rows", "columns", "both"], int] = "both",
        precision: Union[int, Literal["auto"]] = "auto",
        figsize: Optional[Tuple[Union[float, int], Union[float, int]]] = None,
        show_large_matrix: Optional[bool] = None,
        show_values: Optional[bool] = None,
        show_plot: bool = True,
    ) -> TransitionMatrix:
        """
        Retrieve a matrix of transition weights for each pair of unique events. This function
        calculates transition weights based on the same logic used for calculating edge weights
        in a transition graph.

        Parameters
        ----------
        norm_type : {"full", "node", None}, default None
            The type of normalization used to calculate the weights. The weight values
            are calculated based on the ``weight_col`` parameter.

            - If ``None``, normalization is not used, and absolute values are taken.
            - If ``full``, normalization is applied across the whole eventstream.
            - If ``node``, normalization is applied across each node (i.e. the outgoing
              transitions from each node).

            See the :ref:`Transition graph user guide <transition_graph_weights>` for the details.
        weight_col : str, default 'user_id'
            A column name from the :py:class:`.EventstreamSchema` whose values control
            the final edge weights. For each edge the following is calculated:

            - If ``None`` or ``user_id``, the number of unique users.
            - If ``event_id``, the number of transitions.
            - If ``session_id``, the number of unique sessions.
            - If ``custom_col``, the number of unique values in the selected column.

            See the :ref:`Transition graph user guide <transition_graph_weights>` for the details.
        groups : tuple[list, list], optional
            Can be specified to calculate a differential transition matrix. Must contain
            a tuple of two elements (g_1, g_2), where g_1 and g_2 are collections of user_ids.
            Two separate transition matrices M1 and M2 are calculated for the users from
            g_1 and g_2, respectively. The resulting matrix is M = M1 - M2.
        heatmap_axis : {0 or 'rows', 1 or 'columns', 'both'}, default 'both'
            The axis for which the heatmap is to be generated. If specified, the heatmap
            is created separately for the selected axis. If ``heatmap_axis='both'``,
            the heatmap is applied to the entire matrix.
        precision : int or str, default 'auto'
            The number of decimal digits to display for the fractions in the heatmap.
            If precision is ``auto``, the value depends on ``norm_type``:
            0 for ``norm_type=None``, and 2 otherwise.
        figsize : tuple[float, float], default None
            The size of the visualization. The default size is calculated automatically
            depending on the matrix dimension and the ``precision`` and ``show_values`` options.
        show_large_matrix : bool, optional
            If ``None``, the matrix is displayed only if the matrix dimension is <= 60.
            If ``True``, the matrix is plotted explicitly.
        show_values : bool, optional
            If ``None``, the matrix values are hidden only if the matrix dimension lies
            between 30 and 60. If ``True``, the matrix values are shown explicitly.
            If ``False``, the values are hidden; the ``precision`` parameter is ignored
            in this case.
        show_plot : bool, default True
            If ``True``, a heatmap of the transition matrix is displayed.

        Returns
        -------
        TransitionMatrix
            A ``TransitionMatrix`` instance fitted to the given parameters.
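
        Examples
        --------
        A minimal usage sketch (``stream`` is assumed to be an existing
        ``Eventstream``; the events in the output depend on the data):

        .. code:: python

            # Share of unique users passing through each transition,
            # normalized over the outgoing transitions of each node.
            tm = stream.transition_matrix(norm_type="node", weight_col="user_id")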
"""not_hash_values=["norm_type","heatmap_axis","precision","figsize","show_large_matrix","show_values"]params={"weight_col":weight_col,"norm_type":norm_type}collect_data_performance(scope="transition_matrix",event_name="metadata",called_params=params,not_hash_values=not_hash_values,performance_data={},eventstream_index=self._eventstream_index,)self.__transition_matrix=TransitionMatrix(eventstream=self)self.__transition_matrix.fit(weight_col=weight_col,norm_type=norm_type,groups=groups)ifshow_plot:self.__transition_matrix.plot(heatmap_axis=heatmap_axis,precision=precision,figsize=figsize,show_large_matrix=show_large_matrix,show_values=show_values,)returnself.__transition_matrix
    @time_performance(
        scope="sequences",
        event_name="helper",
        event_value="plot",
    )
    def sequences(
        self,
        ngram_range: Tuple[int, int] = (1, 1),
        groups: SplitExpr | None = None,
        group_names: UserGroupsNamesType | None = None,
        weight_col: str | None = None,
        metrics: SEQUENCES_METRIC_TYPES | None = None,
        threshold: tuple[str, float | int] | None = None,
        sorting: tuple[str | tuple, bool] | tuple[list[str | tuple], list[bool]] | None = None,
        heatmap_cols: str | list[str | tuple] | None = None,
        sample_size: int | None = 1,
        precision: int = 2,
        show_plot: bool = True,
    ) -> Sequences:
        """
        Calculate statistics on the n-grams found in the eventstream.

        Parameters
        ----------
        show_plot : bool, default True
            If ``True``, the resulting statistics table is displayed.

        See the other parameters' description in :py:class:`.Sequences`.

        Returns
        -------
        Sequences
            A ``Sequences`` class instance fitted to the given parameters.
        """
        params = {
            "ngram_range": ngram_range,
            "groups": groups,
            "group_names": group_names,
            "weight_col": weight_col,
            "metrics": metrics,
            "threshold": threshold,
            "sorting": sorting,
            "heatmap_cols": heatmap_cols,
            "sample_size": sample_size,
            "precision": precision,
            "show_plot": show_plot,
        }
        not_hash_values = ["metrics", "ngram_range"]
        collect_data_performance(
            scope="sequences",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        self.__sequences = Sequences(eventstream=self)
        self.__sequences.fit(ngram_range=ngram_range, groups=groups, group_names=group_names, path_id_col=weight_col)
        styler_sequences = self.__sequences.plot(
            metrics=metrics,
            threshold=threshold,
            sorting=sorting,
            heatmap_cols=heatmap_cols,
            sample_size=sample_size,
            precision=precision,
        )
        if show_plot:
            display(styler_sequences)
        return self.__sequences
    @time_performance(
        scope="segment_map",
        event_name="helper",
        event_value="fit",
    )
    def segment_map(
        self,
        name: str | None,
        index: Literal["path_id", "segment_value"] = "path_id",
        resolve_collision: Optional[Literal["majority", "last"]] = None,
    ) -> pd.DataFrame | pd.Series:
        """
        Return a mapping between segment values and paths. Works with static or
        roughly static segments.

        Parameters
        ----------
        name : str, optional
            The name of the segment. If ``None``, the mapping is returned for all segments;
            this works only for ``index="path_id"``.
        index : {"path_id", "segment_value"}, default "path_id"
            The index of the resulting Series or DataFrame.
            If ``path_id``, the index is path_id, and the values are the corresponding
            segment values. If ``segment_value``, the index is segment values, and the
            values are lists of path_ids associated with each segment value.

        Returns
        -------
        pd.Series
            If ``name`` is defined.
        pd.DataFrame
            If ``name=None`` and ``index="path_id"``.
        """
        params = {"name": name, "index": index, "resolve_collision": resolve_collision}
        collect_data_performance(
            scope="segment_map",
            event_name="metadata",
            called_params=params,
            not_hash_values=None,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        return get_segment_path_map(self, name=name, index=index, resolve_collision=resolve_collision)
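    # Usage sketch (illustrative): map each path to its value in an assumed
    # "device" segment; ``resolve_collision="majority"`` is assumed to pick the
    # most frequent value when a path carries several segment values.
    #
    #     device_map = stream.segment_map(name="device", resolve_collision="majority")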
    @time_performance(
        scope="extract_features",
        event_name="helper",
        event_value="fit",
    )
    def extract_features(
        self,
        feature_type: FEATURE_TYPES,
        ngram_range: Tuple[int, int] = (1, 1),
        path_id_col: str | None = None,
        col_suffix: str | None = None,
    ) -> pd.DataFrame:
        """
        Calculate a set of features for each path.

        Parameters
        ----------
        feature_type : {"tfidf", "count", "frequency", "binary", "markov", "time", "time_fraction"}
            The algorithm for converting event sequences to feature vectors:

            - ``tfidf``: see the details in the :sklearn_tfidf:`sklearn documentation<>`;
            - ``count``: see the details in the :sklearn_countvec:`sklearn documentation<>`;
            - ``frequency``: similar to count, but normalized to the total number of the
              events in the user's trajectory;
            - ``binary``: 1 if a user had the given n-gram at least once and 0 otherwise;
            - ``markov``: available for bigrams only. For a given bigram ``(A, B)`` the
              vectorized values are the user's transition probabilities from ``A`` to ``B``;
            - ``time``: associated with unigrams only. The total amount of time (in seconds)
              spent on a given event;
            - ``time_fraction``: the same as ``time`` but divided by the path duration
              (in seconds).
        ngram_range : Tuple(int, int), default (1, 1)
            The lower and upper boundary of the range of n for the n-grams to be extracted.
            For example, ``ngram_range=(1, 1)`` means single events only, ``(1, 2)`` means
            single events and bigrams. Ignored for the ``markov``, ``time``,
            ``time_fraction`` feature types.
        path_id_col : str, optional
            A column name associated with a path identifier. The default value is linked
            to the user column from the eventstream schema.
        col_suffix : str, optional
            A suffix added to the feature names.

        Returns
        -------
        pd.DataFrame
            A DataFrame with the vectorized values. The index consists of path ids,
            the columns relate to the n-grams.
        """
        params = {
            "feature_type": feature_type,
            "ngram_range": ngram_range,
            "path_id_col": path_id_col,
            "col_suffix": col_suffix,
        }
        collect_data_performance(
            scope="extract_features",
            event_name="metadata",
            called_params=params,
            not_hash_values=None,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        pf = Vectorizer(self)
        features = pf.extract_features(feature_type, ngram_range, path_id_col, col_suffix)
        return features
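    # Usage sketch (illustrative): TF-IDF features over unigrams and bigrams,
    # one row per user path.
    #
    #     features = stream.extract_features(feature_type="tfidf", ngram_range=(1, 2))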
    @time_performance(
        scope="clusters_overview",
        event_name="helper",
        event_value="plot",
    )
    def clusters_overview(
        self,
        segment_name: str,
        features: pd.DataFrame,
        aggfunc: Callable | str = "mean",
        scaler: Literal["minmax", "std"] = "minmax",
        metrics: List[Tuple[Union[str, Callable, pd.NamedAgg], Union[str, Callable], str]] | None = None,
        axis: int = 1,
        show_plot: bool = True,
    ) -> SegmentOverview:
        """
        Show a heatmap table with the aggregated values of features and custom metrics
        by clusters.

        Parameters
        ----------
        segment_name : str
            The name of the segment containing the cluster labels.
        features : pd.DataFrame
            A DataFrame with the features to be aggregated. The DataFrame's index should
            be path ids.
        aggfunc : callable or str, default "mean"
            A function to aggregate the features. If a string is passed, it should be a
            valid function name for the DataFrame's ``agg`` method; see the
            :pandas_aggfunc:`pandas documentation<>` for the details.
        scaler : {"minmax", "std"}, default "minmax"
            A scaler to normalize the features before the aggregation. Available scalers:

            - ``minmax``: MinMaxScaler.
            - ``std``: StandardScaler.
        metrics : list of tuples, optional
            A list of tuples with custom metrics. Each tuple should contain three elements:

            - a function to calculate the metric; see
              :py:meth:`Eventstream.path_metrics()<retentioneering.eventstream.eventstream.Eventstream.path_metrics>`
              for the details;
            - an aggregation metric to be applied to the metric values, same as ``aggfunc``;
            - a metric label to be displayed in the resulting table.
        axis : {0, 1}, default 1
            The axis for which the heatmap is to be generated.

            - 1 : for a row-wise heatmap,
            - 0 : for a column-wise heatmap. Custom metrics coloring is ignored in this case.
        show_plot : bool, default True
            If ``True``, a heatmap is shown.

        Returns
        -------
        SegmentOverview
            A ``SegmentOverview`` class instance fitted with the given parameters.
        """
        params = {
            "segment_name": segment_name,
            "aggfunc": aggfunc,
            "scaler": scaler,
            "axis": axis,
            "show_plot": show_plot,
        }
        not_hash_values = ["features", "metrics"]
        collect_data_performance(
            scope="clusters_overview",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        self.__segment_overview = SegmentOverview(eventstream=self)
        self.__segment_overview.fit_clusters_overview(segment_name, features, aggfunc, scaler, metrics, axis)
        if show_plot:
            fig = self.__segment_overview.plot_heatmap()
            fig.show()
        return self.__segment_overview
    @time_performance(
        scope="segment_overview",
        event_name="helper",
        event_value="plot",
    )
    def segment_overview(
        self,
        segment_name: str,
        metrics: List[Tuple[Union[str, Callable, pd.NamedAgg], Union[str, Callable], str]] | None = None,
        kind: SEGMENT_METRICS_OVERVIEW_KINDS = "heatmap",
        axis: int = 0,
        show_plot: bool = True,
    ) -> SegmentOverview:
        """
        Show a visualization with the aggregated values of custom metrics by segments.

        Parameters
        ----------
        segment_name : str
            The name of the segment.
        metrics : list of tuples, optional
            A list of tuples with custom metrics. Each tuple should contain three elements:

            - a function to calculate the metric; see
              :py:meth:`Eventstream.path_metrics()<retentioneering.eventstream.eventstream.Eventstream.path_metrics>`
              for the details;
            - an aggregation metric to be applied to the metric values;
            - a metric label to be displayed in the resulting table.
        kind : {"heatmap", "bar"}, default "heatmap"
            Visualization option.
        axis : {0, 1}, default 0
            The axis for which the heatmap is to be generated.

            - 0 : for a row-wise heatmap,
            - 1 : for a column-wise heatmap.
        show_plot : bool, default True
            If ``True``, the visualization is shown.

        Returns
        -------
        SegmentOverview
            A ``SegmentOverview`` class instance fitted with the given parameters.
        """
        params = {"segment_name": segment_name, "kind": kind, "axis": axis}
        not_hash_values = ["metrics"]
        collect_data_performance(
            scope="segment_overview",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        self.__segment_overview = SegmentOverview(eventstream=self)
        cm = self.__segment_overview.fit_segment_overview(segment_name, metrics)
        if show_plot:
            fig = self.__segment_overview.plot_custom_metrics(axis, kind)
            fig.show()
        return self.__segment_overview
    @time_performance(
        scope="segment_diff",
        event_name="helper",
        event_value="plot",
    )
    def segment_diff(
        self,
        segment_items: SplitExpr,
        features: pd.DataFrame,
        aggfunc: Callable = np.mean,
        threshold: float = 0.01,
        top_n: int | None = None,
        show_plot: bool = True,
    ) -> SegmentDiff:
        """
        Show a table with the difference between a pair of segment items. The rows relate
        to the features. The Wasserstein distance is used to calculate the difference
        between the feature distributions of the paired segment items.

        Parameters
        ----------
        segment_items : list
            A list with the segment values to be compared.
        features : pd.DataFrame
            A DataFrame with the features to be aggregated and compared between the
            selected segment items.
        aggfunc : callable, default np.mean
            A function to aggregate the features. If a string is passed, it should be a
            valid function name for the DataFrame's ``agg`` method; see the
            :pandas_aggfunc:`pandas documentation<>` for the details.
        threshold : float, default 0.01
            A threshold to filter out the features with a small difference between the
            segment items.
        top_n : int, optional
            The number of top features to be displayed.
        show_plot : bool, default True
            If ``True``, a table with the difference is shown.

        Returns
        -------
        SegmentDiff
            A ``SegmentDiff`` class instance fitted with the given parameters.
        """
        params = {"threshold": threshold, "top_n": top_n, "show_plot": show_plot}
        not_hash_values = ["segment_items", "features", "aggfunc"]
        collect_data_performance(
            scope="segment_diff",
            event_name="metadata",
            called_params=params,
            not_hash_values=not_hash_values,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        self.__segment_diff = SegmentDiff(self)
        diff = self.__segment_diff.fit(features, segment_items, aggfunc, threshold, top_n)
        if show_plot:
            display(HTML(diff.to_html(escape=False)))
        return self.__segment_diff
    @time_performance(
        scope="projection",
        event_name="helper",
        event_value="plot",
    )
    def projection(
        self,
        features: pd.DataFrame,
        method: Literal["tsne", "umap"] = "tsne",
        segments: List[str] | None = None,
        sample_size: int | None = None,
        random_state: int | None = None,
        show_plot: bool = True,
        **kwargs: Any,
    ) -> SegmentProjection:
        """
        Project a dataset to a 2D space using a manifold transformation.

        Parameters
        ----------
        features : pd.DataFrame
            A dataset to be projected. The index should be path ids.
        method : {"umap", "tsne"}, default "tsne"
            The type of manifold transformation. See :sklearn_tsne:`sklearn.manifold.TSNE()<>`
            and :umap:`umap.UMAP()<>` for the details.
        sample_size : int, optional
            The number of elements to sample.
        random_state : int, optional
            Use an int number to make the randomness deterministic. Calling the method
            multiple times with the same ``random_state`` yields the same results.
        **kwargs : optional
            Additional parameters for :sklearn_tsne:`sklearn.manifold.TSNE()<>`
            and :umap:`umap.UMAP()<>`.

        Returns
        -------
        SegmentProjection
            A ``SegmentProjection`` class instance fitted with the given parameters.
        """
        params = {"method": method, "sample_size": sample_size, "random_state": random_state}
        collect_data_performance(
            scope="projection",
            event_name="metadata",
            called_params=params,
            not_hash_values=["features"],
            performance_data={},
            eventstream_index=self._eventstream_index,
        )
        pf = Vectorizer(self)
        projection_2d = pf.projection(features, method, sample_size, random_state, **kwargs)
        self.__segment_projection = SegmentProjection(self)
        self.__segment_projection.fit(projection_2d, segments)
        if show_plot:
            fig = self.__segment_projection.plot()
            fig.show()
        return self.__segment_projection
    @time_performance(
        scope="path_metrics",
        event_name="helper",
        event_value="fit",
    )
    def path_metrics(
        self,
        metrics: Tuple[PATH_METRIC_TYPES, str] | List,
        path_id_col: str | None = None,
    ) -> pd.DataFrame | pd.Series:
        """
        Calculate metrics for each path.

        Parameters
        ----------
        metrics : tuple or list
            A metric or a list of metrics to be calculated. Each metric can be defined
            with the following types:

            - str. The following metric aliases are supported:

                - ``len``: the number of events in the path;
                - ``has:TARGET_EVENT``: whether the path contains the specified target event;
                - ``time_to:TARGET_EVENT``: the time to the first occurrence of the
                  specified target event.

            - pd.NamedAgg. It is applied to a single column of the grouped DataFrame.
              See :pandas_namedagg:`pandas documentation<>` for the details.
            - Callable. An arbitrary function to be applied to the grouped DataFrame
              with the ``apply`` method.

            A metric should be passed as a tuple of two elements:

            - a metric definition, according to the types mentioned above;
            - a metric name.

            Examples of the metrics:

            .. code:: python

                metrics = [
                    ('len', 'path_length'),
                    ('has:cart', 'has_cart'),
                    ('time_to:cart', 'time_to_cart'),
                    (lambda _df: (_df['event'] == 'cart').sum(), 'cart_count'),
                    (pd.NamedAgg('timestamp', lambda s: len(s.dt.date.unique())), 'active_days')
                ]

        path_id_col : str, optional
            A column name associated with a path identifier. The default value is linked
            to the user column from the eventstream schema.

        Returns
        -------
        pd.DataFrame or pd.Series
            A DataFrame (for multiple metrics) or a Series (for a single metric) with the
            calculated metric values. The index consists of path ids.
        """
        params = {"metrics": metrics, "path_id_col": path_id_col}
        collect_data_performance(
            scope="path_metrics",
            event_name="metadata",
            called_params=params,
            not_hash_values=None,
            performance_data={},
            eventstream_index=self._eventstream_index,
        )

        df = self.to_dataframe()
        event_col = self.schema.event_name
        time_col = self.schema.event_timestamp
        if path_id_col is None:
            path_id_col = self.schema.user_id

        metric_list = metrics if isinstance(metrics, list) else [metrics]
        res = pd.DataFrame()

        for metric_item in metric_list:
            if isinstance(metric_item, Tuple):  # type: ignore
                metric, metric_name = metric_item
            else:
                raise TypeError(f"Metric is supposed to be a str or tuple. Got {type(metric_item)} instead.")
            if isinstance(metric, str):  # type: ignore
                if metric == "len":
                    metric_values = df.groupby(path_id_col).size().rename("len")  # type: ignore
                elif metric.startswith(METRIC_PREFIX_HAS):
                    target_event = metric.split(METRIC_PREFIX_HAS)[1]
                    metric_values = (
                        df.assign(is_target_event=lambda _df: _df[event_col] == target_event)
                        .groupby(path_id_col)["is_target_event"]
                        .any()
                    )
                elif metric.startswith(METRIC_PREFIX_TIME_TO_EVENT):
                    target_event = metric.split(METRIC_PREFIX_TIME_TO_EVENT)[1]
                    df = df.assign(is_target_event=lambda _df: _df[event_col] == target_event)
                    first_timestamp = df.groupby(path_id_col)[time_col].min().rename("first_timestamp")
                    target_timestamp = (
                        df[df[event_col] == target_event]
                        .groupby(path_id_col)[time_col]
                        .min()
                        .rename("target_timestamp")
                    )
                    timestamps = first_timestamp.to_frame("first_timestamp").join(
                        target_timestamp.to_frame("target_timestamp"), how="left"
                    )
                    timestamps[metric_name] = timestamps["target_timestamp"] - timestamps["first_timestamp"]
                    metric_values = timestamps[metric_name]
                else:
                    raise TypeError(f"Metric alias '{metric}' is not supported.")
            elif isinstance(metric, LambdaType) or isinstance(metric, FunctionType):
                metric_values = df.groupby(path_id_col).apply(metric, include_groups=False)
            elif isinstance(metric, pd.NamedAgg):
                metric_values = df.groupby(path_id_col).agg(**{metric_name: metric})  # type: ignore
            else:
                raise TypeError(
                    f"A metric is supposed to be one of the following types: {get_args(PATH_METRIC_TYPES)}."
                )

            metric_values.name = metric_name
            res = pd.concat([res, metric_values], axis=1)

        # return a Series if a single metric was passed
        if not isinstance(metrics, list):
            return res[res.columns[0]]
        return res