
Commit 34afd9d

[postgres] Fix UnboundLocalError caused by referencing local variable start_time before assigned (#18870)

* fix unboundlocal error

* add test

* add changelog
lu-zhengda authored Oct 18, 2024
1 parent 8f4378c commit 34afd9d
Showing 3 changed files with 126 additions and 85 deletions.
1 change: 1 addition & 0 deletions postgres/changelog.d/18870.fixed
@@ -0,0 +1 @@
+Fix `UnboundLocalError` in postgres schema collection, ensuring proper reset of `_is_schemas_collection_in_progress` to allow consecutive collections.
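
Why the old code failed: `start_time` was assigned inside the `try` block, after `self._collect_schema_info()`. If that call raised, the `finally` block referenced a name that was never bound, so the resulting `UnboundLocalError` masked the original error and the in-progress flag, reset at the bottom of `finally`, was never cleared, blocking every later collection. Below is a minimal, self-contained sketch of that failure mode and of the fixed shape; names such as `query_schema_info`, `collect_broken`, and `collect_fixed` are illustrative stand-ins, not the integration's API.

import time


def query_schema_info(fail):
    # Stand-in for the real schema query; raises to simulate a database error.
    if fail:
        raise RuntimeError("schema query failed")
    return {"schemas": []}


def collect_broken(fail=True):
    in_progress = True
    try:
        schema_metadata = query_schema_info(fail)  # raises before start_time is bound
        start_time = time.time()
        # ... chunked table collection would happen here ...
    finally:
        # UnboundLocalError: start_time was never assigned; it masks the original
        # error, and the flag reset below never runs, so in_progress stays True.
        elapsed_ms = (time.time() - start_time) * 1000
        in_progress = False
    return in_progress


def collect_fixed(fail=True):
    in_progress = True
    status = "success"
    start_time = time.time()  # bound before try: always available in finally
    try:
        query_schema_info(fail)
    except Exception as e:
        print("error collecting schema metadata: %s" % e)
        status = "error"
    finally:
        in_progress = False  # reset first, so later failures cannot block the next run
        elapsed_ms = (time.time() - start_time) * 1000
        print("status=%s elapsed_ms=%.1f" % (status, elapsed_ms))
    return in_progress


if __name__ == "__main__":
    assert collect_fixed() is False  # flag cleared even though the query failed
    try:
        collect_broken()
    except UnboundLocalError as e:
        print("broken variant: %s" % e)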
177 changes: 92 additions & 85 deletions postgres/datadog_checks/postgres/metadata.py
@@ -275,98 +275,105 @@ def report_postgres_metadata(self):
             }
             self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))

-        elapsed_s_schemas = time.time() - self._last_schemas_query_time
-        if (
-            self._collect_schemas_enabled
-            and not self._is_schemas_collection_in_progress
-            and elapsed_s_schemas >= self.schemas_collection_interval
-        ):
-            self._is_schemas_collection_in_progress = True
-            status = "success"
-            try:
-                schema_metadata = self._collect_schema_info()
-                # We emit an event for each batch of tables to reduce total data in memory
-                # and keep event size reasonable
-                base_event = {
-                    "host": self._check.resolved_hostname,
-                    "agent_version": datadog_agent.get_version(),
-                    "dbms": "postgres",
-                    "kind": "pg_databases",
-                    "collection_interval": self.schemas_collection_interval,
-                    "dbms_version": self._payload_pg_version(),
-                    "tags": self._tags_no_db,
-                    "cloud_metadata": self._config.cloud_metadata,
-                }
-
-                # Tuned from experiments on staging, we may want to make this dynamic based on schema size in the future
-                chunk_size = 50
-                total_tables = 0
-                start_time = time.time()
-
-                for database in schema_metadata:
-                    dbname = database["name"]
-                    if not self._should_collect_metadata(dbname, "database"):
-                        continue
-
-                    with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
-                        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
-                            for schema in database["schemas"]:
-                                if not self._should_collect_metadata(schema["name"], "schema"):
-                                    continue
-
-                                tables = self._query_tables_for_schema(cursor, schema["id"], dbname)
-                                self._log.debug(
-                                    "Tables found for schema '{schema}' in database '{database}':"
-                                    "{tables}".format(
-                                        schema=database["schemas"],
-                                        database=dbname,
-                                        tables=[table["name"] for table in tables],
-                                    )
-                                )
-                                table_chunks = list(get_list_chunks(tables, chunk_size))
-
-                                buffer_column_count = 0
-                                tables_buffer = []
-
-                                for tables in table_chunks:
-                                    table_info = self._query_table_information(cursor, schema['name'], tables)
-
-                                    tables_buffer = [*tables_buffer, *table_info]
-                                    for t in table_info:
-                                        buffer_column_count += len(t.get("columns", []))
-
-                                    if buffer_column_count >= 100_000:
-                                        self._flush_schema(base_event, database, schema, tables_buffer)
-                                        total_tables += len(tables_buffer)
-                                        tables_buffer = []
-                                        buffer_column_count = 0
-
-                                if len(tables_buffer) > 0:
-                                    self._flush_schema(base_event, database, schema, tables_buffer)
-                                    total_tables += len(tables_buffer)
-            except Exception as e:
-                self._log.error("Error collecting schema metadata: %s", e)
-                status = "error"
-            finally:
-                elapsed_ms = (time.time() - start_time) * 1000
-                self._check.histogram(
-                    "dd.postgres.schema.time",
-                    elapsed_ms,
-                    tags=self._check.tags + ["status:" + status],
-                    hostname=self._check.resolved_hostname,
-                    raw=True,
-                )
-                self._check.gauge(
-                    "dd.postgres.schema.tables_count",
-                    total_tables,
-                    tags=self._check.tags + ["status:" + status],
-                    hostname=self._check.resolved_hostname,
-                    raw=True,
-                )
-                datadog_agent.emit_agent_telemetry("postgres", "schema_tables_elapsed_ms", elapsed_ms, "gauge")
-                datadog_agent.emit_agent_telemetry("postgres", "schema_tables_count", total_tables, "gauge")
-
-                self._is_schemas_collection_in_progress = False
+        if not self._collect_schemas_enabled:
+            self._log.debug("Skipping schema collection because it is disabled")
+            return
+        if self._is_schemas_collection_in_progress:
+            self._log.debug("Skipping schema collection because it is in progress")
+            return
+        if time.time() - self._last_schemas_query_time < self.schemas_collection_interval:
+            self._log.debug("Skipping schema collection because it was recently collected")
+            return
+
+        self._collect_postgres_schemas()
+
+    @tracked_method(agent_check_getter=agent_check_getter)
+    def _collect_postgres_schemas(self):
+        self._is_schemas_collection_in_progress = True
+        status = "success"
+        start_time = time.time()
+        total_tables = 0
+        try:
+            schema_metadata = self._collect_schema_info()
+            # We emit an event for each batch of tables to reduce total data in memory
+            # and keep event size reasonable
+            base_event = {
+                "host": self._check.resolved_hostname,
+                "agent_version": datadog_agent.get_version(),
+                "dbms": "postgres",
+                "kind": "pg_databases",
+                "collection_interval": self.schemas_collection_interval,
+                "dbms_version": self._payload_pg_version(),
+                "tags": self._tags_no_db,
+                "cloud_metadata": self._config.cloud_metadata,
+            }
+
+            # Tuned from experiments on staging, we may want to make this dynamic based on schema size in the future
+            chunk_size = 50
+
+            for database in schema_metadata:
+                dbname = database["name"]
+                if not self._should_collect_metadata(dbname, "database"):
+                    continue
+
+                with self.db_pool.get_connection(dbname, self._config.idle_connection_timeout) as conn:
+                    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+                        for schema in database["schemas"]:
+                            if not self._should_collect_metadata(schema["name"], "schema"):
+                                continue
+
+                            tables = self._query_tables_for_schema(cursor, schema["id"], dbname)
+                            self._log.debug(
+                                "Tables found for schema '{schema}' in database '{database}':"
+                                "{tables}".format(
+                                    schema=database["schemas"],
+                                    database=dbname,
+                                    tables=[table["name"] for table in tables],
+                                )
+                            )
+                            table_chunks = list(get_list_chunks(tables, chunk_size))
+
+                            buffer_column_count = 0
+                            tables_buffer = []
+
+                            for tables in table_chunks:
+                                table_info = self._query_table_information(cursor, schema['name'], tables)
+
+                                tables_buffer = [*tables_buffer, *table_info]
+                                for t in table_info:
+                                    buffer_column_count += len(t.get("columns", []))
+
+                                if buffer_column_count >= 100_000:
+                                    self._flush_schema(base_event, database, schema, tables_buffer)
+                                    total_tables += len(tables_buffer)
+                                    tables_buffer = []
+                                    buffer_column_count = 0
+
+                            if len(tables_buffer) > 0:
+                                self._flush_schema(base_event, database, schema, tables_buffer)
+                                total_tables += len(tables_buffer)
+        except Exception as e:
+            self._log.error("Error collecting schema metadata: %s", e)
+            status = "error"
+        finally:
+            self._is_schemas_collection_in_progress = False
+            elapsed_ms = (time.time() - start_time) * 1000
+            self._check.histogram(
+                "dd.postgres.schema.time",
+                elapsed_ms,
+                tags=self._check.tags + ["status:" + status],
+                hostname=self._check.resolved_hostname,
+                raw=True,
+            )
+            self._check.gauge(
+                "dd.postgres.schema.tables_count",
+                total_tables,
+                tags=self._check.tags + ["status:" + status],
+                hostname=self._check.resolved_hostname,
+                raw=True,
+            )
+            datadog_agent.emit_agent_telemetry("postgres", "schema_tables_elapsed_ms", elapsed_ms, "gauge")
+            datadog_agent.emit_agent_telemetry("postgres", "schema_tables_count", total_tables, "gauge")

     def _should_collect_metadata(self, name, metadata_type):
         for re_str in self._config.schemas_metadata_config.get(
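
The essential shape of the new `_collect_postgres_schemas` above, condensed into a small illustrative sketch (`SchemaCollector`, `emit_metrics`, and `failing_schema_info` are simplified stand-ins, not the integration's classes): everything the `finally` block needs is bound before `try`, and the in-progress flag is cleared as the very first statement of `finally`, so neither a failing schema query nor a failure while emitting telemetry can leave the collector stuck and block consecutive collections.

import time


class SchemaCollector:
    """Illustrative stand-in for the check's metadata job, not the real class."""

    def __init__(self, collect_schema_info, emit_metrics, log):
        self._collect_schema_info = collect_schema_info  # may raise
        self._emit_metrics = emit_metrics                # histogram/gauge/telemetry stand-in
        self._log = log
        self._is_schemas_collection_in_progress = False

    def collect(self):
        if self._is_schemas_collection_in_progress:
            self._log("Skipping schema collection because it is in progress")
            return
        self._is_schemas_collection_in_progress = True
        status = "success"
        start_time = time.time()  # bound before try, safe to use in finally
        total_tables = 0
        try:
            for chunk in self._collect_schema_info():
                total_tables += len(chunk)
        except Exception as e:
            self._log("Error collecting schema metadata: %s" % e)
            status = "error"
        finally:
            # Reset the flag first so an error in metric emission cannot
            # leave the collector permanently "in progress".
            self._is_schemas_collection_in_progress = False
            elapsed_ms = (time.time() - start_time) * 1000
            self._emit_metrics(status, elapsed_ms, total_tables)


def failing_schema_info():
    raise RuntimeError("boom")


collector = SchemaCollector(failing_schema_info, emit_metrics=print, log=print)
collector.collect()  # logs the error and still emits metrics with status "error"
collector.collect()  # not skipped: the flag was reset in the finally block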
33 changes: 33 additions & 0 deletions postgres/tests/test_metadata.py
@@ -4,6 +4,7 @@
 from concurrent.futures.thread import ThreadPoolExecutor
 from typing import List

+import mock
 import pytest

 from datadog_checks.base.utils.db.utils import DBMAsyncJob
@@ -371,6 +372,38 @@ def test_collect_schemas_max_tables(integration_check, dbm_instance, aggregator)
     assert len(database_metadata[0]['schemas'][0]['tables']) == 1


+def test_collect_schemas_interrupted(integration_check, dbm_instance, aggregator):
+    dbm_instance["collect_schemas"] = {'enabled': True, 'collection_interval': 0.5, 'max_tables': 1}
+    dbm_instance['relations'] = []
+    dbm_instance["database_autodiscovery"] = {"enabled": True, "include": ["datadog"]}
+    del dbm_instance['dbname']
+    check = integration_check(dbm_instance)
+    with mock.patch('datadog_checks.postgres.metadata.PostgresMetadata._collect_schema_info', side_effect=Exception):
+        run_one_check(check, dbm_instance)
+        # ensures _is_schemas_collection_in_progress is reset to False after an exception
+        assert check.metadata_samples._is_schemas_collection_in_progress is False
+        dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
+        assert [e for e in dbm_metadata if e['kind'] == 'pg_databases'] == []
+
+    # next run should succeed
+    run_one_check(check, dbm_instance)
+    dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
+
+    for schema_event in (e for e in dbm_metadata if e['kind'] == 'pg_databases'):
+        database_metadata = schema_event['metadata']
+        assert len(database_metadata[0]['schemas'][0]['tables']) == 1
+
+    # Rerun check with relations enabled
+    dbm_instance['relations'] = [{'relation_regex': '.*'}]
+    check = integration_check(dbm_instance)
+    run_one_check(check, dbm_instance)
+    dbm_metadata = aggregator.get_event_platform_events("dbm-metadata")
+
+    for schema_event in (e for e in dbm_metadata if e['kind'] == 'pg_databases'):
+        database_metadata = schema_event['metadata']
+        assert len(database_metadata[0]['schemas'][0]['tables']) == 1
+
+
 def assert_fields(keys: List[str], fields: List[str]):
     for field in fields:
         assert field in keys
