# Source code for retentioneering.data_processors_lib.label_new_users
from __future__ import annotations
from typing import List, Literal, Union
import pandas as pd
from retentioneering.backend.tracker import collect_data_performance, time_performance
from retentioneering.data_processor import DataProcessor
from retentioneering.eventstream.types import EventstreamSchemaType
from retentioneering.params_model import ParamsModel
from retentioneering.utils.doc_substitution import docstrings
from retentioneering.utils.hash_object import hash_dataframe
from retentioneering.widget.widgets import ListOfUsers
class LabelNewUsersParams(ParamsModel):
    """
    A class with parameters for :py:class:`.LabelNewUsers` class.

    Parameters
    ----------
    new_users_list : list of int or list of str or "all"
        Ids of the users that receive the ``new_user`` event; all other users
        receive ``existing_user``. The literal ``"all"`` marks every user as new.
    """

    new_users_list: Union[List[int], List[str], Literal["all"]]

    # Associates the ``new_users_list`` parameter with a ``ListOfUsers`` widget.
    # NOTE(review): presumably consumed by ``ParamsModel`` for GUI rendering - confirm.
    _widgets = {"new_users_list": ListOfUsers()}
@docstrings.get_sections(
    base="LabelNewUsers",
)  # type: ignore
class LabelNewUsers(DataProcessor):
    """
    Create a new synthetic event for each user:
    ``new_user`` or ``existing_user``.

    Parameters
    ----------
    new_users_list : list of int or list of str or `all`
        If the `list of user_ids` is given - ``new_user`` event will be created for each user from the list.
        Event ``existing_user`` will be added to the rest of the users.

        If ``all`` - ``new_user`` synthetic event will be created for all users from the input ``eventstream``.

    Returns
    -------
    Eventstream
        Eventstream with new synthetic events, one for each user:

        +-----------------+-----------------+------------------------+
        | **event_name**  | **event_type**  | **timestamp**          |
        +-----------------+-----------------+------------------------+
        | new_user        | new_user        | first_event            |
        +-----------------+-----------------+------------------------+
        | existing_user   | existing_user   | first_event            |
        +-----------------+-----------------+------------------------+

    Notes
    -----
    See :doc:`Data processors user guide</user_guides/dataprocessors>` for the details.
    """

    params: LabelNewUsersParams

    @time_performance(
        scope="label_new_users",
        event_name="init",
    )
    def __init__(self, params: LabelNewUsersParams):
        """
        Store the processor parameters by delegating to the base class constructor.

        Parameters
        ----------
        params : LabelNewUsersParams
            Parameters holding ``new_users_list``.
        """
        super().__init__(params=params)

    @time_performance(  # type: ignore
        scope="label_new_users",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Append one ``new_user`` / ``existing_user`` row per user to ``df``.

        Each synthetic row is a copy of the user's first row in ``df`` (in row order;
        presumably ``df`` is already sorted chronologically - confirm against the caller)
        with the event type and event name columns overwritten by the label.

        Parameters
        ----------
        df : pd.DataFrame
            Events table containing the columns named by ``schema``.
        schema : EventstreamSchemaType
            Provides the ``user_id``, ``event_type`` and ``event_name`` column names.

        Returns
        -------
        pd.DataFrame
            The rows of ``df`` followed by the synthetic rows, one per user.
        """
        user_col = schema.user_id
        type_col = schema.event_type
        event_col = schema.event_name
        new_users_list = self.params.new_users_list

        # Take each user's real first row. ``groupby(...).first()`` picks the first
        # non-null value of every column independently, so a user whose first event
        # has missing values would get a row stitched together from several events.
        # Rows without a user id are skipped, the rows are ordered by user id and the
        # index is rebuilt, matching the frame the group-by aggregation used to return.
        matched_events = (
            df[df[user_col].notna()]
            .drop_duplicates(subset=[user_col], keep="first")
            .sort_values(by=user_col)
            .reset_index(drop=True)
        )

        if new_users_list == "all":
            matched_events[type_col] = "new_user"
            matched_events[event_col] = "new_user"
        else:
            new_user_mask = matched_events[user_col].isin(new_users_list)
            labels = new_user_mask.map({True: "new_user", False: "existing_user"})
            matched_events[type_col] = labels
            matched_events[event_col] = labels

        result = pd.concat([df, matched_events])

        # Report input/output shapes and hashes to the performance tracker.
        collect_data_performance(
            scope="label_new_users",
            event_name="metadata",
            called_params={"new_users_list": new_users_list},
            performance_data={
                "parent": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
                "child": {
                    "shape": result.shape,
                    "hash": hash_dataframe(result),
                },
            },
        )

        return result