diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py
index ab3eb8ef24773..be1e0758e1feb 100644
--- a/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py
+++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_metrics.py
@@ -53,6 +53,9 @@ def compute_derivative_rows(self, rows, metrics, key):
         )
 
         for row_key, row in merged_rows.items():
+            if row['query_signature'] == '94caeb4c54f97849':
+                logger.warning("row: %s", (row['schema_name'], row['digest'], row['sum_rows_examined']))
+
             prev = self._previous_statements.get(row_key)
             if prev is None:
                 continue
@@ -73,15 +76,14 @@ def compute_derivative_rows(self, rows, metrics, key):
 
             diffed_row = {k: row[k] - prev[k] if k in metric_columns else row[k] for k in row.keys()}
 
-            # if row['query_signature'] == '94caeb4c54f97849':
-            #     logger.warning("row: %s", row)
-            #     logger.warning("prev: %s", prev)
-            #     logger.warning("diffed: %s", diffed_row)
+            if row['query_signature'] == '94caeb4c54f97849':
+                logger.warning("prev: %s", prev)
+                logger.warning("diffed: %s", diffed_row)
 
             # Check for negative values, but only in the columns used for metrics
             if any(diffed_row[k] < 0 for k in metric_columns):
-                # if row['query_signature'] == '94caeb4c54f97849':
-                #     logger.warning("- value for %s in %s", row_key, diffed_row)
+                if row['query_signature'] == '94caeb4c54f97849':
+                    logger.warning("- value for %s in %s", row_key, diffed_row)
                 # A "break" might be expected here instead of "continue," but there are cases where a subset of rows
                 # are removed. To avoid situations where all results are discarded every check run, we err on the side
                 # of potentially including truncated rows that exceed previous run counts.
@@ -89,8 +91,8 @@ def compute_derivative_rows(self, rows, metrics, key):
 
             # No changes to the query; no metric needed
             if all(diffed_row[k] == 0 for k in metric_columns):
-                # if row['query_signature'] == '94caeb4c54f97849':
-                #     logger.warning("0 value for %s in %s", row_key, diffed_row)
+                if row['query_signature'] == '94caeb4c54f97849':
+                    logger.warning("0 value for %s in %s", row_key, diffed_row)
                 continue
 
             result.append(diffed_row)
diff --git a/mysql/datadog_checks/mysql/statements.py b/mysql/datadog_checks/mysql/statements.py
index f81a575449db2..c41df9e239eb5 100644
--- a/mysql/datadog_checks/mysql/statements.py
+++ b/mysql/datadog_checks/mysql/statements.py
@@ -92,10 +92,7 @@ def __init__(self, check, config, connection_args):
             ttl=60 * 60 / self._config.full_statement_text_samples_per_hour_per_query,
         )  # type: TTLCache
 
-        # digest_text_cache: cache the full digest text for statements to avoid querying the db for the same digest
-        self._digest_text_cache = Cache(
-            maxsize=10 * 1000,
-        )  # type: TTLCache
+        self._statement_rows = {}  # type: Dict[Tuple[str, str], Dict[str, PyMysqlRow]]
 
     def _get_db_connection(self):
         """
@@ -195,17 +192,15 @@ def _collect_per_statement_metrics(self, tags):
             tags=tags + self._check._get_debug_tags(),
             hostname=self._check.resolved_hostname,
         )
-        # self.log.info("_add_digest_text start")
-        # monotonic_rows = self._add_digest_text(monotonic_rows)
-        # self.log.info("_add_digest_text end")
-        self.log.info("_filter_query_rows start")
+
         monotonic_rows = self._filter_query_rows(monotonic_rows)
-        self.log.info("_filter_query_rows end")
-        self.log.info("_normalize_queries start")
         monotonic_rows = self._normalize_queries(monotonic_rows)
-        self.log.info("_normalize_queries end")
-        self.log.info("compute_derivative_rows start")
+
self._log.warning("normalized_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in monotonic_rows if m['query_signature'] == '94caeb4c54f97849']) + monotonic_rows = self._add_associated_rows(monotonic_rows) + self._log.warning("monotonic_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in monotonic_rows if m['query_signature'] == '94caeb4c54f97849']) + rows = self._state.compute_derivative_rows(monotonic_rows, METRICS_COLUMNS, key=_row_key) + self._log.warning("derivative_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in rows if m['query_signature'] == '94caeb4c54f97849']) self.log.info("compute_derivative_rows end") return rows @@ -265,41 +260,6 @@ def _query_summary_per_statement(self): return rows - def _add_digest_text(self, rows): - # type: (List[PyMysqlRow]) -> List[PyMysqlRow] - """ - Add the full statement text to the rows - """ - saturated_rows = [] - digests = [] - # Find digests we don't have cached - for row in rows: - if self._digest_text_cache.get(row['digest']): - continue - digests.append(row['digest']) - - if digests: - # Query for uncached digests - sql_statement_text = """\ - SELECT `digest`, `digest_text` - FROM performance_schema.events_statements_summary_by_digest - WHERE `digest` IN ({})""".format( - ",".join(["%s"] * len(digests)) - ) - - with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor: - cursor.execute(sql_statement_text, digests) - digest_rows = cursor.fetchall() or [] - for row in digest_rows: - self._digest_text_cache[row['digest']] = row['digest_text'] - # self.log.warning("Row digest: %s %s", row['digest'], row['digest_text']) - - for row in rows: - row = dict(copy.copy(row)) - row['digest_text'] = self._digest_text_cache.get(row['digest'], None) - saturated_rows.append(row) - - return saturated_rows def _filter_query_rows(self, rows): # type: (List[PyMysqlRow]) -> List[PyMysqlRow] @@ -332,6 +292,24 @@ def _normalize_queries(self, rows): normalized_rows.append(normalized_row) return normalized_rows + + def _add_associated_rows(self, rows): + """ + If two or more statements with different digests have the same query_signature, they are considered the same + Because only one digest statement may be updated, we cache all the rows for each digest, + update with any new rows and then return all the rows for all the query_signatures + """ + keys = set() + for row in rows: + key = (row['schema_name'], row['query_signature']) + if key not in self._statement_rows: + self._statement_rows[key] = {} + self._statement_rows[key][row['digest']] = row + keys.add(key) + + # self._log.warning("keys: %s", keys) + + return [self._statement_rows[key][digest] for key in keys for digest in self._statement_rows[key]] def _rows_to_fqt_events(self, rows, tags): for row in rows: diff --git a/mysql/tests/test_statements.py b/mysql/tests/test_statements.py index 8269073ea517a..590873bca42ff 100644 --- a/mysql/tests/test_statements.py +++ b/mysql/tests/test_statements.py @@ -152,6 +152,7 @@ def run_query(q): expected_tags.add("replication_role:" + aurora_replication_role) assert set(event['tags']) == expected_tags query_signature = compute_sql_signature(query) + print("query_signature: ", query_signature) matching_rows = [r for r in event['mysql_rows'] if r['query_signature'] == query_signature] assert len(matching_rows) == 1 row = matching_rows[0] @@ -607,6 +608,7 @@ def test_performance_schema_disabled(dbm_instance, dd_run_check): ), ], ) +@pytest.mark.unit def test_statement_metadata( 
     aggregator, dd_run_check, dbm_instance, datadog_agent, metadata, expected_metadata_payload, root_conn
 ):
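
For context on the statements.py change: performance_schema can evict a digest row and later re-account the same normalized query under a new digest, so the per-digest snapshots fed into compute_derivative_rows can be incomplete from one run to the next. The sketch below is a simplified, standalone rendering of the _add_associated_rows idea, not the shipped implementation; the class name, row dicts, and values are illustrative.

from typing import Dict, List, Tuple

Row = Dict[str, object]


class AssociatedRowCache(object):
    """Cache the latest row per digest, grouped by (schema_name, query_signature)."""

    def __init__(self):
        self._statement_rows = {}  # type: Dict[Tuple[str, str], Dict[str, Row]]

    def add_associated_rows(self, rows):
        # type: (List[Row]) -> List[Row]
        keys = set()
        for row in rows:
            key = (row['schema_name'], row['query_signature'])
            # Remember the most recent snapshot seen for this digest.
            self._statement_rows.setdefault(key, {})[row['digest']] = row
            keys.add(key)
        # Replay every cached digest row for each signature seen this run, so the
        # derivative computation always diffs against a complete set of digests.
        return [row for key in keys for row in self._statement_rows[key].values()]


cache = AssociatedRowCache()
# Run 1: the statement is accounted under digest d1.
cache.add_associated_rows(
    [{'schema_name': 's', 'query_signature': 'sig', 'digest': 'd1', 'sum_rows_examined': 100}]
)
# Run 2: the digest rotated to d2; d1's last-seen row is still replayed.
out = cache.add_associated_rows(
    [{'schema_name': 's', 'query_signature': 'sig', 'digest': 'd2', 'sum_rows_examined': 30}]
)
assert {r['digest'] for r in out} == {'d1', 'd2'}

Note that, as written in the patch, the cache has no eviction, so it grows with the number of distinct (schema, signature) pairs; that may be an acceptable trade-off for a debugging branch.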
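The logging added to statement_metrics.py brackets the guards this change is probing: when a digest's cumulative counters restart, current minus previous goes negative and the row is skipped rather than emitted as a wrong metric. Below is a self-contained reduction of that logic, a hedged paraphrase of compute_derivative_rows rather than the full implementation; the key and column names are illustrative.

def derivative_rows(prev, curr, metric_columns):
    """Diff cumulative counters per row key, dropping resets and no-op rows."""
    result = []
    for key, row in curr.items():
        base = prev.get(key)
        if base is None:
            continue  # first sighting: no baseline to diff against
        diffed = {k: row[k] - base[k] if k in metric_columns else row[k] for k in row}
        if any(diffed[k] < 0 for k in metric_columns):
            continue  # counters went backwards (reset/truncation); skip the row
        if all(diffed[k] == 0 for k in metric_columns):
            continue  # nothing ran since the last check; no metric needed
        result.append(diffed)
    return result


prev = {'k1': {'sum_rows_examined': 100}, 'k2': {'sum_rows_examined': 10}}
curr = {'k1': {'sum_rows_examined': 40}, 'k2': {'sum_rows_examined': 25}}
# k1's counter restarted below its baseline and is dropped; k2 yields a delta of 15.
assert derivative_rows(prev, curr, {'sum_rows_examined'}) == [{'sum_rows_examined': 15}]

Without the row association above, a digest rotation looks exactly like a reset: the new digest has no baseline on its first run and its counters start low, so deltas are silently lost. Replaying the cached digest rows keeps the baseline intact, which appears to be the failure mode the hardcoded signature '94caeb4c54f97849' is tracing.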