Cleans metrics raw data if they are inactive.
Raw metric data is only truncated when new measures are pushed.
Therefore, if no new measures are pushed and the archive policy is
updated to reduce the back window, the raw data points of metrics
that are no longer receiving data are never truncated.

The goal of this PR is to propose a method to identify metrics that
are in an "inactive state", meaning they are not receiving new data
points, and to truncate their raw data points when the archive
policy back window is changed.
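
In rough terms, the selection boils down to the sketch below (a minimal illustration assuming the indexer API added in this PR; find_inactive_metrics is a hypothetical helper, not part of the change):

import datetime

def find_inactive_metrics(index, delay_seconds):
    # A metric counts as "inactive" when its last measure push is older
    # than the configured delay; only those need a truncation pass.
    cutoff = datetime.datetime.now() - datetime.timedelta(
        seconds=delay_seconds)
    # The attribute_filter syntax mirrors the filter used in chef.py below.
    return index.list_metrics_for_cleanup(
        attribute_filter={"<": {"last_measure_push": cutoff}})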
rafaelweingartner committed Apr 26, 2024
1 parent 7a289c9 commit 3963a4a
Showing 15 changed files with 328 additions and 17 deletions.
15 changes: 10 additions & 5 deletions gnocchi/carbonara.py
@@ -348,6 +348,13 @@ def __eq__(self, other):
and self.block_size == other.block_size
and self.back_window == other.back_window)

def _set_values(self, values):
if self.block_size is not None and len(self.ts) != 0:
index = numpy.searchsorted(values['timestamps'],
self.first_block_timestamp())
values = values[index:]
super(BoundTimeSerie, self).set_values(values)

def set_values(self, values, before_truncate_callback=None):
"""Set the timestamps and values in this timeseries.
@@ -357,11 +364,9 @@ def set_values(self, values, before_truncate_callback=None):
maximum size.
:return: None or the return value of before_truncate_callback
"""
if self.block_size is not None and len(self.ts) != 0:
index = numpy.searchsorted(values['timestamps'],
self.first_block_timestamp())
values = values[index:]
super(BoundTimeSerie, self).set_values(values)
if values is not None and len(values) > 0:
self._set_values(values)

if before_truncate_callback:
return_value = before_truncate_callback(self)
else:
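The refactor above splits the back-window trimming into _set_values so that set_values(None) becomes a pure truncation pass. A minimal sketch of the intended use (the constructor arguments and sample data here are illustrative assumptions, not taken from this commit):

import numpy

from gnocchi import carbonara

# A bound series with a five-minute block size and no extra back window.
ts = carbonara.BoundTimeSerie(block_size=numpy.timedelta64(5, 'm'),
                              back_window=0)
ts.set_values(numpy.array([('2024-04-26T00:00:00', 1.0),
                           ('2024-04-26T00:07:00', 2.0)],
                          dtype=carbonara.TIMESERIES_ARRAY_DTYPE))
# With the change above, passing None (or an empty array) skips the
# ingestion step and only runs the truncation against the back window.
ts.set_values(None)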
117 changes: 111 additions & 6 deletions gnocchi/chef.py
@@ -17,10 +17,11 @@
import hashlib

import daiquiri
import datetime

from gnocchi import carbonara
from gnocchi import indexer


LOG = daiquiri.getLogger(__name__)


@@ -45,9 +46,114 @@ class Chef(object):
def __init__(self, coord, incoming, index, storage):
self.coord = coord
self.incoming = incoming
# This variable is an instance of the indexer, that is, the
# database connector.
self.index = index
self.storage = storage

def clean_raw_data_inactive_metrics(
self, delay_consider_metric_inactive):
"""Cleans metrics raw data if they are inactive.
The truncating of the raw metrics data is only done when new
measures are pushed. Therefore, if no new measures are pushed, and the
archive policy was updated to reduce the backwindow, the raw
datapoints for metrics that are not receiving new datapoints are never
truncated.
The goal of this method is to identify metrics that are in
"inactive state", meaning, not receiving new datapoints, and execute
their raw data points truncation.
To consider that a metric is inactive, we will use the parameter
"delay_consider_metric_inactive" to indicate for how long a
metric can be without receiving new datapoints before we consider it as
inactive.
Furthermore, we check the column "backwindow_changed", to determine
if the archive policy was updated. If it was, we need to run a cleanup
for each metric that is inactive again, as the backwindow might have
been changed.
:param delay_consider_metric_inactive: The time (in seconds)
without new measures for a metric that we accept before we start
cleaning it.
:type delay_consider_metric_inactive: int
"""

if delay_consider_metric_inactive < 0:
LOG.debug("The option 'delay_consider_metric_inactive' [%s] "
"is negative; therefore, we do not execute the metric "
"raw data cleanup.", delay_consider_metric_inactive)
return
current_timestamp = datetime.datetime.now()
datetime_to_consult_metrics = current_timestamp - datetime.timedelta(
seconds=delay_consider_metric_inactive)

metrics_to_clean = self.index.list_metrics_for_cleanup(
attribute_filter={"<": {
"last_measure_push": datetime_to_consult_metrics}}
)

LOG.debug("Metrics [%s] found to execute the raw data cleanup.",
metrics_to_clean)

for metric in metrics_to_clean:
LOG.debug("Executing the raw data cleanup for metric [%s].",
metric)
try:
metric_id = metric.id
# To properly generate the lock here, we need to use the
# same process as is used in the measures processing.
# Therefore, we have to use the sack to control the locks
# in this processing here. See 'process_new_measures_for_sack'
# for more details.
sack_for_metric = self.incoming.sack_for_metric(metric_id)
metric_lock = self.get_sack_lock(sack_for_metric)

if not metric_lock.acquire():
LOG.debug(
"Metric [%s] is locked, cannot clean it up now.",
metric.id)
continue

agg_methods = list(metric.archive_policy.aggregation_methods)
block_size = metric.archive_policy.max_block_size
back_window = metric.archive_policy.back_window

if any(filter(lambda x: x.startswith("rate:"), agg_methods)):
back_window += 1

raw_measure = self.storage. \
_get_or_create_unaggregated_timeseries_unbatched(metric)

if raw_measure:
ts = carbonara.BoundTimeSerie.unserialize(raw_measure,
block_size,
back_window)
# Pass None as the values to add; this triggers only the
# truncation process.
ts.set_values(None)

self.storage._store_unaggregated_timeseries_unbatched(
metric, ts.serialize())

self.index.update_metric_last_cleanup_timestamp(metric.id)
self.index.update_backwindow_changed(metric.id)
else:
LOG.info("No raw measures found for metric [%s] for "
"cleanup.", metric.id)

metric_lock.release()
except Exception:
LOG.error("Unable to lock metric [%s] for cleanup.",
metric, exc_info=True)
continue

if metrics_to_clean:
LOG.debug("Cleaned up metrics [%s].", metrics_to_clean)

def expunge_metrics(self, cleanup_batch_size, sync=False):
"""Remove deleted metrics.
@@ -124,7 +230,7 @@ def refresh_metrics(self, metrics, timeout=None, sync=False):
metrics_by_id[metric_id]: measures
for metric_id, measures
in metrics_and_measures.items()
})
}, self.index)
LOG.debug("Measures for %d metrics processed",
len(metric_ids))
except Exception:
@@ -165,7 +271,7 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
self.storage.add_measures_to_metrics({
metric: measures[metric.id]
for metric in metrics
})
}, self.index)
LOG.debug("Measures for %d metrics processed",
len(metrics))
return len(measures)
@@ -180,7 +286,6 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
def get_sack_lock(self, sack):
# FIXME(jd) Some tooz drivers have a limitation on lock name length
# (e.g. MySQL). This should be handled by tooz, but it's not yet.
lock_name = hashlib.new(
'sha1',
('gnocchi-sack-%s-lock' % str(sack)).encode()).hexdigest().encode()
lock_name = ('gnocchi-sack-%s-lock' % str(sack)).encode()
lock_name = hashlib.new('sha1', lock_name).hexdigest().encode()
return self.coord.get_lock(lock_name)
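
The rewritten get_sack_lock builds the same lock name as before, just split over two statements. The SHA-1 digest matters because some tooz backends (MySQL named locks, for instance) cap the lock-name length, and a hex digest is always 40 characters no matter how long the sack name grows. A standalone illustration (the sack value is hypothetical):

import hashlib

sack = "incoming128-37"  # hypothetical sack name
lock_name = ('gnocchi-sack-%s-lock' % sack).encode()
# The sha1 hex digest yields a fixed-width name that stays within the
# limits of length-restricted lock backends.
lock_name = hashlib.new('sha1', lock_name).hexdigest().encode()
print(lock_name)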
3 changes: 3 additions & 0 deletions gnocchi/cli/api.py
@@ -38,6 +38,7 @@ def prepare_service(conf=None):

opts.set_defaults()
policy_opts.set_defaults(conf, 'policy.yaml')

conf = service.prepare_service(conf=conf)
return conf

@@ -113,4 +114,6 @@ def api():
if virtual_env is not None:
args.extend(["-H", os.getenv("VIRTUAL_ENV", ".")])

LOG.info("Starting gnocchi api server with [%s] and arguments [%s]", uwsgi, args)
print("Starting gnocchi api server with [%s] and arguments [%s]" % (uwsgi, args))
return os.execl(uwsgi, uwsgi, *args)
15 changes: 12 additions & 3 deletions gnocchi/cli/metricd.py
@@ -72,9 +72,9 @@ def _configure(self):
self.conf.coordination_url)
self.store = storage.get_driver(self.conf)
self.incoming = incoming.get_driver(self.conf)
self.index = indexer.get_driver(self.conf)
self.indexer = indexer.get_driver(self.conf)
self.chef = chef.Chef(self.coord, self.incoming,
self.index, self.store)
self.indexer, self.store)

def run(self):
self._configure()
@@ -267,9 +267,18 @@ def __init__(self, worker_id, conf):
worker_id, conf, conf.metricd.metric_cleanup_delay)

def _run_job(self):
LOG.debug("Cleaning up deleted metrics with batch size [%s].",
self.conf.metricd.cleanup_batch_size)
self.chef.expunge_metrics(self.conf.metricd.cleanup_batch_size)
LOG.debug("Metrics marked for deletion removed from backend")

LOG.debug("Starting the cleaning of raw data points for metrics that "
"are no longer receiving measures.")
self.chef.clean_raw_data_inactive_metrics(
self.conf.metricd.delay_consider_metric_inactive)
LOG.debug("Finished the cleaning of raw data points for metrics that "
"are no longer receiving measures.")


class MetricdServiceManager(cotyledon.ServiceManager):
def __init__(self, conf):
@@ -288,7 +297,7 @@ def __init__(self, conf):

def on_reload(self):
# NOTE(sileht): We do not implement reload() in Workers so all workers
# will received SIGHUP and exit gracefully, then their will be
# will receive SIGHUP and exit gracefully, then they will be
# restarted with the new number of workers. This is important because
# we use the number of workers to declare the capability in tooz and
# to select the block of metrics to process.
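For reference, the knobs the janitor reads live in the [metricd] section of gnocchi.conf. A sketch (the option names come from the code above; the values are purely illustrative):

[metricd]
# Seconds the janitor sleeps between runs.
metric_cleanup_delay = 300
# How many deleted metrics to expunge per pass.
cleanup_batch_size = 10000
# Seconds without new measures before a metric is considered inactive;
# a negative value disables the raw-data cleanup entirely.
delay_consider_metric_inactive = 3600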
6 changes: 6 additions & 0 deletions gnocchi/incoming/file.py
@@ -194,9 +194,15 @@ def process_measures_for_sack(self, sack):
sack, metric_id)
processed_files[metric_id] = files
m = self._make_measures_array()

count = 0
total_files = len(files)
for f in files:
count = count + 1
abspath = self._build_measure_path(metric_id, f)
with open(abspath, "rb") as e:
LOG.debug("(%s/%s) Reading metric file [%s].",
count, total_files, abspath)
m = numpy.concatenate((
m, self._unserialize_measures(f, e.read())))
measures[metric_id] = m
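The manual counter added above could equally be written with enumerate, which drops the separate count variable (an equivalent sketch, not what the commit ships):

for count, f in enumerate(files, start=1):
    abspath = self._build_measure_path(metric_id, f)
    with open(abspath, "rb") as e:
        LOG.debug("(%s/%s) Reading metric file [%s].",
                  count, len(files), abspath)
        m = numpy.concatenate((
            m, self._unserialize_measures(f, e.read())))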
21 changes: 21 additions & 0 deletions gnocchi/indexer/__init__.py
@@ -406,6 +406,10 @@ def list_metrics(details=False, status='active',
resource_policy_filter=None):
raise exceptions.NotImplementedError

@staticmethod
def list_metrics_for_cleanup(attribute_filter=None):
raise exceptions.NotImplementedError

@staticmethod
def create_archive_policy(archive_policy):
raise exceptions.NotImplementedError
@@ -438,6 +442,23 @@ def delete_resources(resource_type='generic',
def delete_metric(id):
raise exceptions.NotImplementedError

@staticmethod
def update_metric_last_cleanup_timestamp(metric_id):
raise exceptions.NotImplementedError

@staticmethod
def update_backwindow_changed(metric_id):
raise exceptions.NotImplementedError

@staticmethod
def update_backwindow_changed_for_metrics_archive_policy(
archive_policy_name):
raise exceptions.NotImplementedError

@staticmethod
def update_metric_last_push_timestamp(metric_id):
raise exceptions.NotImplementedError

@staticmethod
def expunge_metric(id):
raise exceptions.NotImplementedError
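These stubs define the indexer surface the Chef relies on; concrete drivers (the SQLAlchemy indexer, for example) implement them. The filter syntax matches what clean_raw_data_inactive_metrics passes, roughly as in this sketch (the one-hour cutoff is illustrative):

import datetime

# 'index' is an indexer driver instance, e.g. from indexer.get_driver(conf).
cutoff = datetime.datetime.now() - datetime.timedelta(hours=1)
inactive = index.list_metrics_for_cleanup(
    attribute_filter={"<": {"last_measure_push": cutoff}})
for metric in inactive:
    # ... truncate the metric's raw data points, then record the cleanup.
    index.update_metric_last_cleanup_timestamp(metric.id)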
48 changes: 48 additions & 0 deletions (new file: Alembic migration, revision 18fff4509e3e)
@@ -0,0 +1,48 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

"""create metric status column
Revision ID: 18fff4509e3e
Revises: 04eba72e4f90
Create Date: 2024-04-24 09:16:00
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.sql import func

# revision identifiers, used by Alembic.
revision = '18fff4509e3e'
down_revision = '04eba72e4f90'
branch_labels = None
depends_on = None


def upgrade():
op.add_column("metric",
sa.Column("backwindow_changed", sa.Boolean,
nullable=False, default=False))
for column_name in ['last_measure_push', 'last_cleanup_timestamp']:
op.add_column("metric", sa.Column(column_name,
sa.DateTime(timezone=True),
nullable=False,
server_default=func.now()))
index_name = "idx_%s" % column_name
op.create_index(index_name, 'metric',
[column_name], unique=False)
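
The commit ships only the upgrade path. For completeness, a matching downgrade (hypothetical, not part of this change) would drop the indexes and columns in reverse order:

def downgrade():
    for column_name in ['last_measure_push', 'last_cleanup_timestamp']:
        op.drop_index("idx_%s" % column_name, table_name='metric')
        op.drop_column("metric", column_name)
    op.drop_column("metric", "backwindow_changed")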