Skip to content

Commit

Permalink
Flushing metrics on ephemeral executions (#4291)
Browse files Browse the repository at this point in the history
As things currently stand, metrics are flushed by a background thread
every 10 minutes. However, for the batch/Swarming use case, we will lose
metrics for jobs that finish before this interval. To handle that,
monitor.stop will be called before the bot exits.

Finally, SIGTERM is handled in run_bot to avoid losing metrics when
instances get preempted.

This PR is part of #4271.
  • Loading branch information
vitorguidi authored Oct 7, 2024
1 parent af2907b commit d155aac
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 82 deletions.
103 changes: 55 additions & 48 deletions src/clusterfuzz/_internal/metrics/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,55 +101,61 @@ def _time_series_sort_key(ts):
return ts.points[-1].interval.start_time


class _FlusherThread(threading.Thread):
"""Flusher thread."""
def _flush_metrics():
  """Flushes all metrics stored in _metrics_store"""
  project_path = _monitoring_v3_client.common_project_path(  # pylint: disable=no-member
      utils.get_application_id())
  try:
    batch = []

    def _send(series_batch):
      # The monitoring API requires start times to be non-decreasing within
      # a single createTimeSeries call.
      series_batch.sort(key=_time_series_sort_key)
      _create_time_series(project_path, series_batch)

    flush_time = time.time()
    for metric, labels, window_start, value in _metrics_store.iter_values():
      gauge_kind = metric_pb2.MetricDescriptor.MetricKind.GAUGE  # pylint: disable=no-member
      if metric.metric_kind == gauge_kind:
        # Gauges are point-in-time values, so the interval collapses to the
        # flush time.
        window_start = flush_time

      ts = _TimeSeries()
      metric.monitoring_v3_time_series(ts, labels, window_start, flush_time,
                                       value)
      batch.append(ts)
      # Send in full batches to stay under the per-call series limit.
      if len(batch) == MAX_TIME_SERIES_PER_CALL:
        _send(batch)
        batch = []

    if batch:
      _send(batch)
  except Exception as e:
    if environment.is_android():
      # FIXME: This exception is extremely common on Android. We are already
      # aware of the problem, don't make more noise about it.
      logs.warning(f'Failed to flush metrics: {e}')
    else:
      logs.error(f'Failed to flush metrics: {e}')

def __init__(self):
super().__init__()
self.daemon = True
self.stop_event = threading.Event()

def run(self):
"""Run the flusher thread."""
project_path = _monitoring_v3_client.common_project_path( # pylint: disable=no-member
utils.get_application_id())
class _MonitoringDaemon():
"""Wrapper for the daemon threads responsible for flushing metrics."""

def __init__(self, flush_function, tick_interval):
self._tick_interval = tick_interval
self._flush_function = flush_function
self._flushing_thread = threading.Thread(
name='flushing_thread', target=self._flush_loop, daemon=True)
self._flushing_thread_stop_event = threading.Event()

def _flush_loop(self):
while True:
try:
if self.stop_event.wait(FLUSH_INTERVAL_SECONDS):
return
should_stop = self._flushing_thread_stop_event.wait(
timeout=self._tick_interval)
self._flush_function()
if should_stop:
break

time_series = []
end_time = time.time()
for metric, labels, start_time, value in _metrics_store.iter_values():
if (metric.metric_kind == metric_pb2.MetricDescriptor.MetricKind.GAUGE # pylint: disable=no-member
):
start_time = end_time

series = _TimeSeries()
metric.monitoring_v3_time_series(series, labels, start_time, end_time,
value)
time_series.append(series)

if len(time_series) == MAX_TIME_SERIES_PER_CALL:
time_series.sort(key=_time_series_sort_key)
_create_time_series(project_path, time_series)
time_series = []

if time_series:
time_series.sort(key=_time_series_sort_key)
_create_time_series(project_path, time_series)
except Exception as e:
if environment.is_android():
# FIXME: This exception is extremely common on Android. We are already
# aware of the problem, don't make more noise about it.
logs.warning(f'Failed to flush metrics: {e}')
else:
logs.error(f'Failed to flush metrics: {e}')
def start(self):
self._flushing_thread.start()

def stop(self):
self.stop_event.set()
self.join()
self._flushing_thread_stop_event.set()
self._flushing_thread.join()


_StoreValue = collections.namedtuple(
Expand Down Expand Up @@ -496,7 +502,7 @@ def _set_value(self, point, value):
# Global state.
_metrics_store = _MetricsStore()
# Populated by initialize(); None while monitoring is disabled.
_monitoring_v3_client = None
_monitoring_daemon = None
_monitored_resource = None

# Add fields very conservatively here. There is a limit of 10 labels per metric
Expand Down Expand Up @@ -564,7 +570,7 @@ def _time_to_timestamp(interval, attr, time_seconds):
def initialize():
"""Initialize if monitoring is enabled for this bot."""
global _monitoring_v3_client
global _flusher_thread
global _monitoring_daemon

if environment.get_value('LOCAL_DEVELOPMENT'):
return
Expand All @@ -576,14 +582,15 @@ def initialize():
_initialize_monitored_resource()
_monitoring_v3_client = monitoring_v3.MetricServiceClient(
credentials=credentials.get_default()[0])
_flusher_thread = _FlusherThread()
_flusher_thread.start()
_monitoring_daemon = _MonitoringDaemon(_flush_metrics,
FLUSH_INTERVAL_SECONDS)
_monitoring_daemon.start()


def stop():
  """Stops monitoring and cleans up (only if monitoring is enabled).

  Joins the flushing thread, which performs one final flush so metrics from
  short-lived executions are not lost.
  """
  if _monitoring_daemon:
    _monitoring_daemon.stop()


def metrics_store():
Expand Down
63 changes: 33 additions & 30 deletions src/clusterfuzz/_internal/tests/core/metrics/monitor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
# pylint: disable=protected-access

import os
import queue
import time
import unittest
from unittest.mock import patch

from clusterfuzz._internal.metrics import monitor
from clusterfuzz._internal.metrics import monitoring_metrics
Expand Down Expand Up @@ -175,37 +174,40 @@ def test_gauge_metric_failure(self):
self.assertIsInstance(gauge, monitor._MockMetric)


class TestMonitoringDaemon(unittest.TestCase):
  """Tests that the monitoring daemon correctly flushes, and terminates."""

  def test_monitoring_daemon_calls_flush_while_looping(self):
    """Tests that flushes happen during the flushing loop."""
    calls = 0

    def mock_flush():
      nonlocal calls
      calls += 1

    # A 1s tick guarantees at least one flush within the 2s sleep below.
    daemon = monitor._MonitoringDaemon(mock_flush, 1)
    daemon.start()
    time.sleep(2)
    assert calls > 0
    daemon.stop()
    assert not daemon._flushing_thread.is_alive()

  def test_monitoring_daemon_flushes_after_stop(self):
    """Tests that a final flush happens prior to exit."""
    calls = 0

    def mock_flush():
      nonlocal calls
      calls += 1

    # Impose an absurdly large ticking interval, so only the
    # closing flush happens.
    daemon = monitor._MonitoringDaemon(mock_flush, 10000)
    daemon.start()
    assert calls == 0
    daemon.stop()
    assert not daemon._flushing_thread.is_alive()
    assert calls == 1


@unittest.skip('Skip this because it\'s only used by metzman for debugging.')
Expand All @@ -217,14 +219,15 @@ def test_flush(self):
monitor.credentials._use_anonymous_credentials = lambda: False
monitor._monitoring_v3_client = monitor.monitoring_v3.MetricServiceClient(
credentials=monitor.credentials.get_default()[0])
monitor._flusher_thread = monitor._FlusherThread()
monitor.FLUSH_INTERVAL_SECONDS = 1
monitor._monitoring_daemon = monitor._MonitoringDaemon(
monitor._flush_metrics, monitor.FLUSH_INTERVAL_SECONDS)
monitoring_metrics.BOT_COUNT.set(1, {'revision': '1'})
monitor.utils.get_application_id = lambda: 'google.com:clusterfuzz'
os.environ['BOT_NAME'] = 'bot-1'
monitor._initialize_monitored_resource()
monitor._monitored_resource.labels['zone'] = 'us-central1-b'
monitor._flusher_thread.run()
monitor._monitoring_daemon.start()

def test_cumulative_distribution_metric_geometric(self):
"""Test _CumulativeDistributionMetric with geometric bucketer."""
Expand Down
23 changes: 19 additions & 4 deletions src/python/bot/startup/run_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import contextlib
import multiprocessing
import os
import signal
import sys
import time
import traceback
Expand Down Expand Up @@ -84,6 +85,23 @@ def lease_all_tasks(task_list):
yield


def handle_sigterm(signo, stack_frame):  #pylint: disable=unused-argument
  """SIGTERM handler: flushes pending metrics before the instance is killed.

  Registered in wrap_with_monitoring(); monitor.stop() joins the flushing
  thread, which performs a final metrics flush.
  """
  logs.info('Handling sigterm, stopping monitoring daemon.')
  monitor.stop()
  logs.info('Sigterm handled, metrics flushed.')


@contextlib.contextmanager
def wrap_with_monitoring():
  """Wraps execution so we flush metrics on exit.

  Initializes monitoring and registers the SIGTERM handler, then guarantees
  monitor.stop() runs on any exit path (normal return or exception), so
  metrics from ephemeral executions are flushed before the bot terminates.
  """
  try:
    monitor.initialize()
    signal.signal(signal.SIGTERM, handle_sigterm)
    yield
  finally:
    # stop() joins the flushing thread, which performs one final flush.
    monitor.stop()


def schedule_utask_mains():
"""Schedules utask_mains from preprocessed utasks on Google Cloud Batch."""
from clusterfuzz._internal.google_cloud_utils import batch
Expand Down Expand Up @@ -186,7 +204,6 @@ def main():

dates.initialize_timezone_from_environment()
environment.set_bot_environment()
monitor.initialize()

if not profiler.start_if_needed('python_profiler_bot'):
sys.exit(-1)
Expand Down Expand Up @@ -245,7 +262,7 @@ def main():
multiprocessing.set_start_method('spawn')

try:
with ndb_init.context():
with wrap_with_monitoring(), ndb_init.context():
main()
exit_code = 0
except Exception:
Expand All @@ -254,7 +271,5 @@ def main():
sys.stderr.flush()
exit_code = 1

monitor.stop()

# Prevent python GIL deadlocks on shutdown. See https://crbug.com/744680.
os._exit(exit_code) # pylint: disable=protected-access

0 comments on commit d155aac

Please sign in to comment.