-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(query_log): add query_log table to hogql (#26822)
- Loading branch information
Showing
5 changed files
with
487 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
from typing import Any | ||
|
||
from posthog.hogql import ast | ||
from posthog.hogql.database.models import ( | ||
IntegerDatabaseField, | ||
StringDatabaseField, | ||
DateTimeDatabaseField, | ||
LazyTable, | ||
FieldOrTable, | ||
LazyTableToAdd, | ||
FloatDatabaseField, | ||
FunctionCallTable, | ||
BooleanDatabaseField, | ||
) | ||
|
||
QUERY_LOG_FIELDS: dict[str, FieldOrTable] = { | ||
"query_id": StringDatabaseField(name="query_id"), | ||
"query": StringDatabaseField(name="query"), # | ||
"query_start_time": DateTimeDatabaseField(name="event_time"), # | ||
"query_duration_ms": FloatDatabaseField(name="query_duration_ms"), # | ||
"created_by": IntegerDatabaseField(name="created_by"), | ||
"read_rows": IntegerDatabaseField(name="read_rows"), | ||
"read_bytes": IntegerDatabaseField(name="read_bytes"), | ||
"result_rows": IntegerDatabaseField(name="result_rows"), | ||
"result_bytes": IntegerDatabaseField(name="result_bytes"), | ||
"memory_usage": IntegerDatabaseField(name="memory_usage"), | ||
"status": StringDatabaseField(name="type"), | ||
"kind": StringDatabaseField(name="kind"), | ||
"query_type": StringDatabaseField(name="query_type"), | ||
"is_personal_api_key_request": BooleanDatabaseField(name="is_personal_api_key_request"), | ||
} | ||
|
||
RAW_QUERY_LOG_FIELDS: dict[str, FieldOrTable] = QUERY_LOG_FIELDS | { | ||
# below fields are necessary to compute some of the resulting fields | ||
"type": StringDatabaseField(name="type"), | ||
"is_initial_query": BooleanDatabaseField(name="is_initial_query"), | ||
"log_comment": StringDatabaseField(name="log_comment"), | ||
} | ||
|
||
STRING_FIELDS = { | ||
"query_type": ["query_type"], | ||
"query_id": ["client_query_id"], | ||
"query": ["query", "query"], | ||
"kind": ["query", "kind"], | ||
} | ||
INT_FIELDS = {"created_by": ["user_id"]} | ||
|
||
|
||
class QueryLogTable(LazyTable): | ||
fields: dict[str, FieldOrTable] = QUERY_LOG_FIELDS | ||
|
||
def to_printed_clickhouse(self, context) -> str: | ||
return "query_log" | ||
|
||
def to_printed_hogql(self) -> str: | ||
return "query_log" | ||
|
||
def lazy_select(self, table_to_add: LazyTableToAdd, context, node) -> Any: | ||
requested_fields = table_to_add.fields_accessed | ||
|
||
raw_table_name = "raw_query_log" | ||
|
||
def get_alias(name, chain): | ||
if name in STRING_FIELDS: | ||
keys = STRING_FIELDS[name] | ||
expr = ast.Call( | ||
name="JSONExtractString", | ||
args=[ast.Field(chain=[raw_table_name, "log_comment"])] + [ast.Constant(value=v) for v in keys], | ||
) | ||
return ast.Alias(alias=name, expr=expr) | ||
if name in INT_FIELDS: | ||
keys = INT_FIELDS[name] | ||
expr = ast.Call( | ||
name="JSONExtractInt", | ||
args=[ast.Field(chain=[raw_table_name, "log_comment"])] + [ast.Constant(value=v) for v in keys], | ||
) | ||
return ast.Alias(alias=name, expr=expr) | ||
if name == "is_personal_api_key_request": | ||
cmp_expr = ast.CompareOperation( | ||
op=ast.CompareOperationOp.Eq, | ||
left=ast.Constant(value="personal_api_key"), | ||
right=ast.Call( | ||
name="JSONExtractString", | ||
args=[ast.Field(chain=["log_comment"]), ast.Constant(value="access_method")], | ||
), | ||
) | ||
return ast.Alias(alias=name, expr=cmp_expr) | ||
return ast.Alias(alias=name, expr=ast.Field(chain=[raw_table_name, *chain])) | ||
|
||
fields: list[ast.Expr] = [get_alias(name, chain) for name, chain in requested_fields.items()] | ||
|
||
return ast.SelectQuery( | ||
select=fields, | ||
select_from=ast.JoinExpr(table=ast.Field(chain=[raw_table_name])), | ||
where=ast.And( | ||
exprs=[ | ||
ast.CompareOperation( | ||
op=ast.CompareOperationOp.Eq, | ||
left=ast.Constant(value=context.project_id), | ||
right=ast.Call( | ||
name="JSONExtractInt", | ||
args=[ast.Field(chain=["log_comment"]), ast.Constant(value="user_id")], | ||
), | ||
), | ||
ast.CompareOperation( | ||
op=ast.CompareOperationOp.In, | ||
left=ast.Field(chain=["type"]), | ||
right=ast.Array( | ||
exprs=[ | ||
ast.Constant(value="QueryFinish"), | ||
ast.Constant(value="ExceptionBeforeStart"), | ||
ast.Constant(value="ExceptionWhileProcessing"), | ||
] | ||
), | ||
), | ||
ast.Field(chain=["is_initial_query"]), | ||
] | ||
), | ||
) | ||
|
||
|
||
class RawQueryLogTable(FunctionCallTable): | ||
fields: dict[str, FieldOrTable] = RAW_QUERY_LOG_FIELDS | ||
|
||
name: str = "raw_query_log" | ||
|
||
def to_printed_clickhouse(self, context) -> str: | ||
return "clusterAllReplicas(posthog, system.query_log)" | ||
|
||
def to_printed_hogql(self) -> str: | ||
return "query_log" |
53 changes: 53 additions & 0 deletions
53
posthog/hogql/database/schema/test/test_table_query_log.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
from unittest.mock import MagicMock, patch | ||
from posthog.clickhouse.client import sync_execute | ||
from posthog.hogql.context import HogQLContext | ||
from posthog.hogql.database.database import create_hogql_database | ||
from posthog.hogql.query import execute_hogql_query | ||
from posthog.test.base import ( | ||
APIBaseTest, | ||
ClickhouseTestMixin, | ||
) | ||
|
||
|
||
class TestPersonOptimization(ClickhouseTestMixin, APIBaseTest): | ||
""" | ||
Mostly tests for the optimization of pre-filtering before aggregating. See https://github.com/PostHog/posthog/pull/25604 | ||
""" | ||
|
||
def setUp(self): | ||
super().setUp() | ||
self.database = create_hogql_database(self.team.pk) | ||
self.context = HogQLContext(database=self.database, team_id=self.team.pk, enable_select_queries=True) | ||
|
||
@patch("posthog.hogql.query.sync_execute", wraps=sync_execute) | ||
def test_dumb_query(self, mock_sync_execute: MagicMock): | ||
response = execute_hogql_query("select query_start_time from query_log limit 10", self.team) | ||
|
||
ch_query = f"""SELECT | ||
query_log.query_start_time AS query_start_time | ||
FROM | ||
(SELECT | ||
toTimeZone(raw_query_log.event_time, %(hogql_val_0)s) AS query_start_time | ||
FROM | ||
clusterAllReplicas(posthog, system.query_log) AS raw_query_log | ||
WHERE | ||
and(ifNull(equals({self.team.pk}, JSONExtractInt(raw_query_log.log_comment, %(hogql_val_1)s)), 0), in(raw_query_log.type, [%(hogql_val_2)s, %(hogql_val_3)s, %(hogql_val_4)s]), raw_query_log.is_initial_query)) AS query_log | ||
LIMIT 10 SETTINGS readonly=2, max_execution_time=60, allow_experimental_object_type=1, format_csv_allow_double_quotes=0, max_ast_elements=4000000, max_expanded_ast_elements=4000000, max_bytes_before_external_group_by=0""" | ||
|
||
from unittest.mock import ANY | ||
|
||
mock_sync_execute.assert_called_once_with( | ||
ch_query, | ||
{ | ||
"hogql_val_0": "UTC", | ||
"hogql_val_1": "user_id", | ||
"hogql_val_2": "QueryFinish", | ||
"hogql_val_3": "ExceptionBeforeStart", | ||
"hogql_val_4": "ExceptionWhileProcessing", | ||
}, | ||
with_column_types=True, | ||
workload=ANY, | ||
team_id=self.team.pk, | ||
readonly=True, | ||
) | ||
assert response.results is not None |
Oops, something went wrong.