refactor: Remove early row count and update batch export metrics #22810

Merged · Jun 13, 2024 · 14 commits

Changes from 7 commits
86 changes: 45 additions & 41 deletions posthog/api/app_metrics.py
@@ -2,8 +2,7 @@
 import uuid
 from typing import Any
 
-from django.db.models import Q, Sum
-from django.db.models.functions import Coalesce, TruncDay
+from django.db.models.functions import TruncDay
 from rest_framework import mixins, request, response, viewsets
 from rest_framework.decorators import action

@@ -24,6 +23,7 @@
     AppMetricsRequestSerializer,
 )
 from posthog.utils import relative_date_parse
+from posthog.batch_exports.models import fetch_batch_export_run_count


class AppMetricsViewSet(TeamAndOrgViewSetMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet):
@@ -32,11 +32,8 @@ class AppMetricsViewSet(TeamAndOrgViewSetMixin, mixins.RetrieveModelMixin, views

     def retrieve(self, request: request.Request, *args: Any, **kwargs: Any) -> response.Response:
         try:
-            rows = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"])
+            dates, successes, failures = self.get_batch_export_runs_app_metrics_queryset(batch_export_id=kwargs["pk"])
 
-            dates = [row["dates"].strftime("%Y-%m-%d") for row in rows]
-            successes = [row["successes"] for row in rows]
-            failures = [row["failures"] for row in rows]
             return response.Response(
                 {
                     "metrics": {
@@ -83,30 +80,7 @@ def error_details(self, request: request.Request, *args: Any, **kwargs: Any) ->
return response.Response({"result": error_details})

def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str):
"""Use the Django ORM to fetch app metrics for batch export runs.

Attempts to (roughly) match the following (much more readable) query:
```
select
date_trunc('day', last_updated_at) as dates,
sum(case when status = 'Completed' then coalesce(records_total_count, 0) else 0) as successes,
sum(case when status != 'Completed' then coalesce(records_total_count, 0) else 0) as failures
from
posthog_batchexportrun
where
batch_export_id = :batch_export_id
and last_updated_at between :date_from and :date_to
and status != 'Running'
group by
date_trunc('day', last_updated_at)
order by
dates
```

A truncated 'last_updated_at' is used as the grouping date as it reflects when a particular run
was last updated. It feels easier to explain to users that if they see metrics for today, those
correspond to runs that happened today, even if the runs themselves exported data from a year ago
(because it was a backfill).
"""Use the Django ORM and ClickHouse to fetch app metrics for batch export runs.

Raises:
ValueError: If provided 'batch_export_id' is not a valid UUID.
@@ -120,22 +94,52 @@ def get_batch_export_runs_app_metrics_queryset(self, batch_export_id: str):
             relative_date_parse(before, self.team.timezone_info) if before else dt.datetime.now(dt.timezone.utc)
         )
         date_range = (after_datetime, before_datetime)
-        return (
-            BatchExportRun.objects.filter(batch_export_id=batch_export_uuid, last_updated_at__range=date_range)
-            .annotate(dates=TruncDay("last_updated_at"))
-            .values("dates")
-            .annotate(
-                successes=Sum(
-                    Coalesce("records_total_count", 0), filter=Q(status=BatchExportRun.Status.COMPLETED), default=0
-                ),
-                failures=Sum(
-                    Coalesce("records_total_count", 0), filter=~Q(status=BatchExportRun.Status.COMPLETED), default=0
-                ),
-            )
-            .order_by("dates")
-        )
+        runs = (

[Review thread on "runs = ("]

Contributor: Wouldn't this get all the runs, not just the latest for that date range? We only want the latest: if something failed but was retried, we'd only want to show success in the sparkgraph and metrics (because a user ultimately doesn't care if it was retried, as long as it was exported, same as webhooks).

tomasfarias (Author): What if nothing failed but it was retried (manually, by the user) anyway? I think it makes sense to show duplicates if the user requests duplicates. So we need all runs.

tomasfarias (Author, Jun 12, 2024): Moreover, no new runs are automatically created in the event of failure, unless manually requested by us or users. Retries are part of a run, not a separate run.

Contributor: 👍 Would you mind changing the labels in the UI too with this PR, 'events' -> 'runs', in that case?

tomasfarias (Author): Sure, the latest commit changed this to runs. I'll make the UI change next.

tomasfarias (Author): Done in the latest commit; it looks like this: [image]

+            BatchExportRun.objects.filter(
+                batch_export_id=batch_export_uuid,
+                last_updated_at__range=date_range,
+                status__in=(
+                    BatchExportRun.Status.COMPLETED,
+                    BatchExportRun.Status.FAILED,
+                    BatchExportRun.Status.FAILED_RETRYABLE,
+                ),
+            )
+            .annotate(day=TruncDay("last_updated_at"))
+            .order_by("day")
+            .all()
+        )

+        dates = []
+        successes = []
+        failures = []
+        current_day: dt.datetime | None = None
+        for run in runs:
+            if current_day is None:
+                current_day = run.day
+                dates.append(run.day.strftime("%Y-%m-%d"))
+                successes.append(0)
+                failures.append(0)
+
+            elif current_day < run.day:
+                current_day = run.day
+                dates.append(run.day.strftime("%Y-%m-%d"))
+                successes.append(0)
+                failures.append(0)
+
+            count = fetch_batch_export_run_count(
+                team_id=run.batch_export.team_id,
+                data_interval_start=run.data_interval_start,
+                data_interval_end=run.data_interval_end,
+            )

[Review comment]

tiina303 (Contributor, Jun 11, 2024): IMO a metric based on the number of events should never exist; we should instead look at the count of runs only. OK, let's instead look at the time ranges exported, which is easy to do over runs. We can make all the graphs say "exports succeeded" / "exports failed" (because it's the latest run only, not all runs) instead of "events sent" / "events failed" in the UI, and then we can remove this.


+            if run.status == BatchExportRun.Status.COMPLETED:
+                successes[-1] += count
+
+            elif run.status in (BatchExportRun.Status.FAILED, BatchExportRun.Status.FAILED_RETRYABLE):
+                failures[-1] += count
+
+        return dates, successes, failures


class HistoricalExportsAppMetricsViewSet(
TeamAndOrgViewSetMixin,
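To make the new aggregation concrete, here is a minimal standalone sketch (not PostHog code) of the day-bucketing above; the runs, statuses, and counts are made up:

```
# Standalone sketch: runs arrive ordered by day, and each day's event counts
# accumulate into the last slot of three parallel lists.
import datetime as dt

runs = [
    (dt.date(2024, 6, 10), "Completed", 3500),
    (dt.date(2024, 6, 10), "Failed", 500),
    (dt.date(2024, 6, 11), "Completed", 1200),
]

dates: list[str] = []
successes: list[int] = []
failures: list[int] = []
current_day = None

for day, status, count in runs:
    # A new day (or the first run) opens a fresh bucket in all three lists.
    if current_day is None or current_day < day:
        current_day = day
        dates.append(day.strftime("%Y-%m-%d"))
        successes.append(0)
        failures.append(0)
    if status == "Completed":
        successes[-1] += count
    else:
        failures[-1] += count

assert dates == ["2024-06-10", "2024-06-11"]
assert successes == [3500, 1200]
assert failures == [500, 0]
```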
31 changes: 25 additions & 6 deletions posthog/api/test/test_app_metrics.py
@@ -1,5 +1,6 @@
 import datetime as dt
 import json
+import uuid
 from unittest import mock
 
 from freezegun.api import freeze_time
@@ -8,6 +9,7 @@
 from posthog.api.test.batch_exports.conftest import start_test_worker
 from posthog.api.test.batch_exports.operations import create_batch_export_ok
 from posthog.batch_exports.models import BatchExportRun
+from posthog.client import sync_execute
 from posthog.models.activity_logging.activity_log import Detail, Trigger, log_activity
 from posthog.models.plugin import Plugin, PluginConfig
 from posthog.models.utils import UUIDT
@@ -18,6 +20,23 @@
 SAMPLE_PAYLOAD = {"dateRange": ["2021-06-10", "2022-06-12"], "parallelism": 1}
 
 
+def insert_event(
+    team_id: int,
+    timestamp: dt.datetime,
+):
+    sync_execute(
+        "INSERT INTO `sharded_events` (uuid, team_id, event, timestamp) VALUES",
+        [
+            {
+                "uuid": uuid.uuid4(),
+                "team_id": team_id,
+                "event": "test_event",
+                "timestamp": timestamp,
+            }
+        ],
+    )
+
+
 @freeze_time("2021-12-05T13:23:00Z")
 class TestAppMetricsAPI(ClickhouseTestMixin, APIBaseTest):
     maxDiff = None
@@ -119,18 +138,19 @@ def test_retrieve_batch_export_runs_app_metrics(self):
             data_interval_end=last_updated_at,
             data_interval_start=last_updated_at - dt.timedelta(hours=1),
             status=BatchExportRun.Status.COMPLETED,
-            records_completed=3,
-            records_total_count=3,
         )
+        for _ in range(3):
+            insert_event(team_id=self.team.pk, timestamp=last_updated_at - dt.timedelta(minutes=1))
 
         BatchExportRun.objects.create(
             batch_export_id=batch_export_id,
             data_interval_end=last_updated_at - dt.timedelta(hours=2),
             data_interval_start=last_updated_at - dt.timedelta(hours=3),
             status=BatchExportRun.Status.FAILED,
-            records_completed=0,
-            records_total_count=5,
         )
+        for _ in range(5):
+            timestamp = last_updated_at - dt.timedelta(hours=2, minutes=1)
+            insert_event(team_id=self.team.pk, timestamp=timestamp)
 
         response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
         self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -197,9 +217,8 @@ def test_retrieve_batch_export_runs_app_metrics_defaults_to_zero(self):
             data_interval_end=last_updated_at,
             data_interval_start=last_updated_at - dt.timedelta(hours=1),
             status=BatchExportRun.Status.COMPLETED,
-            records_completed=1,
-            records_total_count=1,
         )
+        insert_event(team_id=self.team.pk, timestamp=last_updated_at - dt.timedelta(minutes=1))
 
         response = self.client.get(f"/api/projects/@current/app_metrics/{batch_export_id}?date_from=-7d")
         self.assertEqual(response.status_code, status.HTTP_200_OK)
40 changes: 40 additions & 0 deletions posthog/batch_exports/models.py
@@ -1,3 +1,4 @@
+import collections.abc
 import dataclasses
 import datetime as dt
 import enum
@@ -116,6 +117,45 @@ class Status(models.TextChoices):
     )
 
 
+def fetch_batch_export_run_count(
+    *,
+    team_id: int,
+    data_interval_start: dt.datetime,
+    data_interval_end: dt.datetime,
+    exclude_events: collections.abc.Iterable[str] | None = None,
+    include_events: collections.abc.Iterable[str] | None = None,
+) -> int:
+    """Fetch the count of events covered by a batch export run from ClickHouse."""
+    if exclude_events:
+        exclude_events_statement = f"AND event NOT IN ({','.join(exclude_events)})"
+    else:
+        exclude_events_statement = ""
+
+    if include_events:
+        include_events_statement = f"AND event IN ({','.join(include_events)})"
+    else:
+        include_events_statement = ""
+
+    data_interval_start_ch = data_interval_start.strftime("%Y-%m-%d %H:%M:%S")
+    data_interval_end_ch = data_interval_end.strftime("%Y-%m-%d %H:%M:%S")
+
+    clickhouse_query = f"""
+        SELECT count(*)
+        FROM events
+        WHERE
+            team_id = {team_id}
+            AND timestamp >= toDateTime64('{data_interval_start_ch}', 6, 'UTC')
+            AND timestamp < toDateTime64('{data_interval_end_ch}', 6, 'UTC')
+            {exclude_events_statement}
+            {include_events_statement}
+    """
+
+    try:
+        return sync_execute(clickhouse_query)[0][0]
+    except Exception:
+        return 0


BATCH_EXPORT_INTERVALS = [
("hour", "hour"),
("day", "day"),
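For context, a hedged sketch of how this new helper might be called; the team id and interval values below are invented:

```
# Illustrative call only; values are made up.
import datetime as dt

from posthog.batch_exports.models import fetch_batch_export_run_count

count = fetch_batch_export_run_count(
    team_id=1,
    data_interval_start=dt.datetime(2024, 6, 12, 12, 0, tzinfo=dt.timezone.utc),
    data_interval_end=dt.datetime(2024, 6, 12, 13, 0, tzinfo=dt.timezone.utc),
)
# Falls back to 0 on any query error, so callers treat the count as best effort.
```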
50 changes: 3 additions & 47 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -1,4 +1,3 @@
-import asyncio
 import collections.abc
 import dataclasses
 import datetime as dt
@@ -27,7 +26,7 @@
     get_export_finished_metric,
     get_export_started_metric,
 )
-from posthog.temporal.common.clickhouse import ClickHouseClient, get_client
+from posthog.temporal.common.clickhouse import ClickHouseClient
 from posthog.temporal.common.client import connect
 from posthog.temporal.common.logger import bind_temporal_worker_logger
@@ -329,12 +328,11 @@ class StartBatchExportRunInputs:
     is_backfill: bool = False
 
 
-RecordsTotalCount = int | None
 BatchExportRunId = str
 
 
 @activity.defn
-async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[BatchExportRunId, RecordsTotalCount]:
+async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId:
     """Activity that creates a BatchExportRun and returns its ID.
 
     Intended to be used in all export workflows, usually at the start, to create a model
@@ -350,56 +348,14 @@ async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> tuple[Bat
         inputs.data_interval_end,
     )
 
-    delta = dt.datetime.fromisoformat(inputs.data_interval_end) - dt.datetime.fromisoformat(inputs.data_interval_start)
-    async with get_client(team_id=inputs.team_id) as client:
-        if not await client.is_alive():
-            raise ConnectionError("Cannot establish connection to ClickHouse")
-
-        try:
-            count = await asyncio.wait_for(
-                get_rows_count(
-                    client=client,
-                    team_id=inputs.team_id,
-                    interval_start=inputs.data_interval_start,
-                    interval_end=inputs.data_interval_end,
-                    exclude_events=inputs.exclude_events,
-                    include_events=inputs.include_events,
-                    is_backfill=inputs.is_backfill,
-                ),
-                timeout=(delta / 12).total_seconds(),
-            )
-        except asyncio.TimeoutError:
-            count = None
-
-    if count is None:
-        logger.info(
-            "Batch export for range %s - %s will continue without a count of rows to export",
-            inputs.data_interval_start,
-            inputs.data_interval_end,
-        )
-    elif count > 0:
-        logger.info(
-            "Batch export for range %s - %s will export %s rows",
-            inputs.data_interval_start,
-            inputs.data_interval_end,
-            count,
-        )
-    else:
-        logger.info(
-            "Batch export for range %s - %s has no rows to export",
-            inputs.data_interval_start,
-            inputs.data_interval_end,
-        )
-
     run = await acreate_batch_export_run(
         batch_export_id=uuid.UUID(inputs.batch_export_id),
         data_interval_start=inputs.data_interval_start,
         data_interval_end=inputs.data_interval_end,
         status=BatchExportRun.Status.STARTING,
-        records_total_count=count,
    )
 
-    return str(run.id), count
+    return str(run.id)


@dataclasses.dataclass
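Finally, a hedged sketch (not from this PR) of how a Temporal export workflow might consume the slimmed-down activity; the workflow name and timeout are invented:

```
# Hypothetical caller: the activity now returns only the run ID, with no row
# count to thread through the workflow. Name and timeout are illustrative.
from datetime import timedelta

from temporalio import workflow


@workflow.defn(name="example-export")
class ExampleExportWorkflow:
    @workflow.run
    async def run(self, inputs: StartBatchExportRunInputs) -> None:
        run_id = await workflow.execute_activity(
            start_batch_export_run,
            inputs,
            start_to_close_timeout=timedelta(minutes=5),
        )
        # ...export the interval's data, then mark the run finished...
```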