Source code for retentioneering.tooling.cohorts.cohorts
from __future__ import annotations

from typing import Literal, Tuple

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from retentioneering.backend.tracker import (
    collect_data_performance,
    time_performance,
    track,
)
from retentioneering.constants import DATETIME_UNITS, DATETIME_UNITS_LIST
from retentioneering.eventstream.types import EventstreamType

# @TODO Consider shortening the list of datetime units supported for cohorts? dpanina

class Cohorts:
    """
    A class that provides methods for cohort analysis. The users are split into groups
    depending on the time of their first appearance in the eventstream; thus each user
    is associated with some ``cohort_group``. Retention rates of the active users
    belonging to each ``cohort_group`` are calculated within each ``cohort_period``.

    Parameters
    ----------
    eventstream : EventstreamType

    See Also
    --------
    .Eventstream.cohorts : Call Cohorts tool as an eventstream method.
    .EventTimestampHist : Plot the distribution of events over time.
    .UserLifetimeHist : Plot the distribution of user lifetimes.

    Notes
    -----
    See :doc:`Cohorts user guide</user_guides/cohorts>` for the details.
    """

    __eventstream: EventstreamType
    cohort_start_unit: DATETIME_UNITS
    cohort_period: int
    cohort_period_unit: DATETIME_UNITS
    average: bool
    cut_bottom: int
    cut_right: int
    cut_diagonal: int
    _cohort_matrix_result: pd.DataFrame

    @time_performance(
        scope="cohorts",
        event_name="init",
    )
    def __init__(self, eventstream: EventstreamType):
        self.__eventstream = eventstream
        self.user_col = self.__eventstream.schema.user_id
        self.event_col = self.__eventstream.schema.event_name
        self.time_col = self.__eventstream.schema.event_timestamp
        self._cohort_matrix_result = pd.DataFrame()

    def _add_min_date(
        self,
        data: pd.DataFrame,
        cohort_start_unit: DATETIME_UNITS,
        cohort_period: int,
        cohort_period_unit: DATETIME_UNITS,
    ) -> pd.DataFrame:
        freq = cohort_start_unit
        data["user_min_date_gr"] = data.groupby(self.user_col)[self.time_col].transform(min)
        min_cohort_date = data["user_min_date_gr"].min().to_period(freq).start_time
        max_cohort_date = data["user_min_date_gr"].max()
        if DATETIME_UNITS_LIST.index(cohort_start_unit) < DATETIME_UNITS_LIST.index(cohort_period_unit):
            freq = cohort_period_unit
        if freq == "W":
            freq = "D"

        data["user_min_date_gr"] = data["user_min_date_gr"].dt.to_period(freq)
        step = np.timedelta64(cohort_period, cohort_period_unit)
        start_point = np.datetime64(min_cohort_date, freq)
        end_point = np.datetime64(max_cohort_date, freq) + np.timedelta64(cohort_period, cohort_period_unit)
        coh_groups_start_dates = np.arange(start_point, end_point, step)
        coh_groups_start_dates = pd.to_datetime(coh_groups_start_dates).to_period(freq)
        if max_cohort_date < coh_groups_start_dates[-1].start_time:  # type: ignore
            coh_groups_start_dates = coh_groups_start_dates[:-1]  # type: ignore
        cohorts_list = pd.DataFrame(
            data=coh_groups_start_dates, index=None, columns=["CohortGroup"]  # type: ignore
        ).reset_index()
        cohorts_list.columns = ["CohortGroupNum", "CohortGroup"]  # type: ignore
        cohorts_list["CohortGroupNum"] += 1

        data["OrderPeriod"] = data[self.time_col].dt.to_period(freq)
        start_int = pd.Series(min_cohort_date.to_period(freq=freq)).astype(int)[0]
        converter_freq = np.timedelta64(cohort_period, cohort_period_unit)
        converter_freq_ = converter_freq.astype(f"timedelta64[{freq}]").astype(int)
        data["CohortGroupNum"] = (
            data["user_min_date_gr"].astype(int) - start_int + converter_freq_
        ) // converter_freq_

        data = data.merge(cohorts_list, on="CohortGroupNum", how="left")

        data["CohortPeriod"] = (
            (data["OrderPeriod"].astype(int) - (data["CohortGroup"].astype(int) + converter_freq_))  # type: ignore
            // converter_freq_
        ) + 1

        return data

    @staticmethod
    def _cut_cohort_matrix(
        df: pd.DataFrame, cut_bottom: int = 0, cut_right: int = 0, cut_diagonal: int = 0
    ) -> pd.DataFrame:
        for row in df.index:
            df.loc[row, max(0, df.loc[row].notna()[::-1].idxmax() + 1 - cut_diagonal) :] = None  # type: ignore

        return df.iloc[: len(df) - cut_bottom, : len(df.columns) - cut_right]
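    # Editor's sketch (not part of the original module): with monthly cohorts, i.e.
    # cohort_start_unit="M" and cohort_period=(1, "M"), the arithmetic in
    # ``_add_min_date`` works out roughly as follows for a user whose first event
    # happens on 2023-03-15:
    #
    #   CohortGroup  -> 2023-03 (the calendar month of the user's first event)
    #   CohortPeriod -> 0 for that user's events in March 2023,
    #                   1 for events in April 2023, 2 for events in May 2023, ...
    #
    # ``fit`` then counts unique users per (CohortGroup, CohortPeriod) cell and divides
    # each cell by the cohort size in period 0, which is why the first column of the
    # resulting matrix equals 1.0.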
    @time_performance(
        scope="cohorts",
        event_name="fit",
    )
    def fit(
        self,
        cohort_start_unit: DATETIME_UNITS,
        cohort_period: Tuple[int, DATETIME_UNITS],
        average: bool = True,
        cut_bottom: int = 0,
        cut_right: int = 0,
        cut_diagonal: int = 0,
    ) -> None:
        """
        Calculates the cohort internal values with the defined parameters.
        Applying the ``fit`` method is necessary before using any visualization
        or descriptive ``Cohorts`` methods.

        Parameters
        ----------
        cohort_start_unit : :numpy_link:`DATETIME_UNITS<>`
            The way of rounding and formatting of the moment from which the cohort count begins.
            The minimum timestamp is rounded down to the selected datetime unit.

            For example, assume we have an eventstream with the minimum timestamp
            "2021-12-28 09:08:34.432456".
            The result of rounding with different ``DATETIME_UNITS`` is shown in the table below:

            +------------------------+-------------------------+
            | **cohort_start_unit**  | **cohort_start_moment** |
            +------------------------+-------------------------+
            | Y                      | 2021-01-01 00:00:00     |
            +------------------------+-------------------------+
            | M                      | 2021-12-01 00:00:00     |
            +------------------------+-------------------------+
            | W                      | 2021-12-27 00:00:00     |
            +------------------------+-------------------------+
            | D                      | 2021-12-28 00:00:00     |
            +------------------------+-------------------------+

        cohort_period : Tuple(int, :numpy_link:`DATETIME_UNITS<>`)
            The cohort_period size and its ``DATETIME_UNIT``. This parameter is used in calculating:

            - Start moments for each cohort from the moment specified with the ``cohort_start_unit`` parameter
            - Cohort periods for each cohort from its start moment.
        average : bool, default True
            - If ``True``, the average value for each cohort period is calculated.
            - If ``False``, averaged values aren't calculated.
        cut_bottom : int, default 0
            Drop 'n' rows from the bottom of the cohort matrix.
            Average is recalculated.
        cut_right : int, default 0
            Drop 'n' columns from the right side of the cohort matrix.
            Average is recalculated.
        cut_diagonal : int, default 0
            Replace values in 'n' diagonals (last period-group cells) with ``np.nan``.
            Average is recalculated.

        Notes
        -----
        Parameters ``cohort_start_unit`` and ``cohort_period`` should be consistent.
        Since "Y" and "M" are non-fixed units, they can be used only with each other,
        or when ``cohort_period_unit`` is more detailed than ``cohort_start_unit``.
        More information - :numpy_timedelta_link:`about numpy timedelta<>`

        Only cohorts with at least 1 user in some period are shown.

        See :doc:`Cohorts user guide</user_guides/cohorts>` for the details.
        """
        called_params = {
            "cohort_start_unit": cohort_start_unit,
            "cohort_period": cohort_period,
            "average": average,
            "cut_bottom": cut_bottom,
            "cut_right": cut_right,
            "cut_diagonal": cut_diagonal,
        }
        not_hash_values = ["cohort_start_unit", "cohort_period"]

        data = self.__eventstream.to_dataframe()
        self.average = average
        self.cohort_start_unit = cohort_start_unit
        self.cohort_period, self.cohort_period_unit = cohort_period
        self.cut_bottom = cut_bottom
        self.cut_right = cut_right
        self.cut_diagonal = cut_diagonal

        if self.cohort_period <= 0:
            raise ValueError("cohort_period should be a positive integer!")

        # @TODO add a link to the numpy docs with an explanation. dpanina
        if self.cohort_period_unit in ["Y", "M"] and self.cohort_start_unit not in ["Y", "M"]:
            raise ValueError(
                """Parameters ``cohort_start_unit`` and ``cohort_period`` should be consistent.
                Since "Y" and "M" are non-fixed units, they can be used only with each other,
                or when ``cohort_period_unit`` is more detailed than ``cohort_start_unit``!"""
            )

        df = self._add_min_date(
            data=data,
            cohort_start_unit=self.cohort_start_unit,
            cohort_period=self.cohort_period,
            cohort_period_unit=self.cohort_period_unit,
        )

        cohorts = df.groupby(["CohortGroup", "CohortPeriod"])[[self.user_col]].nunique()
        cohorts.reset_index(inplace=True)
        cohorts.rename(columns={self.user_col: "TotalUsers"}, inplace=True)
        cohorts.set_index(["CohortGroup", "CohortPeriod"], inplace=True)
        cohort_group_size = cohorts["TotalUsers"].groupby(level=0).first()
        cohorts.reset_index(inplace=True)

        user_retention = (
            cohorts.pivot(index="CohortPeriod", columns="CohortGroup", values="TotalUsers").divide(
                cohort_group_size, axis=1
            )
        ).T

        user_retention = self._cut_cohort_matrix(
            df=user_retention, cut_diagonal=self.cut_diagonal, cut_bottom=self.cut_bottom, cut_right=self.cut_right
        )
        user_retention.index = user_retention.index.astype(str)
        if self.average:
            user_retention.loc["Average"] = user_retention.mean()

        self._cohort_matrix_result = user_retention

        collect_data_performance(
            scope="cohorts",
            event_name="metadata",
            called_params=called_params,
            not_hash_values=not_hash_values,
            performance_data={"shape": self._cohort_matrix_result.shape},
            eventstream_index=self.__eventstream._eventstream_index,
        )
    @time_performance(
        scope="cohorts",
        event_name="heatmap",
    )
    def heatmap(self, width: float = 5.0, height: float = 5.0) -> matplotlib.axes.Axes:
        """
        Builds a heatmap based on the calculated cohort matrix values.
        Should be used after :py:func:`fit`.

        Parameters
        ----------
        width : float, default 5.0
            Width of the figure in inches.
        height : float, default 5.0
            Height of the figure in inches.

        Returns
        -------
        matplotlib.axes.Axes
        """
        called_params = {
            "width": width,
            "height": height,
        }

        df = self._cohort_matrix_result
        figsize = (width, height)
        figure, ax = plt.subplots(figsize=figsize)
        sns.heatmap(df, annot=True, fmt=".1%", linewidths=1, linecolor="gray", ax=ax)

        collect_data_performance(
            scope="cohorts",
            event_name="metadata",
            called_params=called_params,
            performance_data={"shape": self._cohort_matrix_result.shape},
            eventstream_index=self.__eventstream._eventstream_index,
        )

        return ax
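    # Plotting sketch (illustrative): after ``fit`` the heatmap can be rendered and the
    # returned Axes customized with any matplotlib call, e.g.
    #
    #   ax = cohorts.heatmap(width=8.0, height=5.0)
    #   ax.set_title("Weekly retention")   # hypothetical title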
    @time_performance(
        scope="cohorts",
        event_name="lineplot",
    )
    def lineplot(
        self, plot_type: Literal["cohorts", "average", "all"] = "cohorts", width: float = 7.0, height: float = 5.0
    ) -> matplotlib.axes.Axes:
        """
        Create a chart representing the dynamics of each cohort over time.
        Should be used after :py:func:`fit`.

        Parameters
        ----------
        plot_type : 'cohorts', 'average' or 'all'
            - if ``cohorts`` - shows a lineplot for each cohort,
            - if ``average`` - shows a lineplot only for the average values over all the cohorts,
            - if ``all`` - shows a lineplot for each cohort and also for their average values.
        width : float, default 7.0
            Width of the figure in inches.
        height : float, default 5.0
            Height of the figure in inches.

        Returns
        -------
        matplotlib.axes.Axes
        """
        if plot_type not in ["cohorts", "average", "all"]:
            raise ValueError("plot_type parameter should be 'cohorts', 'average' or 'all'!")

        called_params = {
            "plot_type": plot_type,
            "width": width,
            "height": height,
        }
        not_hash_values = ["plot_type"]

        figsize = (width, height)
        df_matrix = self._cohort_matrix_result
        df_wo_average = df_matrix[df_matrix.index != "Average"]  # type: ignore
        if plot_type in ["all", "average"] and "Average" not in df_matrix.index:  # type: ignore
            df_matrix.loc["Average"] = df_matrix.mean()  # type: ignore
        df_average = df_matrix[df_matrix.index == "Average"]  # type: ignore
        figure, ax = plt.subplots(figsize=figsize)
        if plot_type == "all":
            sns.lineplot(df_wo_average.T, lw=1.5, ax=ax)
            sns.lineplot(df_average.T, lw=2.5, palette=["red"], marker="X", markersize=8, alpha=0.6, ax=ax)

        if plot_type == "average":
            sns.lineplot(df_average.T, lw=2.5, palette=["red"], marker="X", markersize=8, alpha=0.6, ax=ax)

        if plot_type == "cohorts":
            sns.lineplot(df_wo_average.T, lw=1.5, ax=ax)

        ax.legend(loc="upper left", bbox_to_anchor=(1, 1))
        ax.set_xlabel("Period from the start of observation")
        ax.set_ylabel("Share of active users")

        collect_data_performance(
            scope="cohorts",
            event_name="metadata",
            called_params=called_params,
            not_hash_values=not_hash_values,
            performance_data={"shape": self._cohort_matrix_result.shape},
            eventstream_index=self.__eventstream._eventstream_index,
        )

        return ax
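    # Plotting sketch (illustrative): plot_type="all" overlays the per-cohort curves
    # and their average, e.g.
    #
    #   ax = cohorts.lineplot(plot_type="all", width=9.0, height=5.0)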
    @property
    @time_performance(
        scope="cohorts",
        event_name="values",
    )
    def values(self) -> pd.DataFrame:
        """
        Returns a pd.DataFrame representing the calculated cohort matrix values.
        Should be used after :py:func:`fit`.

        Returns
        -------
        pd.DataFrame
        """
        return self._cohort_matrix_result

    @property
    @time_performance(  # type: ignore
        scope="cohorts",
        event_name="params",
    )
    def params(self) -> dict[str, DATETIME_UNITS | tuple | bool | int | None]:
        """
        Returns the parameters used for the last fitting.
        Should be used after :py:func:`fit`.
        """
        return {
            "cohort_start_unit": self.cohort_start_unit,
            "cohort_period": (self.cohort_period, self.cohort_period_unit),
            "average": self.average,
            "cut_bottom": self.cut_bottom,
            "cut_right": self.cut_right,
            "cut_diagonal": self.cut_diagonal,
        }
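

# Illustrative end-to-end sketch (added by the editor, not part of the original module).
# It assumes the bundled `simple_shop` demo dataset loader is available in this
# retentioneering installation; otherwise substitute your own Eventstream.
if __name__ == "__main__":
    from retentioneering import datasets

    stream = datasets.load_simple_shop()  # demo Eventstream shipped with the library

    cohorts = Cohorts(eventstream=stream)
    cohorts.fit(
        cohort_start_unit="M",     # cohorts start at calendar-month boundaries
        cohort_period=(1, "M"),    # one-month cohort periods
        average=True,
    )

    print(cohorts.params)          # parameters of the last fit
    print(cohorts.values.head())   # retention matrix as a DataFrame

    cohorts.heatmap(width=8.0, height=5.0)
    cohorts.lineplot(plot_type="all")
    plt.show()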