Commit
WIP
sethsamuel committed Dec 26, 2024
1 parent f422cd0 commit 16df724
Showing 2 changed files with 42 additions and 4 deletions.
@@ -70,23 +70,36 @@ def compute_derivative_rows(self, rows, metrics, key):
# the absence of metrics. On any given check run, most rows will have no difference so this optimization
# avoids having to send a lot of unnecessary metrics.


diffed_row = {k: row[k] - prev[k] if k in metric_columns else row[k] for k in row.keys()}

# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("row: %s", row)
# logger.warning("prev: %s", prev)
# logger.warning("diffed: %s", diffed_row)

# Check for negative values, but only in the columns used for metrics
if any(diffed_row[k] < 0 for k in metric_columns):
# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("- value for %s in %s", row_key, diffed_row)
# A "break" might be expected here instead of "continue," but there are cases where a subset of rows
# are removed. To avoid situations where all results are discarded every check run, we err on the side
# of potentially including truncated rows that exceed previous run counts.
continue

# No changes to the query; no metric needed
if all(diffed_row[k] == 0 for k in metric_columns):
# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("0 value for %s in %s", row_key, diffed_row)
continue

result.append(diffed_row)

self._previous_statements.clear()
self._previous_statements = merged_rows
# for _, row in self._previous_statements.items():
# if row['query_signature'] == '94caeb4c54f97849':
# logger.warning("setting prev: %s", row)

return result
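
For context on the hunk above, here is a minimal, self-contained sketch of the derivative-row pattern: diff the current run's monotonically increasing counters against the previous run's, drop rows whose deltas went negative (the upstream table was truncated or reset) or are all zero (the query did not run), then store the current rows as the next run's baseline. The class name and the first-run handling are illustrative assumptions, not the actual datadog_checks implementation, since the lines above this hunk are not shown in the diff.

class StatementMetricsSketch:
    """Illustrative sketch only: per-run derivatives over monotonic counters."""

    def __init__(self):
        self._previous_statements = {}  # row key -> last observed row

    def compute_derivative_rows(self, rows, metric_columns, key):
        result = []
        merged_rows = {}
        for row in rows:
            row_key = key(row)
            merged_rows[row_key] = row
            prev = self._previous_statements.get(row_key)
            if prev is None:
                # First sighting of this query: no baseline to diff against yet.
                continue
            diffed_row = {k: row[k] - prev[k] if k in metric_columns else row[k] for k in row}
            # Negative deltas mean the server-side summary table was truncated or
            # reset, so the stored baseline no longer applies to this row.
            if any(diffed_row[k] < 0 for k in metric_columns):
                continue
            # All-zero deltas: the query did not run since the last check, skip it.
            if all(diffed_row[k] == 0 for k in metric_columns):
                continue
            result.append(diffed_row)
        self._previous_statements = merged_rows
        return result

Two consecutive runs with {'count_star': 5} and then {'count_star': 9} for the same key would emit nothing on the first run and a single row with count_star = 4 on the second.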

mysql/datadog_checks/mysql/statements.py (29 additions & 4 deletions)
@@ -154,8 +154,11 @@ def collect_per_statement_metrics(self):
rows = self._collect_per_statement_metrics(tags)
if not rows:
return
self.log.info("_rows_to_fqt_events start")
for event in self._rows_to_fqt_events(rows, tags):
self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding))
self.log.info("_rows_to_fqt_events end")
self.log.info("payload start")
payload = {
'host': self._check.resolved_hostname,
'timestamp': time.time() * 1000,
@@ -176,23 +176,34 @@ def collect_per_statement_metrics(self):
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)
self.log.info("payload end")

def _collect_per_statement_metrics(self, tags):
# type: (List[str]) -> List[PyMysqlRow]

self._get_statement_count(tags)

self.log.info("_query_summary_per_statement start")
monotonic_rows = self._query_summary_per_statement()
self.log.info("_query_summary_per_statement end")
self._check.gauge(
"dd.mysql.statement_metrics.rows",
len(monotonic_rows),
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)
monotonic_rows = self._add_digest_text(monotonic_rows)
# self.log.info("_add_digest_text start")
# monotonic_rows = self._add_digest_text(monotonic_rows)
# self.log.info("_add_digest_text end")
self.log.info("_filter_query_rows start")
monotonic_rows = self._filter_query_rows(monotonic_rows)
self.log.info("_filter_query_rows end")
self.log.info("_normalize_queries start")
monotonic_rows = self._normalize_queries(monotonic_rows)
self.log.info("_normalize_queries end")
self.log.info("compute_derivative_rows start")
rows = self._state.compute_derivative_rows(monotonic_rows, METRICS_COLUMNS, key=_row_key)
self.log.info("compute_derivative_rows end")
return rows
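
The paired log.info("... start") / log.info("... end") calls above are coarse per-stage timing instrumentation for the collection pipeline. If that pattern were kept, a small context manager could express it with less repetition; the timed_stage helper below is a hypothetical, stdlib-only sketch, not part of the check.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def timed_stage(name):
    # Hypothetical helper: logs the wall-clock duration of a named pipeline stage.
    start = time.time()
    logger.info("%s start", name)
    try:
        yield
    finally:
        logger.info("%s end (%.3fs)", name, time.time() - start)

# Usage, mirroring the stages above:
#   with timed_stage("_query_summary_per_statement"):
#       monotonic_rows = self._query_summary_per_statement()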

def _get_statement_count(self, tags):
@@ -219,6 +233,7 @@ def _query_summary_per_statement(self):
sql_statement_summary = """\
SELECT `schema_name`,
`digest`,
`digest_text`,
`count_star`,
`sum_timer_wait`,
`sum_lock_time`,
@@ -229,17 +244,24 @@ def _query_summary_per_statement(self):
`sum_select_scan`,
`sum_select_full_join`,
`sum_no_index_used`,
`sum_no_good_index_used`
`sum_no_good_index_used`,
`last_seen`
FROM performance_schema.events_statements_summary_by_digest
WHERE LAST_SEEN > %s
WHERE `last_seen` >= %s
"""

with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
cursor.execute(sql_statement_summary, [self._last_seen])

rows = cursor.fetchall() or [] # type: ignore

self._last_seen = max(row['last_seen'] for row in rows)
# self.log.warning("Rows: %s", len(rows))
if rows:
self._last_seen = max(row['last_seen'] for row in rows)
# self.log.warning("Last seen: %s", self._last_seen)
# for row in rows:
# if row['digest'] == '6b5a1b14bbeef4253f3d88bd6d2f41cf' or row['digest'] == '98c344ecca8effb370ff4296412a2d73':
# self.log.warning("Row: %s", row)

return rows
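
The change to _query_summary_per_statement above is an incremental-fetch pattern: select only digests whose last_seen is at or after the previous watermark, then advance the watermark to the newest last_seen, but only when rows came back (max() on an empty sequence raises ValueError). The sketch below shows that watermark logic in isolation; the function name, the trimmed column list, and the assumption of a dict-style cursor (e.g. pymysql's DictCursor) are illustrative, not the check's actual code.

from contextlib import closing

# Trimmed query for illustration; the real statement selects many more columns.
SUMMARY_QUERY = """\
SELECT `schema_name`, `digest`, `digest_text`, `count_star`, `last_seen`
FROM performance_schema.events_statements_summary_by_digest
WHERE `last_seen` >= %s
"""

def fetch_new_statement_rows(connection, last_seen):
    # Hypothetical helper: fetch digests seen since the watermark and return
    # (rows, new_watermark) so the caller can persist the watermark across runs.
    with closing(connection.cursor()) as cursor:
        cursor.execute(SUMMARY_QUERY, [last_seen])
        rows = cursor.fetchall() or []
    if rows:
        # Only advance the watermark when there is data to take max() over.
        last_seen = max(row['last_seen'] for row in rows)
    return rows, last_seen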

@@ -270,6 +292,7 @@ def _add_digest_text(self, rows):
digest_rows = cursor.fetchall() or []
for row in digest_rows:
self._digest_text_cache[row['digest']] = row['digest_text']
# self.log.warning("Row digest: %s %s", row['digest'], row['digest_text'])

for row in rows:
row = dict(copy.copy(row))
@@ -304,6 +327,8 @@ def _normalize_queries(self, rows):
normalized_row['dd_tables'] = metadata.get('tables', None)
normalized_row['dd_commands'] = metadata.get('commands', None)
normalized_row['dd_comments'] = metadata.get('comments', None)
# if row['digest'] == '6b5a1b14bbeef4253f3d88bd6d2f41cf':
# self.log.warning("Normalized Row: %s", normalized_row)
normalized_rows.append(normalized_row)

return normalized_rows
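
For reference, _normalize_queries above obfuscates each digest text and attaches a query signature plus extracted metadata (dd_tables, dd_commands, dd_comments) to the row. The sketch below stands in for just the signature step with a plain hash; the Agent's real obfuscator and signature routine are not reproduced here, and example_sql_signature is a hypothetical name.

import hashlib

def example_sql_signature(normalized_query):
    # Hypothetical stand-in: derive a short, stable signature from already
    # normalized query text. The real check uses the Agent's obfuscator, which
    # also returns the tables/commands/comments metadata attached above.
    return hashlib.sha256(normalized_query.encode("utf-8")).hexdigest()[:16]

row = {'digest_text': 'SELECT * FROM `users` WHERE `id` = ?'}
normalized_row = dict(row)
normalized_row['query_signature'] = example_sql_signature(row['digest_text'])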
