diff --git a/examples/ih/Conversion_Modeling_Reporting.ipynb b/examples/ih/Conversion_Reporting.ipynb similarity index 81% rename from examples/ih/Conversion_Modeling_Reporting.ipynb rename to examples/ih/Conversion_Reporting.ipynb index f8c86aec..fa176603 100644 --- a/examples/ih/Conversion_Modeling_Reporting.ipynb +++ b/examples/ih/Conversion_Reporting.ipynb @@ -6,16 +6,10 @@ "metadata": {}, "outputs": [], "source": [ - "import polars as pl\n", - "from pdstools import read_ds_export, IH\n", - "from pdstools.utils import cdh_utils\n", - "from ih_helper import interaction_history\n", + "from pdstools import IH\n", "\n", "import plotly.io as pio\n", "import plotly as plotly\n", - "import plotly.express as px\n", - "import plotly.graph_objs as go\n", - "from plotly.subplots import make_subplots\n", "\n", "plotly.offline.init_notebook_mode()\n", "pio.renderers.default = \"vscode\"" @@ -25,7 +19,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Conversion Results" + "# Conversion Results\n", + "\n", + "Visualization of conversion modeling results from IH data." ] }, { @@ -37,7 +33,7 @@ "from pathlib import Path\n", "\n", "ih_export_file = Path(\n", - " \"./Data-pxStrategyResult_InteractionFiles_20241213T091932_GMT.zip\"\n", + " \"./Data-pxStrategyResult_InteractionFiles_20241213T091932_GMT.zip \"\n", ")\n", "\n", "if not ih_export_file.exists():\n", @@ -103,17 +99,17 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "# ih.plots.trend_line(\n", - "# experiment_field=\"ExperimentGroup\", # should be optional, can also give query to select only the Test group\n", - "# granularity=\"1d\", # string language polars\n", - "# positive_labels=[\"Conversion\"],\n", - "# negative_labels=[\"Impression\", \"Pending\"],\n", - "# title=\"Conversion Rate trends\",\n", - "# )" + "ih.plots.trend_bar(\n", + " experiment_field=\"ExperimentGroup\",\n", + " every=\"1w\",\n", + " positive_labels=[\"Conversion\"],\n", + " negative_labels=[\"Impression\", \"Pending\"],\n", + " title=\"Conversion Rates over Time\",\n", + ")" ] }, { @@ -136,6 +132,17 @@ " title = \"Overall Engagement\",\n", ")" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ih.plots.trend_line(\n", + " title=\"Engagement Rates over Time\",\n", + ")" + ] } ], "metadata": { diff --git a/examples/ih/ih_helper.py b/examples/ih/ih_helper.py deleted file mode 100644 index 35a39661..00000000 --- a/examples/ih/ih_helper.py +++ /dev/null @@ -1,87 +0,0 @@ -import datetime -import random -import polars as pl -from pdstools.utils import cdh_utils - -# Some day will move into a proper IH class - -# ih.plots.gauge(conversion/engagement) etc -# constructor define objective (conversion and engagement) labels (positives/negatives) - -class interaction_history: - interactions_period_days = 21 - accept_rate = 0.2 - accept_avg_duration_minutes = 10 - convert_over_accept_rate_test = 0.5 - convert_over_accept_rate_control = 0.3 - convert_avg_duration_days = 2 - - def __init__(self, outcome_definitions): - pass - - def generate(self, n): - now = datetime.datetime.now() - - - # def _interpolate(min, max, i, n): - # return min + (max - min) * i / (n - 1) - - - def to_prpc_time_str(timestamp): - return cdh_utils.to_prpc_date_time(timestamp)[0:15] - - - ih_fake_impressions = pl.DataFrame( - { - "InteractionID": [str(int(1e9 + i)) for i in range(n)], - "pyChannel": random.choices(["Web", "Email"], k=n), - "pyIssue": "Acquisition", - "pyGroup": "Phones", - "pyName": "AppleIPhone1564GB", - "ExperimentGroup": ["Conversion-Test", "Conversion-Control"] * int(n / 2), - "TimeStamp": [ - (now - datetime.timedelta(days=i * self.interactions_period_days / n)) - for i in range(n) - ], - "AcceptDurationMinutes": [ - random.uniform(0, 2 * self.accept_avg_duration_minutes) for i in range(n) - ], - "ConvertDurationDays": [ - random.uniform(0, 2 * self.convert_avg_duration_days) for i in range(n) - ], - } - ).with_columns( - pyOutcome=pl.when(pl.col.pyChannel == "Web") - .then(pl.lit("Impression")) - .otherwise(pl.lit("Pending")) - ) - ih_fake_accepts = ih_fake_impressions.sample(fraction=self.accept_rate).with_columns( - pl.col.TimeStamp + pl.duration(minutes=pl.col("AcceptDurationMinutes")), - pyOutcome=pl.when(pl.col.pyChannel == "Web") - .then(pl.lit("Clicked")) - .otherwise(pl.lit("Accepted")), - ) - ih_fake_converts_test = ih_fake_accepts.filter(pl.col.ExperimentGroup=="Conversion-Test").sample( - fraction=self.convert_over_accept_rate_test - ).with_columns( - pl.col.TimeStamp + pl.duration(days=pl.col("ConvertDurationDays")), - pyOutcome=pl.lit("Conversion"), - ) - ih_fake_converts_control = ih_fake_accepts.filter(pl.col.ExperimentGroup=="Conversion-Control").sample( - fraction=self.convert_over_accept_rate_control - ).with_columns( - pl.col.TimeStamp + pl.duration(days=pl.col("ConvertDurationDays")), - pyOutcome=pl.lit("Conversion"), - ) - - ih_data=pl.concat([ih_fake_impressions, ih_fake_accepts, ih_fake_converts_test, ih_fake_converts_control]).with_columns( - pxOutcomeTime=pl.col("TimeStamp").map_elements( - to_prpc_time_str, return_dtype=pl.String - ), - ).filter(pl.col("TimeStamp") < pl.lit(now)).drop( - ["AcceptDurationMinutes", "ConvertDurationDays", "TimeStamp"] - ).sort( - "InteractionID", "pxOutcomeTime" - ).lazy() - - return ih_data \ No newline at end of file diff --git a/python/pdstools/ih/Aggregates.py b/python/pdstools/ih/Aggregates.py index 00334ffd..8264d451 100644 --- a/python/pdstools/ih/Aggregates.py +++ b/python/pdstools/ih/Aggregates.py @@ -1,8 +1,8 @@ -from itertools import chain -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, List, Optional import polars as pl from ..utils.namespaces import LazyNamespace +from ..utils.cdh_utils import safe_flatten_list if TYPE_CHECKING: from .IH import IH as IH_Class @@ -15,17 +15,42 @@ def __init__(self, ih: "IH_Class"): def summary_by_experiment( self, - experiment_field: str, + experiment_field: Optional[str] = None, + every: Optional[str] = None, by: Optional[List[str]] = None, - positive_labels: List[str] = None, - negative_labels: List[str] = None, - ): + positive_labels: Optional[List[str]] = None, + negative_labels: Optional[List[str]] = None, + ) -> pl.LazyFrame: + """Groups the IH data summarizing into success rate (CTR) and standard error (StdErr). - if by is not None: - if isinstance(by, str): - by = [by] - else: - by = [] + It groups by the "experiment field" (TODO in the future this can be optional or multiple). When + given, the 'every' argument is used to divide the timerange into buckets. It uses the same string + language as Polars. + + Every interaction is considered to have only one outcome: positive, negative or none. When any + outcome in the interaction is in the positive labels, the outcome is considered positive. Next, + when any is in the negative labels, the outcome of the interaction is considered negative. Otherwise + there is no defined outcome and the interaction is ignored in calculations of success rate or error. + + Parameters + ---------- + experiment_field : Optional[str], optional + Optional field that contains the experiments + every : Optional[str], optional + Every interval start and period length, by default None + by : Optional[List[str]], optional + Extra grouping keys, by default None + positive_labels : Optional[List[str]], optional + Outcome label(s) for the positive responses, by default None + negative_labels : Optional[List[str]], optional + Outcome label(s) for the negative responses, by default None + + Returns + ------- + pl.LazyFrame + A polars frame with the grouping keys and columns for the total number of Positives, Negatives, + number of Interactions, success rate (CTR) and standard error (StdErr). + """ if positive_labels is None: positive_labels = ["Accepted", "Accept", "Clicked", "Click"] @@ -38,11 +63,26 @@ def summary_by_experiment( "NoResponse", ] + if every is not None: + source = self.ih.data.with_columns(pl.col.OutcomeTime.dt.truncate(every)) + else: + source = self.ih.data + + group_by_clause = safe_flatten_list( + [experiment_field] + [by] + (["OutcomeTime"] if every is not None else []) + ) + if len(group_by_clause) == 0: + group_by_clause = None + summary = ( - self.ih.data.filter( + source.filter( pl.col.ExperimentGroup.is_not_null() & (pl.col.ExperimentGroup != "") ) - .group_by([experiment_field] + by + ["InteractionID"]) + .group_by( + (group_by_clause + ["InteractionID"]) + if group_by_clause is not None + else ["InteractionID"] + ) .agg( # Take only one outcome per interaction. TODO should perhaps be the last one. InteractionOutcome=pl.when(pl.col.Outcome.is_in(positive_labels).any()) @@ -51,7 +91,7 @@ def summary_by_experiment( .then(pl.lit(False)), Outcomes=pl.col.Outcome.unique().sort(), # for debugging ) - .group_by([experiment_field] + by) + .group_by(group_by_clause) .agg( Positives=pl.col.InteractionOutcome.filter( pl.col.InteractionOutcome @@ -74,7 +114,11 @@ def summary_by_experiment( ).sqrt() ) ) - .sort([experiment_field] + by) ) + if group_by_clause is None: + summary = summary.drop("literal") # created by empty group_by + else: + summary = summary.sort(group_by_clause) + return summary diff --git a/python/pdstools/ih/Plots.py b/python/pdstools/ih/Plots.py index eab42c79..036ce438 100644 --- a/python/pdstools/ih/Plots.py +++ b/python/pdstools/ih/Plots.py @@ -1,6 +1,5 @@ from typing import TYPE_CHECKING, Dict, List, Optional import polars as pl -import plotly.io as pio import plotly as plotly import plotly.express as px import plotly.graph_objs as go @@ -23,9 +22,9 @@ def experiment_gauges( by: Optional[str] = "Channel", positive_labels: Optional[List[str]] = None, negative_labels: Optional[List[str]] = None, - reference_values: Optional[Dict] = None, + reference_values: Optional[Dict[str, float]] = None, title: Optional[str] = "Experiment Overview", - return_df:Optional[bool] = False, + return_df: Optional[bool] = False, ): # TODO currently only supporting a single by @@ -38,7 +37,7 @@ def experiment_gauges( if return_df: return plot_data - + plot_data = plot_data.collect() cols = plot_data[by].unique().shape[0] @@ -104,13 +103,17 @@ def tree_map( self, experiment_field: str, by: Optional[List[str]] = None, - positive_labels: List[str] = None, - negative_labels: List[str] = None, + positive_labels: Optional[List[str]] = None, + negative_labels: Optional[List[str]] = None, title: Optional[str] = "Detailed Click Through Rates", - return_df:Optional[bool] = False, + return_df: Optional[bool] = False, ): if by is None: - by = [f for f in ["Channel", "Issue", "Group", "Name"] if f in self.ih.data.collect_schema().names()] + by = [ + f + for f in ["Channel", "Issue", "Group", "Name"] + if f in self.ih.data.collect_schema().names() + ] plot_data = self.ih.aggregates.summary_by_experiment( experiment_field=experiment_field, @@ -121,7 +124,7 @@ def tree_map( if return_df: return plot_data - + plot_data = plot_data.collect() fig = px.treemap( @@ -135,9 +138,108 @@ def tree_map( title=title, hover_data=["StdErr", "Positives", "Negatives"], height=640, + template="pega", ) fig.update_coloraxes(showscale=False) fig.update_traces(textinfo="label+value") fig.update_layout(margin=dict(t=50, l=25, r=25, b=25)) return fig + + def trend_bar( + self, + experiment_field: str, + every: str = "1d", + by: Optional[str] = None, + positive_labels: Optional[List[str]] = None, + negative_labels: Optional[List[str]] = None, + title: Optional[str] = "Click Through Trend", + return_df: Optional[bool] = False, + ): + + plot_data = self.ih.aggregates.summary_by_experiment( + experiment_field=experiment_field, + every=every, + by=by, + positive_labels=positive_labels, + negative_labels=negative_labels, + ) + if return_df: + return plot_data + + fig = px.bar( + plot_data.collect(), + x="OutcomeTime", + y="CTR", + color=experiment_field, + error_y="StdErr", + facet_row=by, + barmode="group", + custom_data=[experiment_field], + template="pega", + title=title, + ) + fig.update_yaxes(tickformat=",.3%").update_layout(xaxis_title=None) + return fig + + def trend_line( + self, + experiment_field: Optional[str] = None, + every: Optional[str] = "1d", + by: Optional[str] = None, + positive_labels: Optional[List[str]] = None, + negative_labels: Optional[List[str]] = None, + title: Optional[str] = "Click Through Trend", + return_df: Optional[bool] = False, + ): + plot_data = self.ih.aggregates.summary_by_experiment( + experiment_field=experiment_field, + every=every, + by=by, + positive_labels=positive_labels, + negative_labels=negative_labels, + ) + if return_df: + return plot_data + + fig = px.line( + plot_data.collect(), + x="OutcomeTime", + y="CTR", + color=experiment_field, + facet_row=by, + custom_data=[experiment_field] if experiment_field is not None else None, + template="pega", + title=title, + ) + + add_confidence_interval = (experiment_field is None) # doesn't work for multiple lines + if add_confidence_interval: + conf_data = ( + plot_data.select( + x=pl.col("OutcomeTime"), + y_upper=pl.col("CTR") + pl.col("StdErr"), + y_lower=pl.col("CTR") - pl.col("StdErr"), + ) + .collect() + .to_dict(as_series=False) + ) + + # Add continuous interval, see https://plotly.com/python/continuous-error-bars/ + x = conf_data["x"] + y_upper = conf_data["y_upper"] + y_lower = conf_data["y_lower"] + fig.add_trace( + go.Scatter( + x=x + x[::-1], # x, then x reversed + y=y_upper + y_lower[::-1], # upper, then lower reversed + fill="toself", + fillcolor="rgba(0,100,80,0.2)", + line=dict(color="rgba(255,255,255,0)"), + hoverinfo="skip", + showlegend=False, + ) + ) + + fig.update_yaxes(tickformat=",.3%").update_layout(xaxis_title=None) + return fig diff --git a/python/pdstools/utils/cdh_utils.py b/python/pdstools/utils/cdh_utils.py index c1cfa2da..7994b316 100644 --- a/python/pdstools/utils/cdh_utils.py +++ b/python/pdstools/utils/cdh_utils.py @@ -1,6 +1,8 @@ import datetime +from functools import partial import io import logging +from operator import is_not import re import tempfile import warnings @@ -476,7 +478,9 @@ def auc_to_gini(auc: float) -> float: return 2 * safe_range_auc(auc) - 1 -def _capitalize(fields: Union[str, Iterable[str]], extra:Optional[List[str]]=[]) -> List[str]: +def _capitalize( + fields: Union[str, Iterable[str]], extra_endwords: Optional[Iterable[str]] = None +) -> List[str]: """Applies automatic capitalization, aligned with the R couterpart. Parameters @@ -566,7 +570,7 @@ def _capitalize(fields: Union[str, Iterable[str]], extra:Optional[List[str]]=[]) "Strategy", "ModelTechnique", ] - + if not isinstance(fields, list): fields = [fields] fields = [re.sub("^p(x|y|z)", "", field.lower()) for field in fields] @@ -579,9 +583,9 @@ def _capitalize(fields: Union[str, Iterable[str]], extra:Optional[List[str]]=[]) return fields -def _polars_capitalize(df: F, extra:Optional[List[str]]=[]) -> F: +def _polars_capitalize(df: F, extra_endwords: Optional[Iterable[str]] = None) -> F: cols = df.collect_schema().names() - renamed_cols = _capitalize(cols, extra) + renamed_cols = _capitalize(cols, extra_endwords) def deduplicate(columns: List[str]): seen: Dict[str, int] = {} @@ -1151,3 +1155,15 @@ def create_working_and_temp_dir( else tempfile.mkdtemp(prefix="tmp_", dir=working_dir) ) return working_dir, Path(temp_dir_name) + + +# Safe flattening of nested lists, removing None elements, and not splitting strings +def safe_flatten_list(alist: List) -> List: + alist = list(filter(partial(is_not, None), alist)) + alist = [ + item + for sublist in [[item] if type(item) is not list else item for item in alist] + for item in sublist + ] + alist = list(filter(partial(is_not, None), alist)) + return alist