class GroupEventsBulkParams(ParamsModel):
    grouping_rules: Union[List[GroupEventsRule], GroupingRulesDict]
    ignore_intersections: bool = False

    _widgets = {
        # @TODO: is stub for editor, fix later
        "grouping_rules": EnumWidget(),
    }


def combine_masks(masks: List[pd.Series]) -> pd.Series:
    """
    Build a mask of rows that are matched by more than one of ``masks``.

    Parameters
    ----------
    masks : list of pd.Series
        Masks to compare. They are combined positionally (via ``.values``), so they are
        expected to have the same length and row order; the index of the first mask is
        reused for the result.

    Returns
    -------
    pd.Series
        Boolean mask that is ``True`` where at least two input masks are ``True``.
        An empty ``masks`` list yields an empty boolean Series (nothing can intersect).
    """
    if not masks:
        # Without this guard ``masks[0]`` below raises IndexError for an empty rule set.
        return pd.Series(dtype=bool)
    mask_arrays = [mask.values for mask in masks]
    # Summing boolean arrays counts, per row, how many masks matched it.
    combined_mask = np.sum(mask_arrays, axis=0) > 1  # type: ignore
    return pd.Series(combined_mask, index=masks[0].index)
@docstrings.get_sections(base="GroupEventsBulk")  # type: ignore
class GroupEventsBulk(DataProcessor):
    """
    Apply multiple grouping rules simultaneously.
    See also :py:meth:`GroupEvents<retentioneering.data_processors_lib.group_events.GroupEvents>`

    Parameters
    ----------
    grouping_rules : list or dict

        - If list, each list element is a dictionary with mandatory keys ``event_name`` and ``func`` and an optional
          key ``event_type``. Their meaning is the same as for
          :py:meth:`GroupEvents<retentioneering.data_processors_lib.group_events.GroupEvents>`.
        - If dict, the keys are considered as ``event_name``, values are considered as ``func``. Setting
          ``event_type`` is not supported in this case.

    ignore_intersections : bool, default False
        If ``False``, a ``ValueError`` is raised in case any event from the input eventstream matches more than one
        grouping rule. Otherwise, the first appropriate rule from ``grouping_rules`` is applied.

    Returns
    -------
    Eventstream
        ``Eventstream`` with the grouped events according to the given grouping rules.

    Notes
    -----
    Every rule is evaluated against the input eventstream before any event is renamed, so one rule never sees
    the output of another rule. If the intersection check fails, the input is left unchanged.
    """

    params: GroupEventsBulkParams

    @time_performance(
        scope="group_events_bulk",
        event_name="init",
    )
    def __init__(self, params: GroupEventsBulkParams) -> None:
        super().__init__(params=params)

    @time_performance(
        scope="group_events_bulk",
        event_name="apply",
    )
    def apply(self, df: pd.DataFrame, schema: EventstreamSchemaType) -> pd.DataFrame:
        """
        Rename the events matched by each grouping rule.

        Parameters
        ----------
        df : pd.DataFrame
            Input events; modified in place and also returned.
        schema : EventstreamSchemaType
            Provides the ``event_name`` / ``event_type`` column names and is passed to
            two-argument rule functions.

        Returns
        -------
        pd.DataFrame
            ``df`` with matched rows renamed.

        Raises
        ------
        ValueError
            If ``ignore_intersections`` is ``False`` and some row is matched by more than one rule.
        """
        rules = self.params.grouping_rules
        ignore_intersections = self.params.ignore_intersections

        # The dict form maps event_name -> func and carries no event_type.
        if isinstance(rules, dict):
            rules = [GroupEventsRule(event_name=key, func=val) for key, val in rules.items()]  # type: ignore
        rules = list(rules)

        parent_info = {
            "shape": df.shape,
            "hash": hash_dataframe(df),
        }

        # Evaluate every rule on the untouched input first ("simultaneous" semantics): renaming
        # events for one rule must not change which rows the other rules match.
        masks: List[pd.Series] = []
        for rule in rules:
            func: Callable = rule.func
            # Rule functions may accept either (df) or (df, schema).
            if len(signature(func).parameters) == 1:
                masks.append(func(df))  # type: ignore
            else:
                masks.append(func(df, schema))

        # Validate before mutating so a failed check leaves ``df`` untouched.
        if not ignore_intersections and masks:
            intersection_mask = combine_masks(masks)
            if intersection_mask.any():
                raise ValueError(
                    "GroupEventsBulk Dataprocessor error. Mapping rules are intersected. Use ignore_intersections=True or fix the intersections"
                )

        # Assign in reverse rule order: for a row matched by several rules, the earliest rule is
        # written last and therefore wins, i.e. "the first appropriate rule is applied".
        with pd.option_context("mode.chained_assignment", None):
            for rule, mask in zip(reversed(rules), reversed(masks)):
                event_type = rule.event_type if rule.event_type else "group_alias"
                df.loc[mask, schema.event_type] = event_type
                df.loc[mask, schema.event_name] = rule.event_name

        collect_data_performance(
            scope="group_events_bulk",
            event_name="metadata",
            called_params=self.to_dict()["values"],
            performance_data={
                "parent": parent_info,
                "child": {
                    "shape": df.shape,
                    "hash": hash_dataframe(df),
                },
            },
        )
        return df