Source code for retentioneering.data_processors_lib.split_sessions
from __future__ import annotations
from typing import Tuple
import numpy as np
import pandas as pd
from retentioneering.backend.tracker import track
from retentioneering.constants import DATETIME_UNITS
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.schema import EventstreamSchema
from retentioneering.eventstream.types import EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.widget.widgets import ReteTimeWidget
[docs]class SplitSessionsParams(ParamsModel):
"""
A class with parameters for :py:class:`.SplitSessions` class.
"""
timeout: Tuple[float, DATETIME_UNITS]
mark_truncated: bool = False
session_col: str = "session_id"
_widgets = {"timeout": ReteTimeWidget()}
[docs]class SplitSessions(DataProcessor):
"""
Create new synthetic events, that divide users' paths on sessions:
``session_start`` (or ``session_start_cropped``) and ``session_end`` (or ``session_end_cropped``).
Also create a new column that contains session number for each event in input eventstream.
Session number will take the form: ``{user_id}_{session_number through one user path}``.
Parameters
----------
timeout : Tuple(float, :numpy_link:`DATETIME_UNITS<>`)
Threshold value and its unit of measure.
``session_start`` and ``session_end`` events are always placed before the first and after the last event
in each user's path.
Because user can have more than one session, it calculates timedelta between every two consecutive events in
each user's path.
If the calculated timedelta is more than selected timeout,
new synthetic events - ``session_start`` and ``session_end`` are created inside the user path,
marking session starting and ending points.
mark_truncated : bool, default False
If ``True`` - calculates timedelta between:
- first event in each user's path and first event in the whole eventstream.
- last event in each user's path and last event in the whole eventstream.
For users with timedelta less than selected ``timeout``,
a new synthetic event - ``session_start_cropped`` or ``session_end_cropped`` will be added.
session_col : str, default "session_id"
The name of the ``session_col``.
Returns
-------
Eventstream
``Eventstream`` with new synthetic events and ``session_col``.
+-----------------------------+----------------------------+-----------------+
| **event_name** | **event_type** | **timestamp** |
+-----------------------------+----------------------------+-----------------+
| session_start | session_start | first_event |
+-----------------------------+----------------------------+-----------------+
| session_end | session_end | last_event |
+-----------------------------+----------------------------+-----------------+
| session_start_cropped | session_start_cropped | first_event |
+-----------------------------+----------------------------+-----------------+
| session_end_cropped | session_end_cropped | last_event |
+-----------------------------+----------------------------+-----------------+
If the delta between timestamps of two consecutive events
(raw_event_n and raw_event_n+1) is greater than the selected ``timeout``
the user will have more than one session:
+--------------+-------------------+------------------+-------------------+------------------+
| **user_id** | **event_name** | **event_type** | **timestamp** | **session_col** |
+--------------+-------------------+------------------+-------------------+------------------+
| 1 | session_start | session_start | first_event | 1_0 |
+--------------+-------------------+------------------+-------------------+------------------+
| 1 | session_end | session_end | raw_event_n | 1_0 |
+--------------+-------------------+------------------+-------------------+------------------+
| 1 | session_start | session_start | raw_event_n+1 | 1_1 |
+--------------+-------------------+------------------+-------------------+------------------+
| 1 | session_end | session_end | last_event | 1_1 |
+--------------+-------------------+------------------+-------------------+------------------+
See Also
--------
.TimedeltaHist : Plot the distribution of the time deltas between two events.
.Eventstream.describe : Show general eventstream statistics.
.Eventstream.describe_events : Show general eventstream events statistics.
Notes
-----
See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
"""
params: SplitSessionsParams
@track( # type: ignore
tracking_info={"event_name": "init"},
scope="split_sessions",
allowed_params=[],
)
def __init__(self, params: SplitSessionsParams) -> None:
super().__init__(params=params)
@track( # type: ignore
tracking_info={"event_name": "apply"},
scope="split_sessions",
allowed_params=[],
)
def apply(self, eventstream: EventstreamType) -> EventstreamType:
from retentioneering.eventstream.eventstream import Eventstream
user_col = eventstream.schema.user_id
time_col = eventstream.schema.event_timestamp
type_col = eventstream.schema.event_type
event_col = eventstream.schema.event_name
session_col = self.params.session_col
timeout, timeout_unit = self.params.timeout
mark_truncated = self.params.mark_truncated
df = eventstream.to_dataframe(copy=True)
df["ref"] = df[eventstream.schema.event_id]
df["prev_timedelta"] = df[time_col] - df.groupby(user_col)[time_col].shift(1)
df["next_timedelta"] = df.groupby(user_col)[time_col].shift(-1) - df[time_col]
df["prev_timedelta"] /= np.timedelta64(1, timeout_unit) # type: ignore
df["next_timedelta"] /= np.timedelta64(1, timeout_unit) # type: ignore
session_starts_mask = (df["prev_timedelta"] > timeout) | (df["prev_timedelta"].isnull())
session_ends_mask = (df["next_timedelta"] > timeout) | (df["next_timedelta"].isnull())
df["is_session_start"] = session_starts_mask
df[session_col] = df.groupby(user_col)["is_session_start"].transform(np.cumsum)
df[session_col] = df[user_col].astype(str) + "_" + df[session_col].astype(str)
session_starts = df[session_starts_mask].copy()
session_ends = df[session_ends_mask].copy()
session_starts[event_col] = "session_start"
session_starts[type_col] = "session_start"
session_starts["ref"] = None
session_ends[event_col] = "session_end"
session_ends[type_col] = "session_end"
session_ends["ref"] = None
df = df.drop(["prev_timedelta", "next_timedelta", "is_session_start"], axis=1)
if mark_truncated:
dataset_start = df[time_col].min()
dataset_end = df[time_col].max()
start_to_start = (session_starts[time_col] - dataset_start) / np.timedelta64(1, timeout_unit)
end_to_end = (dataset_end - session_ends[time_col]) / np.timedelta64(1, timeout_unit)
session_starts_truncated = session_starts[start_to_start < timeout].reset_index()
session_ends_truncated = session_ends[end_to_end < timeout].reset_index()
session_starts_truncated[event_col] = "session_start_cropped"
session_starts_truncated[type_col] = "session_start_cropped"
session_ends_truncated[event_col] = "session_end_cropped"
session_ends_truncated[type_col] = "session_end_cropped"
session_starts = pd.concat([session_starts, session_starts_truncated])
session_ends = pd.concat([session_ends, session_ends_truncated])
df = pd.concat([df, session_starts, session_ends])
raw_data_schema = eventstream.schema.to_raw_data_schema()
raw_data_schema.custom_cols.append({"custom_col": session_col, "raw_data_col": session_col})
eventstream = Eventstream(
schema=EventstreamSchema(custom_cols=[session_col]),
raw_data_schema=raw_data_schema,
raw_data=df,
relations=[{"raw_col": "ref", "eventstream": eventstream}],
)
return eventstream