feat: Revert "Revert "feat: Add inserted_at column to events table"" (#16615)

Revert "Revert "feat: Add inserted_at column to events table" (#16535)"

This reverts commit 59847f1.
fuziontech authored Jul 17, 2023
1 parent ff66de0 commit 53e82ed
Showing 3 changed files with 52 additions and 9 deletions.
30 changes: 30 additions & 0 deletions (new ClickHouse migration; file path not shown)
@@ -0,0 +1,30 @@
from infi.clickhouse_orm import migrations

from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.client import sync_execute
from posthog.models.event.sql import (
    EVENTS_TABLE_JSON_MV_SQL,
    KAFKA_EVENTS_TABLE_JSON_SQL,
)
from posthog.settings import CLICKHOUSE_CLUSTER

ADD_COLUMNS_BASE_SQL = """
ALTER TABLE {table}
ON CLUSTER '{cluster}'
ADD COLUMN IF NOT EXISTS inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL
"""


def add_columns_to_required_tables(_):
    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="events", cluster=CLICKHOUSE_CLUSTER))
    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="writable_events", cluster=CLICKHOUSE_CLUSTER))
    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))


operations = [
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS events_json_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS kafka_events_json ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
    migrations.RunPython(add_columns_to_required_tables),
    run_sql_with_exceptions(KAFKA_EVENTS_TABLE_JSON_SQL()),
    run_sql_with_exceptions(EVENTS_TABLE_JSON_MV_SQL()),
]
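
For reference, a sketch of what the RunPython step executes once the template above is rendered. The cluster name 'posthog' is an assumption for illustration; the real value comes from posthog.settings.CLICKHOUSE_CLUSTER.

-- Rendered form of ADD_COLUMNS_BASE_SQL for each of the three tables
-- (cluster name assumed):
ALTER TABLE events
ON CLUSTER 'posthog'
ADD COLUMN IF NOT EXISTS inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

ALTER TABLE writable_events
ON CLUSTER 'posthog'
ADD COLUMN IF NOT EXISTS inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

ALTER TABLE sharded_events
ON CLUSTER 'posthog'
ADD COLUMN IF NOT EXISTS inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

ADD COLUMN IF NOT EXISTS makes the ALTERs idempotent, so re-running the migration is safe; dropping the Kafka table and materialized view first pauses ingestion while the column is added, and the final two operations recreate them.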
9 changes: 5 additions & 4 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -442,7 +442,7 @@

 , _timestamp DateTime
 , _offset UInt64
-
+, inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

 ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_events', sipHash64(distinct_id))

@@ -538,6 +538,7 @@
 group2_created_at,
 group3_created_at,
 group4_created_at,
+NOW64() AS inserted_at,
 _timestamp,
 _offset
 FROM posthog_test.kafka_events_json
@@ -1456,7 +1457,7 @@

 , _timestamp DateTime
 , _offset UInt64
-
+, inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

 , INDEX kafka_timestamp_minmax_sharded_events _timestamp TYPE minmax GRANULARITY 3

@@ -1656,7 +1657,7 @@

 , _timestamp DateTime
 , _offset UInt64
-
+, inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

 ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_events', sipHash64(distinct_id))

@@ -2074,7 +2075,7 @@

 , _timestamp DateTime
 , _offset UInt64
-
+, inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL

 , INDEX kafka_timestamp_minmax_sharded_events _timestamp TYPE minmax GRANULARITY 3

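The updated snapshots confirm the same shape in every events table variant: a nullable inserted_at column, populated by the materialized view with NOW64() at write time rather than the event's own timestamp. A hypothetical query sketch using that column to estimate ingestion lag (assuming timestamp holds the event's client-side time, as elsewhere in this schema; dateDiff, toStartOfHour, and avg are standard ClickHouse functions):

-- Hypothetical: average seconds between an event's own timestamp and the
-- time it was written to ClickHouse, bucketed by hour of insertion.
SELECT
    toStartOfHour(inserted_at) AS hour,
    avg(dateDiff('second', timestamp, inserted_at)) AS avg_lag_seconds
FROM events
WHERE inserted_at IS NOT NULL
GROUP BY hour
ORDER BY hour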
22 changes: 17 additions & 5 deletions posthog/models/event/sql.py
@@ -2,8 +2,17 @@

 from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
 from posthog.clickhouse.indexes import index_by_kafka_timestamp
-from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine, trim_quotes_expr
-from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme
+from posthog.clickhouse.kafka_engine import (
+    KAFKA_COLUMNS,
+    STORAGE_POLICY,
+    kafka_engine,
+    trim_quotes_expr,
+)
+from posthog.clickhouse.table_engines import (
+    Distributed,
+    ReplacingMergeTree,
+    ReplicationScheme,
+)
 from posthog.kafka_client.topics import KAFKA_EVENTS_JSON

EVENTS_DATA_TABLE = lambda: "sharded_events"
@@ -14,6 +23,8 @@
 )
 DROP_EVENTS_TABLE_SQL = lambda: f"DROP TABLE IF EXISTS {EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"

+INSERTED_AT_COLUMN = ", inserted_at Nullable(DateTime64(6, 'UTC')) DEFAULT NULL"
+
EVENTS_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
@@ -86,7 +97,7 @@
     table_name=EVENTS_DATA_TABLE(),
     cluster=settings.CLICKHOUSE_CLUSTER,
     engine=EVENTS_DATA_TABLE_ENGINE(),
-    extra_fields=KAFKA_COLUMNS,
+    extra_fields=KAFKA_COLUMNS + INSERTED_AT_COLUMN,
     materialized_columns=EVENTS_TABLE_MATERIALIZED_COLUMNS,
     indexes=f"""
 , {index_by_kafka_timestamp(EVENTS_DATA_TABLE())}
@@ -138,6 +149,7 @@
 group2_created_at,
 group3_created_at,
 group4_created_at,
+NOW64() AS inserted_at,
 _timestamp,
 _offset
 FROM {database}.kafka_events_json
@@ -154,7 +166,7 @@
     table_name="writable_events",
     cluster=settings.CLICKHOUSE_CLUSTER,
     engine=Distributed(data_table=EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
-    extra_fields=KAFKA_COLUMNS,
+    extra_fields=KAFKA_COLUMNS + INSERTED_AT_COLUMN,
     materialized_columns="",
     indexes="",
 )
@@ -164,7 +176,7 @@
     table_name="events",
     cluster=settings.CLICKHOUSE_CLUSTER,
     engine=Distributed(data_table=EVENTS_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
-    extra_fields=KAFKA_COLUMNS,
+    extra_fields=KAFKA_COLUMNS + INSERTED_AT_COLUMN,
     materialized_columns=EVENTS_TABLE_PROXY_MATERIALIZED_COLUMNS,
     indexes="",
 )
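Note that ALTER TABLE ... ADD COLUMN does not backfill: rows written before this migration keep the DEFAULT NULL, and only rows flowing through the recreated materialized view get a NOW64() stamp. A hypothetical sanity check exploiting that (countIf is a standard ClickHouse aggregate):

-- Rows with NULL inserted_at predate the migration (hypothetical check).
SELECT
    countIf(inserted_at IS NULL) AS rows_before_migration,
    countIf(inserted_at IS NOT NULL) AS rows_after_migration
FROM events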
