from __future__ import annotations
from collections.abc import Collection
from typing import Any, Literal
import pandas as pd
import plotly.graph_objects as go
from pandas.core.common import flatten
from retentioneering.backend.tracker import track
from retentioneering.eventstream.types import EventstreamType
# Funnel calculation modes accepted as the ``funnel_type`` argument of ``Funnel.fit``.
FunnelTypes = Literal["open", "closed", "hybrid"]
class Funnel:
    """
    A class for the calculation and visualization of a conversion funnel.

    Parameters
    ----------
    eventstream : EventstreamType

    See Also
    --------
    .Eventstream.funnel : Call Funnel tool as an eventstream method.

    Notes
    -----
    See :doc:`Funnel user guide</user_guides/funnel>` for the details.
    """

    # Layout keyword arguments passed to ``go.Layout`` for every funnel figure.
    __default_layout = dict(
        margin={"l": 180, "r": 0, "t": 30, "b": 0, "pad": 0},
        funnelmode="stack",
        showlegend=True,
        hovermode="closest",
        legend=dict(orientation="v", bgcolor="#E2E2E2", xanchor="left", font=dict(size=12)),
    )
    __eventstream: EventstreamType
    stages: list[str]
    stage_names: list[str] | None
    funnel_type: FunnelTypes
    segments: Collection[Collection[int]] | None
    segment_names: list[str] | None
    __res_dict: dict[str, dict]

    @track(  # type: ignore
        tracking_info={"event_name": "init"},
        scope="funnel",
        allowed_params=[],
    )
    def __init__(self, eventstream: EventstreamType) -> None:
        self.__eventstream = eventstream
        # Column names are taken from the eventstream schema once and reused everywhere.
        self.user_col = self.__eventstream.schema.user_id
        self.event_col = self.__eventstream.schema.event_name
        self.time_col = self.__eventstream.schema.event_timestamp
        # Filled by ``fit``: {segment_name: {"stages": [...], "values": [...]}}.
        self.__res_dict = {}

    def __validate_input(
        self,
        stages: list[str],
        stage_names: list[str] | None = None,
        funnel_type: FunnelTypes = "closed",
        segments: Collection[Collection[int]] | None = None,
        segment_names: list[str] | None = None,
    ) -> tuple[pd.DataFrame, list[str], list[str], FunnelTypes, Collection[Collection[int]], list[str]]:
        """
        Validate ``fit`` arguments and fill in the defaults.

        Returns
        -------
        tuple
            ``(data, stages, stage_names, funnel_type, segments, segment_names)`` where
            ``data`` holds only the events mentioned in ``stages`` and every item of
            ``stages`` is a list of event names.

        Raises
        ------
        ValueError
            If ``stages``/``stage_names`` or ``segments``/``segment_names`` differ in
            length, if a user appears in more than one segment, or if ``funnel_type``
            is not one of the supported values.
        """
        data = self.__eventstream.to_dataframe(copy=True)
        data = data[data[self.event_col].isin(list(flatten(stages)))]  # type: ignore

        if stages and stage_names and len(stages) != len(stage_names):
            raise ValueError("stages and stage_names must be the same length!")

        if segments is None:
            segments = [data[self.user_col].unique().tolist()]
            segment_names = ["all users"]
        else:
            # A user may belong to one segment only, so every segment is compared with
            # the users seen so far. Intersecting all segments at once would miss
            # pairwise overlaps and would reject a single explicit segment.
            seen_users: set[int] = set()
            for segment in segments:
                segment_users = set(segment)
                if not seen_users.isdisjoint(segment_users):
                    raise ValueError("Check intersections of users in segments!")
                seen_users |= segment_users

        if segment_names is None:
            segment_names = [f"group {i}" for i in range(len(segments))]  # type: ignore

        if segments and segment_names and len(segments) != len(segment_names):  # type: ignore
            raise ValueError("segments and segment_names must be the same length!")

        # The ``Literal`` annotation is not enforced at call time, hence the runtime guard.
        if funnel_type not in ["open", "closed", "hybrid"]:
            raise ValueError("funnel_type should be 'open', 'closed' or 'hybrid'!")

        # Every stage becomes a list of event names. A new list is built so the
        # caller's ``stages`` argument is left untouched.
        stages = [
            list(stage) if isinstance(stage, (list, tuple)) else [stage]  # type: ignore
            for stage in stages
        ]

        if stage_names is None:
            stage_names = [" | ".join(stage) for stage in stages]

        return data, stages, stage_names, funnel_type, segments, segment_names

    def _plot_stacked_funnel(self, data: list[go.Funnel]) -> go.Figure:
        """Wrap funnel traces into a figure that uses the default layout."""
        layout = go.Layout(**self.__default_layout)
        return go.Figure(data, layout)

    @staticmethod
    def _calculate_plot_data(plot_params: dict[str, Any]) -> list[go.Funnel]:
        """Build one ``go.Funnel`` trace per segment of ``plot_params``."""
        return [
            go.Funnel(
                name=segment_name,
                y=segment_result["stages"],
                x=segment_result["values"],
                textinfo="value+percent initial+percent previous",
            )
            for segment_name, segment_result in plot_params.items()
        ]

    def _prepare_data_for_closed_and_hybrid_funnel(
        self,
        data: pd.DataFrame,
        stages: list[str],
        stage_names: list[str],
        segments: Collection[Collection[int]],
        segment_names: list[str],
    ) -> dict[str, dict]:
        """
        Count users per stage for the ``closed`` and ``hybrid`` funnel types.

        Only events that happen at or after the user's first event of the first
        stage are kept; users who never reach the first stage are dropped.
        """
        first_stage_entry = (
            data[data[self.event_col].isin(stages[0])]
            .groupby(self.user_col)[self.time_col]
            .min()
            .rename("min_date")
            .reset_index()
        )
        data = data.merge(first_stage_entry, "left", on=self.user_col)
        # Drop users without a first-stage event and everything preceding that event.
        entered = data["min_date"].notna() & (data["min_date"] <= data[self.time_col])
        data = data[entered].drop(columns="min_date")

        result: dict[str, dict] = {}
        for segment, name in zip(segments, segment_names):
            vals, _ = self._crop_df(data, stages, segment)
            result[name] = {"stages": stage_names, "values": vals}
        return result

    def _prepare_data_for_open_funnel(
        self,
        data: pd.DataFrame,
        stages: list[str],
        stage_names: list[str],
        segments: Collection[Collection[int]],
        segment_names: list[str],
    ) -> dict[str, dict]:
        """Count, per segment, the unique users having any event of each stage."""
        result: dict[str, dict] = {}
        for segment, name in zip(segments, segment_names):
            # Isolate the users of the current segment.
            group_data = data[data[self.user_col].isin(segment)]
            vals = [
                group_data.loc[group_data[self.event_col].isin(stage), self.user_col].nunique()
                for stage in stages
            ]
            result[name] = {"stages": stage_names, "values": vals}
        return result

    def _crop_df(self, df: pd.DataFrame, stages: list[str], segment: Collection[int]) -> tuple[list[int], pd.DataFrame]:
        """
        Walk through the stages and count the users of ``segment`` that survive each one.

        Reads ``self.funnel_type``: for ``"closed"`` only events strictly after the
        user's first event of the current stage are passed on to the next stage;
        otherwise all remaining events of the surviving users are passed on.

        Returns
        -------
        tuple
            The list of user counts (one per stage) and the events left after the
            last stage.
        """
        first_stage, next_stages = stages[0], stages[1:]

        in_first_stage = df[self.event_col].isin(first_stage)
        first_stage_users = set(df.loc[in_first_stage & df[self.user_col].isin(segment), self.user_col])
        # Boolean masks are used instead of dropping by index labels, so the
        # filtering does not depend on the index being unique.
        df = df[df[self.user_col].isin(first_stage_users) & ~in_first_stage]

        prev_stage_users = first_stage_users
        vals = [len(first_stage_users)]
        for stage in next_stages:
            in_stage = df[self.event_col].isin(stage)
            # Only users who were counted on the previous stage may be counted here.
            stage_users = set(df.loc[in_stage, self.user_col]) & prev_stage_users
            prev_stage_users = stage_users
            vals.append(len(stage_users))

            if self.funnel_type == "closed":
                stage_entry = (
                    df[in_stage].groupby(self.user_col)[self.time_col].min().rename("min_date").reset_index()
                )
                df = df.merge(stage_entry, "left", on=self.user_col)
                keep = (
                    df["min_date"].notna()
                    & ~(df["min_date"] >= df[self.time_col])
                    & df[self.user_col].isin(stage_users)
                )
                df = df[keep].drop(columns="min_date")
            else:
                df = df[df[self.user_col].isin(stage_users)]

        return vals, df

    @track(  # type: ignore
        tracking_info={"event_name": "fit"},
        scope="funnel",
        allowed_params=[
            "stages",
            "stage_names",
            "funnel_type",
            "segments",
            "segment_names",
        ],
    )
    def fit(
        self,
        stages: list[str],
        stage_names: list[str] | None = None,
        funnel_type: FunnelTypes = "closed",
        segments: Collection[Collection[int]] | None = None,
        segment_names: list[str] | None = None,
    ) -> None:
        """
        Calculate the funnel internal values with the defined parameters.
        Applying ``fit`` method is necessary for the following usage
        of any visualization or descriptive ``Funnel`` methods.

        Parameters
        ----------
        stages : list of str
            List of events used as stages for the funnel. Absolute and relative
            number of users who reached specified events at least once will be
            plotted. Multiple events can be grouped together as an individual stage
            by combining them as a sub list.
        stage_names : list of str, optional
            List of stage names, this is necessary for stages that include several events.
            If ``None``, the event names of each stage joined with ``" | "`` are used.
        funnel_type : 'open', 'closed' or 'hybrid', default 'closed'
            - if ``open`` - all users will be counted on each stage;
            - if ``closed`` - each stage will include only users, that were present on all previous stages;
            - if ``hybrid`` - combination of 2 previous types. The first stage is required
              to go further. And for the second and subsequent stages it is important to have
              all previous stages in their path, but the order of these events is not taken
              into account.
        segments : Collection[Collection[int]], optional
            List of user_ids collections. Funnel for each user_id collection will be plotted.
            If ``None`` - all users from the dataset will be plotted. A user can only belong to one segment at a time.
        segment_names : list of str, optional
            Names of segments, one per collection in ``segments``.
            If ``None``, the names ``group 0``, ``group 1``, ... are used.
            If ``segments`` is ``None``, the single segment is named ``all users``.

        Raises
        ------
        ValueError
            If the arguments are inconsistent: see the length, segment overlap and
            ``funnel_type`` checks performed on the input.
        """
        (
            data,
            self.stages,
            self.stage_names,
            self.funnel_type,
            self.segments,
            self.segment_names,
        ) = self.__validate_input(stages, stage_names, funnel_type, segments, segment_names)

        if self.funnel_type in ["closed", "hybrid"]:
            self.__res_dict = self._prepare_data_for_closed_and_hybrid_funnel(
                data=data,
                stages=self.stages,
                stage_names=self.stage_names,
                segments=self.segments,
                segment_names=self.segment_names,
            )
        elif self.funnel_type == "open":
            self.__res_dict = self._prepare_data_for_open_funnel(
                data=data,
                stages=self.stages,
                segments=self.segments,
                segment_names=self.segment_names,
                stage_names=self.stage_names,
            )

    @track(  # type: ignore
        tracking_info={"event_name": "plot"},
        scope="funnel",
        allowed_params=[],
    )
    def plot(self) -> go.Figure:
        """
        Create a funnel plot based on the calculated funnel values.
        Should be used after :py:func:`fit`.

        Returns
        -------
        go.Figure
        """
        traces = self._calculate_plot_data(plot_params=self.__res_dict)
        return self._plot_stacked_funnel(data=traces)

    @property
    @track(  # type: ignore
        tracking_info={"event_name": "values"},
        scope="funnel",
        allowed_params=[],
    )
    def values(self) -> pd.DataFrame:
        """
        Returns a pd.DataFrame representing the calculated funnel values.
        Should be used after :py:func:`fit`.

        Returns
        -------
        pd.DataFrame
            Indexed by ``segment_name`` and ``stages``. ``%_of_initial`` is the number
            of users of a stage relative to the previous stage (the first stage is
            compared with itself); ``%_of_total`` is relative to the first stage.

            +------------------+-------------+-----------------+-------------------+-----------------+
            | **segment_name** | **stages**  | **unique_users**| **%_of_initial**  | **%_of_total**  |
            +------------------+-------------+-----------------+-------------------+-----------------+
            | segment_1        | stage_1     | 2000            | 100.00            | 100.00          |
            +------------------+-------------+-----------------+-------------------+-----------------+
        """
        frames = []
        for segment_name, segment_result in self.__res_dict.items():
            frame = pd.DataFrame(
                {
                    "segment_name": segment_name,
                    "stages": segment_result["stages"],
                    "unique_users": segment_result["values"],
                }
            )
            users = frame["unique_users"]
            first_stage_users = users.iloc[0]
            # Users of the previous stage; the first stage is compared with itself.
            previous_users = users.shift(periods=1, fill_value=first_stage_users)
            frame["%_of_initial"] = (users / previous_users * 100).round(2)
            frame["%_of_total"] = (users / first_stage_users * 100).round(2)
            frames.append(frame)

        return pd.concat(frames).set_index(["segment_name", "stages"])

    @property
    @track(  # type: ignore
        tracking_info={"event_name": "params"},
        scope="funnel",
        allowed_params=[],
    )
    def params(self) -> dict:
        """
        Returns the parameters used for the last fitting.
        """
        return {
            "stages": self.stages,
            "stage_names": self.stage_names,
            "funnel_type": self.funnel_type,
            "segments": self.segments,
            "segment_names": self.segment_names,
        }