Source code for retentioneering.core.core_functions.step_matrix

# * Copyright (C) 2020 Maxim Godzi, Anatoly Zaytsev, Retentioneering Team
# * This Source Code Form is subject to the terms of the Retentioneering Software Non-Exclusive License (License)
# * By using, sharing or editing this code you agree with the License terms and conditions.
# * You can obtain License text at

import pandas as pd

from retentioneering.visualization import plot_step_matrix

[docs]def step_matrix(self, *, max_steps=20, weight_col=None, precision=2, targets=None, accumulated=None, sorting=None, thresh=0, centered=None, groups=None, show_plot=True): """ Plots heatmap with distribution of users over trajectory steps ordered by event name. Matrix rows are event names, columns are aligned user trajectory step numbers and the values are shares of users. A given entry X at column i and event j means at i'th step fraction of users X have specific event j. Parameters ---------- max_steps: int (optional, default 20) Maximum number of steps in trajectory to include. weight_col: str (optional, default None) Aggregation column for edge weighting. If None, specified index_col from retentioneering.config will be used as column name. For example, can be specified as `session_id` if dataframe has such column. precision: int (optional, default 2) Number of decimal digits after 0 to show as fractions in the heatmap. thresh: float (optional, default 0) Used to remove rare events. Aggregates all rows where all values are less then specified threshold. targets: list (optional, default None) List of events names (as str) to include in the bottom of step_matrix as individual rows. Each specified target will have separate color-coding space for clear visualization. Example: ['product_page', 'cart', 'payment']. If multiple targets need to be compared and plotted using same color-coding scale, such targets must be combined in sub-list. Examples: ['product_page', ['cart', 'payment']] accumulated: string (optional, default None) Option to include accumulated values for targets. Valid values are None (do not show accumulated tartes), 'both' (show step values and accumulated values), 'only' (show targets only as accumulated). centered: dict (optional, default None) Parameter used to align user trajectories at specific event at specific step. Has to contain three keys: 'event': str, name of event to align 'left_gap': int, number of events to include before specified event 'occurrence': int which occurance of event to align (typical 1) When this parameter is not None only users which have specified i'th 'occurance' of selected event preset in their trajectories will be included. Fraction of such remaining users is specified in the title of centered step_matrix. Example: {'event': 'cart', 'left_gap': 8, 'occurrence': 1} sorting: list (optional, default None) List of events_names (as string) can be passed to plot step_matrix with specified ordering of events. If None rows will be ordered according to i`th value (first row, where 1st element is max, second row, where second element is max, etc) groups: tuple (optional, default None) Can be specified to plot step differential step_matrix. Must contain tuple of two elements (g_1, g_2): where g_1 and g_2 are collections of user_id`s (list, tuple or set). Two separate step_matrixes M1 and M2 will be calculated for users from g_1 and g_2, respectively. Resulting matrix will be the matrix M = M1-M2. Note, that values in each column in differential step matrix will sum up to 0 (since columns in both M1 and M2 always sum up to 1). show_plot: bool (optional, default True) whether to show resulting heatmap or not. Returns ------- Dataframe with max_steps number of columns and len(event_col.unique) number of rows at max, or less if used thr > 0. Return type ----------- pd.DataFrame """ event_col = self.retention_config['event_col'] weight_col = weight_col or self.retention_config['user_col'] time_col = self.retention_config['event_time_col'] data = self._obj.copy() # add termination event to each history: data = data.rete.split_sessions(thresh=None, eos_event='ENDED', session_col=None) data['event_rank'] = 1 data['event_rank'] = data.groupby(weight_col)['event_rank'].cumsum() # BY HERE WE NEED TO OBTAIN FINAL DIFF piv and piv_targets before sorting, thresholding and plotting: if groups: data_pos = data[data[weight_col].isin(groups[0])].copy() if len(data_pos) == 0: raise IndexError('Users from positive group are not present in dataset') piv_pos, piv_targets_pos, fraction_title, window, targets_plot = \ _step_matrix_values(data=data_pos, weight_col=weight_col, event_col=event_col, time_col=time_col, targets=targets, accumulated=accumulated, centered=centered, max_steps=max_steps) data_neg = data[data[weight_col].isin(groups[1])].copy() if len(data_pos) == 0: raise IndexError('Users from negative group are not present in dataset') piv_neg, piv_targets_neg, fraction_title, window, targets_plot = \ _step_matrix_values(data=data_neg, weight_col=weight_col, event_col=event_col, time_col=time_col, targets=targets, accumulated=accumulated, centered=centered, max_steps=max_steps) def join_index(df1, df2): full_list = set(df1.index) | set(df2.index) for i in full_list: if i not in df1.index: df1.loc[i] = 0 if i not in df2.index: df2.loc[i] = 0 join_index(piv_pos, piv_neg) piv = piv_pos - piv_neg if targets: join_index(piv_targets_pos, piv_targets_neg) piv_targets = piv_targets_pos - piv_targets_neg else: piv_targets = None else: piv, piv_targets, fraction_title, window, targets_plot = \ _step_matrix_values(data=data, weight_col=weight_col, event_col=event_col, time_col=time_col, targets=targets, accumulated=accumulated, centered=centered, max_steps=max_steps) thresh_index = 'THRESHOLDED_' if thresh != 0: # find if there are any rows to threshold: thresholded = piv.loc[(piv.abs() < thresh).all(1)].copy() if len(thresholded) > 0: piv = piv.loc[(piv.abs() >= thresh).any(1)].copy() thresh_index = f'THRESHOLDED_{len(thresholded)}' piv.loc[thresh_index] = thresholded.sum() if sorting is None: piv = _sort_matrix(piv) keep_in_the_end = [] keep_in_the_end.append('ENDED') if ('ENDED' in piv.index) else None keep_in_the_end.append(thresh_index) if (thresh_index in piv.index) else None events_order = [*(i for i in piv.index if i not in keep_in_the_end), *keep_in_the_end] piv = piv.loc[events_order] # add sorting list to config self.retention_config['step_matrix'] = {'sorting': events_order} else: # if custom sorting was provided: if not isinstance(sorting, list): raise TypeError('parameter `sorting` must be a list of event names') if {*sorting} != {*piv.index}: raise ValueError( 'provided sorting list does not match list of events. Run with `sorting` = None to get the actual list') piv = piv.loc[sorting] if centered: piv.columns = [f'{int(i) - window - 1}' for i in piv.columns] if targets: piv_targets.columns = [f'{int(i) - window - 1}' for i in piv_targets.columns] if show_plot: plot_step_matrix.step_matrix(piv, piv_targets, targets_list=targets_plot, title=f'{"centered" if centered else ""} {"differential " if groups else ""}step matrix {fraction_title}', centered_position=window, precision=precision) return piv
def _step_matrix_values(*, data, weight_col, event_col, time_col, targets, accumulated, centered, max_steps): """ Supplemental function to calculate values for step_matrix Parameters same as in step_matrix Parameters ---------- data weight_col event_col time_col targets accumulated centered max_steps Returns ------- pandas Dataframe """ def pad_cols(df, max_steps): """ Parameters ---------- df - dataframe max_steps - number of cols Returns ------- returns Dataframe with columns from 0 to max_steps """ df = df.copy() if max(df.columns) < max_steps: for col in range(max(df.columns) + 1, max_steps + 1): df[col] = 0 # add missing cols if needed: if min(df.columns) > 1: for col in range(1, min(df.columns)): df[col] = 0 # sort cols return df[list(range(1, max_steps + 1))] from copy import deepcopy data = data.copy() targets = deepcopy(targets) # ALIGN DATA IF CENTRAL fraction_title = '' window = None if centered is not None: # CHECKS if (not isinstance(centered, dict) or ({'event', 'left_gap', 'occurrence'} - {*centered.keys()})): raise ValueError("Parameter centered must be dict with following keys: 'event', 'left_gap', 'occurrence'") center_event = centered.get('event') window = centered.get('left_gap') occurrence = centered.get('occurrence') if occurrence < 1 or not isinstance(occurrence, int): raise ValueError("key value 'occurrence' must be Int >=1") if window < 0 or not isinstance(window, int): raise ValueError("key value 'left_gap' must be Int >=0") if center_event not in data[event_col].unique(): raise ValueError(f'Event "{center_event}" not found in the column: "{event_col}"') # keep only users who have center_event at least N = occurrence times data['occurrence'] = data[event_col] == center_event data['occurance_counter'] = data.groupby(weight_col)['occurrence'].cumsum() * data['occurrence'] users_to_keep = data[data['occurance_counter'] == occurrence][weight_col].unique() if len(users_to_keep) == 0: raise ValueError(f'no records found with event "{center_event}" occuring N={occurrence} times') fraction_used = len(users_to_keep) / data[weight_col].nunique() * 100 if fraction_used < 100: fraction_title = f'({fraction_used:.1f}% of total records)' data = data[data[weight_col].isin(users_to_keep)].copy() def pad_to_center(x): position = x.loc[(x[event_col] == center_event) & (x['occurance_counter'] == occurrence)]['event_rank'].min() shift = position - window - 1 x['event_rank'] = x['event_rank'] - shift return x data = data.groupby(weight_col).apply(pad_to_center) data = data[data['event_rank'] > 0].copy() # calculate step matrix elements: agg = (data .groupby(['event_rank', event_col])[weight_col] .nunique() .reset_index()) agg[weight_col] /= data[weight_col].nunique() agg = agg[agg['event_rank'] <= max_steps] agg.columns = ['event_rank', 'event_name', 'freq'] piv = agg.pivot(index='event_name', columns='event_rank', values='freq').fillna(0) # add missing cols if number of events < max_steps: piv = pad_cols(piv, max_steps) = None = None # MAKE TERMINATED STATE ACCUMULATED: if 'ENDED' in piv.index: piv.loc['ENDED'] = piv.loc['ENDED'].cumsum().fillna(0) # add NOT_STARTED events for centered matrix if centered: piv.loc['NOT_STARTED'] = 1 - piv.sum() # ADD ROWS FOR TARGETS: piv_targets = None if targets: # obtain flatten list of targets: targets_flatten = list(pd.core.common.flatten(targets)) # format targets to list of lists: for n, i in enumerate(targets): if type(i) != list: targets[n] = [i] agg_targets = (data .groupby(['event_rank', event_col])[time_col] .count() .reset_index()) agg_targets[time_col] /= data[weight_col].nunique() agg_targets.columns = ['event_rank', 'event_name', 'freq'] agg_targets = agg_targets[agg_targets['event_rank'] <= max_steps] piv_targets = agg_targets.pivot(index='event_name', columns='event_rank', values='freq').fillna(0) piv_targets = pad_cols(piv_targets, max_steps) # if target is not present in dataset add zeros: for i in targets_flatten: if i not in piv_targets.index: piv_targets.loc[i] = 0 piv_targets = piv_targets.loc[targets_flatten].copy() = None = None if accumulated == 'only': piv_targets.index = map(lambda x: 'ACC_' + x, piv_targets.index) for i in piv_targets.index: piv_targets.loc[i] = piv_targets.loc[i].cumsum().fillna(0) # change names is targets list: for target in targets: for j, item in enumerate(target): target[j] = 'ACC_'+item if accumulated == 'both': for i in piv_targets.index: piv_targets.loc['ACC_'+i] = piv_targets.loc[i].cumsum().fillna(0) # add accumulated targets to the list: targets_not_acc = deepcopy(targets) for target in targets: for j, item in enumerate(target): target[j] = 'ACC_'+item targets = targets_not_acc + targets return piv, piv_targets, fraction_title, window, targets def _sort_matrix(step_matrix): x = step_matrix.copy() order = [] for i in x.columns: new_r = x[i].idxmax() order.append(new_r) x = x.drop(new_r) if x.shape[0] == 0: break order.extend(list(set(step_matrix.index) - set(order))) return step_matrix.loc[order]