Skip to content

Commit

Permalink
Fixed
Browse files (browse the repository at this point in the history)
  • Loading branch information
sethsamuel committed Dec 27, 2024
1 parent 16df724 commit 724de2b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def compute_derivative_rows(self, rows, metrics, key):
)

for row_key, row in merged_rows.items():
if row['query_signature'] == '94caeb4c54f97849':
logger.warning("row: %s", (row['schema_name'],row['digest'],row['sum_rows_examined']))

prev = self._previous_statements.get(row_key)
if prev is None:
continue
Expand All @@ -73,24 +76,23 @@ def compute_derivative_rows(self, rows, metrics, key):

diffed_row = {k: row[k] - prev[k] if k in metric_columns else row[k] for k in row.keys()}

# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("row: %s", row)
# logger.warning("prev: %s", prev)
# logger.warning("diffed: %s", diffed_row)
if row['query_signature'] == '94caeb4c54f97849':
logger.warning("prev: %s", prev)
logger.warning("diffed: %s", diffed_row)

# Check for negative values, but only in the columns used for metrics
if any(diffed_row[k] < 0 for k in metric_columns):
# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("- value for %s in %s", row_key, diffed_row)
if row['query_signature'] == '94caeb4c54f97849':
logger.warning("- value for %s in %s", row_key, diffed_row)
# A "break" might be expected here instead of "continue," but there are cases where a subset of rows
# are removed. To avoid situations where all results are discarded every check run, we err on the side
# of potentially including truncated rows that exceed previous run counts.
continue

# No changes to the query; no metric needed
if all(diffed_row[k] == 0 for k in metric_columns):
# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("0 value for %s in %s", row_key, diffed_row)
if row['query_signature'] == '94caeb4c54f97849':
logger.warning("0 value for %s in %s", row_key, diffed_row)
continue

result.append(diffed_row)
Expand Down
72 changes: 25 additions & 47 deletions mysql/datadog_checks/mysql/statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ def __init__(self, check, config, connection_args):
ttl=60 * 60 / self._config.full_statement_text_samples_per_hour_per_query,
) # type: TTLCache

# digest_text_cache: cache the full digest text for statements to avoid querying the db for the same digest
self._digest_text_cache = Cache(
maxsize=10 * 1000,
) # type: TTLCache
self._statement_rows = {} # type: Dict[(str, str), Dict[str, PyMysqlRow]]

def _get_db_connection(self):
"""
Expand Down Expand Up @@ -195,17 +192,15 @@ def _collect_per_statement_metrics(self, tags):
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)
# self.log.info("_add_digest_text start")
# monotonic_rows = self._add_digest_text(monotonic_rows)
# self.log.info("_add_digest_text end")
self.log.info("_filter_query_rows start")

monotonic_rows = self._filter_query_rows(monotonic_rows)
self.log.info("_filter_query_rows end")
self.log.info("_normalize_queries start")
monotonic_rows = self._normalize_queries(monotonic_rows)
self.log.info("_normalize_queries end")
self.log.info("compute_derivative_rows start")
self._log.warning("normalized_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in monotonic_rows if m['query_signature'] == '94caeb4c54f97849'])
monotonic_rows = self._add_associated_rows(monotonic_rows)
self._log.warning("monotonic_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in monotonic_rows if m['query_signature'] == '94caeb4c54f97849'])

rows = self._state.compute_derivative_rows(monotonic_rows, METRICS_COLUMNS, key=_row_key)
self._log.warning("derivative_rows: %s", [(m['schema_name'],m['digest'],m['sum_rows_examined']) for m in rows if m['query_signature'] == '94caeb4c54f97849'])
self.log.info("compute_derivative_rows end")
return rows

Expand Down Expand Up @@ -265,41 +260,6 @@ def _query_summary_per_statement(self):

return rows

def _add_digest_text(self, rows):
# type: (List[PyMysqlRow]) -> List[PyMysqlRow]
"""
Add the full statement text to the rows
"""
saturated_rows = []
digests = []
# Find digests we don't have cached
for row in rows:
if self._digest_text_cache.get(row['digest']):
continue
digests.append(row['digest'])

if digests:
# Query for uncached digests
sql_statement_text = """\
SELECT `digest`, `digest_text`
FROM performance_schema.events_statements_summary_by_digest
WHERE `digest` IN ({})""".format(
",".join(["%s"] * len(digests))
)

with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
cursor.execute(sql_statement_text, digests)
digest_rows = cursor.fetchall() or []
for row in digest_rows:
self._digest_text_cache[row['digest']] = row['digest_text']
# self.log.warning("Row digest: %s %s", row['digest'], row['digest_text'])

for row in rows:
row = dict(copy.copy(row))
row['digest_text'] = self._digest_text_cache.get(row['digest'], None)
saturated_rows.append(row)

return saturated_rows

def _filter_query_rows(self, rows):
# type: (List[PyMysqlRow]) -> List[PyMysqlRow]
Expand Down Expand Up @@ -332,6 +292,24 @@ def _normalize_queries(self, rows):
normalized_rows.append(normalized_row)

return normalized_rows

def _add_associated_rows(self, rows):
"""
If two or more statements with different digests have the same query_signature, they are considered the same
Because only one digest statement may be updated, we cache all the rows for each digest,
update with any new rows and then return all the rows for all the query_signatures
"""
keys = set()
for row in rows:
key = (row['schema_name'], row['query_signature'])
if key not in self._statement_rows:
self._statement_rows[key] = {}
self._statement_rows[key][row['digest']] = row
keys.add(key)

# self._log.warning("keys: %s", keys)

return [self._statement_rows[key][digest] for key in keys for digest in self._statement_rows[key]]

def _rows_to_fqt_events(self, rows, tags):
for row in rows:
Expand Down
2 changes: 2 additions & 0 deletions mysql/tests/test_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ def run_query(q):
expected_tags.add("replication_role:" + aurora_replication_role)
assert set(event['tags']) == expected_tags
query_signature = compute_sql_signature(query)
print("query_signature: ", query_signature)
matching_rows = [r for r in event['mysql_rows'] if r['query_signature'] == query_signature]
assert len(matching_rows) == 1
row = matching_rows[0]
Expand Down Expand Up @@ -607,6 +608,7 @@ def test_performance_schema_disabled(dbm_instance, dd_run_check):
),
],
)
@pytest.mark.unit
def test_statement_metadata(
aggregator, dd_run_check, dbm_instance, datadog_agent, metadata, expected_metadata_payload, root_conn
):
Expand Down

0 comments on commit 724de2b

Please sign in to comment.