Source code for retentioneering.data_processors_lib.label_new_users

from __future__ import annotations

from typing import List, Literal, Union

from pandas import DataFrame

from retentioneering.backend.tracker import track
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamType
from retentioneering.params_model import ParamsModel
from retentioneering.widget.widgets import ListOfIntNewUsers


class LabelNewUsersParams(ParamsModel):
    """
    Parameter container consumed by the :py:class:`.LabelNewUsers` data processor.
    """

    # Either an explicit list of user ids (a list of ints or a list of
    # strings) or the literal string "all".
    new_users_list: Union[List[int], List[str], Literal["all"]]

    # Widget instance registered for the ``new_users_list`` field.
    _widgets = {"new_users_list": ListOfIntNewUsers()}
class LabelNewUsers(DataProcessor):
    """
    Add one synthetic event per user, labelling the user as either
    ``new_user`` or ``existing_user``.

    Parameters
    ----------
    new_users_list : list of int or list of str or `all`

        - If a list of user ids is passed, every user from that list receives
          a ``new_user`` event and every other user receives an
          ``existing_user`` event.
        - If ``all`` is passed, every user of the input ``eventstream``
          receives a ``new_user`` event.

    Returns
    -------
    Eventstream
        Eventstream that contains only the synthetic events, one per user:

        +-----------------+-----------------+------------------------+
        | **event_name**  | **event_type**  | **timestamp**          |
        +-----------------+-----------------+------------------------+
        | new_user        | new_user        | first_event            |
        +-----------------+-----------------+------------------------+
        | existing_user   | existing_user   | first_event            |
        +-----------------+-----------------+------------------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: LabelNewUsersParams

    @track(  # type: ignore
        tracking_info={"event_name": "init"},
        scope="label_new_users",
        allowed_params=[],
    )
    def __init__(self, params: LabelNewUsersParams):
        super().__init__(params=params)

    @track(  # type: ignore
        tracking_info={"event_name": "apply"},
        scope="label_new_users",
        allowed_params=[],
    )
    def apply(self, eventstream: EventstreamType) -> EventstreamType:
        """
        Build the per-user ``new_user`` / ``existing_user`` events.

        Parameters
        ----------
        eventstream : EventstreamType
            Source eventstream whose users are labelled.

        Returns
        -------
        EventstreamType
            A new eventstream holding one synthetic row per user and related
            to the source ``eventstream`` through the ``ref`` column.
        """
        # Imported locally, as in the original implementation.
        from retentioneering.eventstream.eventstream import Eventstream

        source_df: DataFrame = eventstream.to_dataframe(copy=True)

        schema = eventstream.schema
        user_col = schema.user_id
        type_col = schema.event_type
        event_col = schema.event_name

        requested_ids = self.params.new_users_list

        # One row per user, used as the template for the synthetic event.
        # NOTE(review): GroupBy.first() picks the first non-null value of each
        # column within a group, so a row may combine values from several
        # events when earlier ones contain nulls. Which event counts as
        # "first" depends on the row order returned by to_dataframe() - confirm.
        per_user = source_df.groupby(user_col, as_index=False).first()

        if requested_ids != "all":
            # Users from the explicit list are new; everybody else is existing.
            in_new_list = per_user[user_col].isin(requested_ids)
            per_user.loc[in_new_list, type_col] = "new_user"  # type: ignore
            per_user.loc[~in_new_list, type_col] = "existing_user"  # type: ignore
            # The event name mirrors the event type.
            per_user[event_col] = per_user[type_col]
        else:
            # Every user of the stream is labelled as new.
            per_user[type_col] = "new_user"
            per_user[event_col] = "new_user"

        # "ref" is the raw column named in the relation below; it is filled
        # with None for every synthetic row.
        per_user["ref"] = None

        return Eventstream(
            raw_data_schema=schema.to_raw_data_schema(),
            raw_data=per_user,
            relations=[{"raw_col": "ref", "eventstream": eventstream}],
        )