# Source code for retentioneering.data_processors_lib.label_new_users
from __future__ import annotations
from typing import List, Literal, Union
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfUsers
class LabelNewUsersParams(ParamsModel):
    """
    Parameters of the :py:class:`.LabelNewUsers` data processor.

    ``new_users_list`` is either an explicit list of user ids (all ``int`` or
    all ``str``) or the literal string ``"all"``.
    """

    new_users_list: Union[List[int], List[str], Literal["all"]]

    # Parameter name -> widget used to edit it (presumably consumed by the
    # params/GUI layer of ParamsModel -- confirm there).
    _widgets = dict(new_users_list=ListOfUsers())
@docstrings.get_sections(
    base="LabelNewUsers",
)  # type: ignore
class LabelNewUsers(DataProcessor):
    """
    Create a new synthetic event for each user:
    ``new_user`` or ``existing_user``.

    Parameters
    ----------
    new_users_list : list of int or list of str or `all`
        If the `list of user_ids` is given - ``new_user`` event will be created for each user from the list.
        Event ``existing_user`` will be added to the rest of the users.
        If ``all`` - ``new_user`` synthetic event will be created for all users from the input ``eventstream``.

    Returns
    -------
    Eventstream
        Eventstream with new synthetic events, one for each user:

        +-----------------+-----------------+------------------------+
        | **event_name**  | **event_type**  | **timestamp**          |
        +-----------------+-----------------+------------------------+
        | new_user        | new_user        | first_event            |
        +-----------------+-----------------+------------------------+
        | existing_user   | existing_user   | first_event            |
        +-----------------+-----------------+------------------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: LabelNewUsersParams

    @time_performance(
        scope="label_new_users",
        event_name="init",
    )
    def __init__(self, params: LabelNewUsersParams):
        super().__init__(params=params)

    @time_performance(  # type: ignore
        scope="label_new_users",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Append one ``new_user`` / ``existing_user`` event per user to ``df``.

        Parameters
        ----------
        df : pd.DataFrame
            Input events. The first row seen for each user is copied as the
            template for that user's synthetic event (assumes ``df`` is already
            ordered by time -- TODO confirm with the caller).
        schema : EventstreamSchemaType
            Supplies the ``user_id``, ``event_type`` and ``event_name`` column names.

        Returns
        -------
        pd.DataFrame
            ``df`` followed by the synthetic events: one per non-null user id,
            ordered by user id, with a fresh 0..k-1 index.
        """
        user_col = schema.user_id
        type_col = schema.event_type
        event_col = schema.event_name

        new_users_list = self.params.new_users_list

        # Copy each user's first *row* verbatim. ``groupby(...).first()`` is not
        # used because it takes the first non-null value of every column
        # independently, which can assemble one synthetic event from several
        # different source rows. Grouping row positions by user id keeps
        # groupby's sorted key order and its default dropping of null ids.
        row_positions = pd.Series(range(len(df)), index=df.index)
        first_positions = row_positions.groupby(df[user_col], observed=True).min()
        matched_events = df.iloc[first_positions.to_numpy(dtype="int64")].reset_index(drop=True)

        # The isinstance guard keeps array-like inputs away from an element-wise ``==``.
        if isinstance(new_users_list, str) and new_users_list == "all":
            matched_events[type_col] = "new_user"
            matched_events[event_col] = "new_user"
        else:
            new_user_mask = matched_events[user_col].isin(new_users_list)
            matched_events.loc[new_user_mask, type_col] = "new_user"  # type: ignore
            matched_events.loc[~new_user_mask, type_col] = "existing_user"  # type: ignore
            matched_events[event_col] = matched_events[type_col]

        result = pd.concat([df, matched_events])

        collect_data_performance(
            scope="label_new_users",
            event_name="metadata",
            called_params={"new_users_list": new_users_list},
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result