Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: smaller cohort resource usage for replay filters #25699

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Any, NamedTuple, cast, Optional, Union
from datetime import datetime, timedelta

import posthoganalytics

from posthog.hogql import ast
from posthog.hogql.ast import CompareOperation
from posthog.hogql.parser import parse_select
Expand Down Expand Up @@ -33,12 +35,24 @@ def is_group_property(p: Property) -> bool:
return p.type == "group"


def is_dynamic_cohort_property(p: Property) -> bool:
    """Return True when the property's ``type`` is exactly ``"cohort"``."""
    property_type = p.type
    return property_type == "cohort"


def is_static_cohort_property(p: Property) -> bool:
    """Return True when the property's ``type`` is exactly ``"precalculated-cohort"``."""
    property_type = p.type
    return property_type == "precalculated-cohort"


class SessionRecordingQueryResult(NamedTuple):
    """Immutable result bundle for a session recording list query."""

    # Rows returned by the query (element type is not constrained here).
    results: list
    # Flag indicating whether more recordings exist beyond `results`
    # (presumably used for pagination; confirm against the caller).
    has_more_recording: bool
    # Optional query timing entries; defaults to None when not supplied.
    timings: list[QueryTiming] | None = None


class UnexpectedQueryProperties(Exception):
    """Signals that a query carried properties the builder did not expect to handle."""


class SessionRecordingListFromFilters:
SESSION_RECORDINGS_DEFAULT_LIMIT = 50

Expand Down Expand Up @@ -224,11 +238,19 @@ def _where_predicates(self) -> Union[ast.And, ast.Or]:
)
)

remaining_properties = self._strip_person_and_event_properties(self._filter.property_groups)
if remaining_properties:
logger.info(
"session_replay_query_builder has unhandled properties", unhandled_properties=remaining_properties
cohort_subquery = CohortPropertyGroupsSubQuery(self._team, self._filter, self.ttl_days).get_queries()
if cohort_subquery:
optional_exprs.append(
ast.CompareOperation(
op=ast.CompareOperationOp.In,
left=ast.Field(chain=["s", "distinct_id"]),
right=cohort_subquery,
)
)

remaining_properties = self._strip_person_and_event_and_cohort_properties(self._filter.property_groups)
if remaining_properties:
posthoganalytics.capture_exception(UnexpectedQueryProperties(remaining_properties))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think anything in here is likely to cause a join and impact performance, so let's use exception capture to see when that happens.

optional_exprs.append(property_to_expr(remaining_properties, team=self._team, scope="replay"))

if self._filter.console_log_filters.values:
Expand Down Expand Up @@ -267,11 +289,15 @@ def _where_predicates(self) -> Union[ast.And, ast.Or]:
def _having_predicates(self) -> ast.Expr:
    """Build the HAVING expression from the filter's ``having_predicates``.

    The predicates are converted with ``property_to_expr`` for this
    instance's team, using the ``"replay"`` scope.
    """
    having_filters = self._filter.having_predicates
    return property_to_expr(having_filters, team=self._team, scope="replay")

def _strip_person_and_event_properties(self, property_group: PropertyGroup) -> PropertyGroup | None:
def _strip_person_and_event_and_cohort_properties(self, property_group: PropertyGroup) -> PropertyGroup | None:
property_groups_to_keep = [
g
for g in property_group.flat
if not is_event_property(g) and not is_person_property(g) and not is_group_property(g)
if not is_event_property(g)
and not is_person_property(g)
and not is_group_property(g)
and not is_dynamic_cohort_property(g)
and not is_static_cohort_property(g)
]

return (
Expand Down Expand Up @@ -334,6 +360,53 @@ def _where_predicates(self) -> ast.Expr:
)


class CohortPropertyGroupsSubQuery:
    """Build a sub-query selecting the distinct ids of people in the filtered cohorts.

    The sub-query is built only from the cohort-typed properties of the
    filter (dynamic ``"cohort"`` and static ``"precalculated-cohort"``).
    When the filter has no such properties, no sub-query is produced.
    """

    _team: Team
    _filter: SessionRecordingsFilter
    # Stored from the constructor; not read by any method of this class.
    _ttl_days: int

    # Template for the sub-query. ``{where_predicates}`` is the placeholder
    # supplied in ``get_queries``. The template must contain only SQL: any
    # stray text inside it would break parsing.
    raw_cohort_to_distinct_id = """
    select distinct_id
    from person_distinct_ids
    where person_id IN (
        SELECT person_id
        FROM cohort_people
        WHERE
            {where_predicates}
    )
    """

    def __init__(self, team: Team, filter: SessionRecordingsFilter, ttl_days: int):
        """Store the team, the recordings filter and the TTL (in days)."""
        self._team = team
        self._filter = filter
        self._ttl_days = ttl_days

    def get_queries(self) -> ast.SelectQuery | ast.SelectUnionQuery | None:
        """Return the parsed cohort sub-query, or None if there are no cohort properties."""
        if self.cohort_properties:
            return parse_select(
                self.raw_cohort_to_distinct_id,
                {"where_predicates": property_to_expr(self.cohort_properties, team=self._team, scope="replay")},
            )

        return None

    @cached_property
    def cohort_properties(self) -> PropertyGroup | None:
        """Collect the filter's cohort properties into a single ``PropertyGroup``.

        The group uses the filter's own ``property_operand`` as its type.
        Returns None when the filter holds no cohort properties.
        """
        cohort_property_groups = [
            g
            for g in self._filter.property_groups.flat
            if is_dynamic_cohort_property(g) or is_static_cohort_property(g)
        ]
        return (
            PropertyGroup(
                type=self._filter.property_operand,
                values=cohort_property_groups,
            )
            if cohort_property_groups
            else None
        )


class PersonsIdCompareOperation:
_team: Team
_filter: SessionRecordingsFilter
Expand Down
Loading
Loading