From 3963a4ace9d81f909b548f08c9fa703460ace8fd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rafael=20Weing=C3=A4rtner?=
Date: Fri, 26 Apr 2024 10:55:05 -0300
Subject: [PATCH] Clean up raw data of inactive metrics

Raw metric data is truncated only when new measures are pushed.
Therefore, if no new measures are pushed and the archive policy is
updated to reduce the back window, the raw data points of metrics that
are no longer receiving measures are never truncated.

This PR proposes a way to identify metrics in an "inactive" state,
meaning metrics that are not receiving new data points, and to truncate
their raw data points when the archive policy back window is changed.
---
 gnocchi/carbonara.py                          |  15 ++-
 gnocchi/chef.py                               | 117 +++++++++++++++++-
 gnocchi/cli/api.py                            |   3 +
 gnocchi/cli/metricd.py                        |  15 ++-
 gnocchi/incoming/file.py                      |   6 +
 gnocchi/indexer/__init__.py                   |  21 ++++
 ...reate_metric_last_push_timestamp_column.py |  48 +++++++
 gnocchi/indexer/sqlalchemy.py                 |  53 +++++++-
 gnocchi/indexer/sqlalchemy_base.py            |  12 ++
 gnocchi/opts.py                               |   8 ++
 gnocchi/rest/api.py                           |  16 ++-
 gnocchi/rest/app.py                           |   7 ++
 gnocchi/rest/auth_helper.py                   |  11 ++
 gnocchi/service.py                            |   1 +
 gnocchi/storage/__init__.py                   |  12 +-
 15 files changed, 328 insertions(+), 17 deletions(-)
 create mode 100644 gnocchi/indexer/alembic/versions/18fff4509e3e_create_metric_last_push_timestamp_column.py

diff --git a/gnocchi/carbonara.py b/gnocchi/carbonara.py
index 99d4b0dea..62e470190 100644
--- a/gnocchi/carbonara.py
+++ b/gnocchi/carbonara.py
@@ -348,6 +348,13 @@ def __eq__(self, other):
                 and self.block_size == other.block_size
                 and self.back_window == other.back_window)
 
+    def _set_values(self, values):
+        if self.block_size is not None and len(self.ts) != 0:
+            index = numpy.searchsorted(values['timestamps'],
+                                       self.first_block_timestamp())
+            values = values[index:]
+        super(BoundTimeSerie, self).set_values(values)
+
     def set_values(self, values, before_truncate_callback=None):
         """Set the timestamps and values in this timeseries.
 
@@ -357,11 +364,9 @@ def set_values(self, values, before_truncate_callback=None):
            maximum size.
         :return: None of the return value of before_truncate_callback
         """
-        if self.block_size is not None and len(self.ts) != 0:
-            index = numpy.searchsorted(values['timestamps'],
-                                       self.first_block_timestamp())
-            values = values[index:]
-        super(BoundTimeSerie, self).set_values(values)
+        if values is not None and len(values) > 0:
+            self._set_values(values)
+
         if before_truncate_callback:
             return_value = before_truncate_callback(self)
         else:
diff --git a/gnocchi/chef.py b/gnocchi/chef.py
index a51f88ad1..74aadccdb 100644
--- a/gnocchi/chef.py
+++ b/gnocchi/chef.py
@@ -17,10 +17,11 @@
+import datetime
 import hashlib
 
 import daiquiri
 
+from gnocchi import carbonara
 from gnocchi import indexer
 
-
 LOG = daiquiri.getLogger(__name__)
 
 
@@ -45,9 +46,114 @@ class Chef(object):
     def __init__(self, coord, incoming, index, storage):
         self.coord = coord
         self.incoming = incoming
+        # This variable is an instance of the indexer driver, that is,
+        # the database connector.
         self.index = index
         self.storage = storage
 
+    def clean_raw_data_inactive_metrics(
+            self, delay_consider_metric_inactive):
+        """Clean up the raw data of inactive metrics.
+
+        Raw metric data is truncated only when new measures are pushed.
+        Therefore, if no new measures are pushed and the archive policy
+        was updated to reduce the back window, the raw data points of
+        metrics that are not receiving new data points are never
+        truncated.
+
+        The goal of this method is to identify metrics in an "inactive"
+        state, that is, metrics not receiving new data points, and to
+        truncate their raw data points.
+
+        To decide that a metric is inactive, we use the parameter
+        "delay_consider_metric_inactive", which indicates how long a
+        metric can go without receiving new data points before we
+        consider it inactive.
+
+        Furthermore, we check the column "backwindow_changed" to
+        determine whether the archive policy was updated. If it was, we
+        need to run the cleanup again for each inactive metric, as the
+        back window might have changed.
+
+        :param delay_consider_metric_inactive: The time (in seconds) a
+            metric may go without new measures before we start cleaning
+            its raw data.
+
+        :type delay_consider_metric_inactive: int
+        """
+
+        if delay_consider_metric_inactive < 0:
+            LOG.debug("The option 'delay_consider_metric_inactive' [%s] "
+                      "is negative; therefore, we do not execute the "
+                      "metric raw data cleanup.",
+                      delay_consider_metric_inactive)
+            return
+        # Use an aware UTC timestamp, as the metric timestamp columns
+        # are stored as UTC.
+        current_timestamp = datetime.datetime.now(tz=datetime.timezone.utc)
+        datetime_to_consult_metrics = current_timestamp - datetime.timedelta(
+            seconds=delay_consider_metric_inactive)
+
+        metrics_to_clean = self.index.list_metrics_for_cleanup(
+            attribute_filter={"<": {
+                "last_measure_push": datetime_to_consult_metrics}}
+        )
+
+        LOG.debug("Metrics [%s] found to execute the raw data cleanup.",
+                  metrics_to_clean)
+
+        for metric in metrics_to_clean:
+            LOG.debug("Executing the raw data cleanup for metric [%s].",
+                      metric)
+            try:
+                metric_id = metric.id
+                # To properly generate the lock here, we need to use the
+                # same process as the measures processing does. Therefore,
+                # we use the sack to control the locks in this processing
+                # as well. See 'process_new_measures_for_sack' for more
+                # details.
+                sack_for_metric = self.incoming.sack_for_metric(metric_id)
+                metric_lock = self.get_sack_lock(sack_for_metric)
+
+                if not metric_lock.acquire():
+                    LOG.debug(
+                        "Metric [%s] is locked, cannot clean it up now.",
+                        metric_id)
+                    continue
+
+                try:
+                    agg_methods = list(
+                        metric.archive_policy.aggregation_methods)
+                    block_size = metric.archive_policy.max_block_size
+                    back_window = metric.archive_policy.back_window
+
+                    # Rate aggregates need one extra measure in the back
+                    # window to compute the first rate of change.
+                    if any(m.startswith("rate:") for m in agg_methods):
+                        back_window += 1
+
+                    raw_measure = self.storage. \
+                        _get_or_create_unaggregated_timeseries_unbatched(
+                            metric)
+
+                    if raw_measure:
+                        ts = carbonara.BoundTimeSerie.unserialize(
+                            raw_measure, block_size, back_window)
+                        # Pass None so no new values are added; this only
+                        # triggers the truncation of the out-of-window raw
+                        # data points.
+                        ts.set_values(None)
+
+                        self.storage._store_unaggregated_timeseries_unbatched(
+                            metric, ts.serialize())
+
+                        self.index.update_metric_last_cleanup_timestamp(
+                            metric.id)
+                        # Reset the flag; the back-window change has now
+                        # been applied to this metric.
+                        self.index.update_backwindow_changed(metric.id)
+                    else:
+                        LOG.info("No raw measures found for metric [%s] "
+                                 "for cleanup.", metric.id)
+                finally:
+                    metric_lock.release()
+            except Exception:
+                LOG.error("Unable to clean up metric [%s].",
+                          metric, exc_info=True)
+                continue
+
+        if metrics_to_clean:
+            LOG.debug("Cleaned up metrics [%s].", metrics_to_clean)
+
     def expunge_metrics(self, cleanup_batch_size, sync=False):
         """Remove deleted metrics.
@@ -124,7 +230,7 @@ def refresh_metrics(self, metrics, timeout=None, sync=False):
                         metrics_by_id[metric_id]: measures
                         for metric_id, measures
                         in metrics_and_measures.items()
-                    })
+                    }, self.index)
                     LOG.debug("Measures for %d metrics processed",
                               len(metric_ids))
         except Exception:
@@ -165,7 +271,7 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
                     self.storage.add_measures_to_metrics({
                         metric: measures[metric.id]
                         for metric in metrics
-                    })
+                    }, self.index)
                     LOG.debug("Measures for %d metrics processed",
                               len(metrics))
                 return len(measures)
@@ -180,7 +286,6 @@ def process_new_measures_for_sack(self, sack, blocking=False, sync=False):
     def get_sack_lock(self, sack):
         # FIXME(jd) Some tooz drivers have a limitation on lock name length
         # (e.g. MySQL). This should be handled by tooz, but it's not yet.
-        lock_name = hashlib.new(
-            'sha1',
-            ('gnocchi-sack-%s-lock' % str(sack)).encode()).hexdigest().encode()
+        lock_name = ('gnocchi-sack-%s-lock' % str(sack)).encode()
+        lock_name = hashlib.new('sha1', lock_name).hexdigest().encode()
         return self.coord.get_lock(lock_name)
diff --git a/gnocchi/cli/api.py b/gnocchi/cli/api.py
index fc82ac8c1..0a2401c5e 100644
--- a/gnocchi/cli/api.py
+++ b/gnocchi/cli/api.py
@@ -38,6 +38,7 @@ def prepare_service(conf=None):
     opts.set_defaults()
     policy_opts.set_defaults(conf, 'policy.yaml')
+
     conf = service.prepare_service(conf=conf)
     return conf
@@ -113,4 +114,6 @@ def api():
     if virtual_env is not None:
         args.extend(["-H", os.getenv("VIRTUAL_ENV", ".")])
 
+    LOG.info("Starting gnocchi api server with [%s] and arguments [%s].",
+             uwsgi, args)
+    print("Starting gnocchi api server with [%s] and arguments [%s]"
+          % (uwsgi, args))
     return os.execl(uwsgi, uwsgi, *args)
diff --git a/gnocchi/cli/metricd.py b/gnocchi/cli/metricd.py
index 129f00b6a..884726fb4 100644
--- a/gnocchi/cli/metricd.py
+++ b/gnocchi/cli/metricd.py
@@ -72,9 +72,9 @@ def _configure(self):
             self.conf.coordination_url)
         self.store = storage.get_driver(self.conf)
         self.incoming = incoming.get_driver(self.conf)
-        self.index = indexer.get_driver(self.conf)
+        self.indexer = indexer.get_driver(self.conf)
         self.chef = chef.Chef(self.coord, self.incoming,
-                              self.index, self.store)
+                              self.indexer, self.store)
 
     def run(self):
         self._configure()
@@ -267,9 +267,18 @@ def __init__(self, worker_id, conf):
             worker_id, conf, conf.metricd.metric_cleanup_delay)
 
     def _run_job(self):
+        LOG.debug("Cleaning up deleted metrics with batch size [%s].",
+                  self.conf.metricd.cleanup_batch_size)
         self.chef.expunge_metrics(self.conf.metricd.cleanup_batch_size)
         LOG.debug("Metrics marked for deletion removed from backend")
 
+        LOG.debug("Starting the cleanup of raw data points for metrics "
+                  "that are no longer receiving measures.")
+        self.chef.clean_raw_data_inactive_metrics(
+            self.conf.metricd.delay_consider_metric_inactive)
+        LOG.debug("Finished the cleanup of raw data points for metrics "
+                  "that are no longer receiving measures.")
+
 
 class MetricdServiceManager(cotyledon.ServiceManager):
     def __init__(self, conf):
@@ -288,7 +297,7 @@ def __init__(self, conf):
 
     def on_reload(self):
         # NOTE(sileht): We do not implement reload() in Workers so all workers
-        # will received SIGHUP and exit gracefully, then their will be
+        # will receive SIGHUP and exit gracefully, then they will be
         # restarted with the new number of workers. This is important because
         # we use the number of worker to declare the capability in tooz and
         # to select the block of metrics to proceed.
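Reviewer note (not part of the patch): the janitor job above hands conf.metricd.delay_consider_metric_inactive straight to Chef.clean_raw_data_inactive_metrics. Below is a minimal, standalone sketch restating the inactivity test that method performs — the cutoff arithmetic and the attribute filter built in gnocchi/chef.py. The helper name `inactivity_filter` and the 3600-second delay are made up for illustration.

import datetime


def inactivity_filter(delay_consider_metric_inactive):
    """Return the attribute filter used to list inactive metrics,
    or None when the cleanup is disabled (negative delay)."""
    if delay_consider_metric_inactive < 0:
        return None
    cutoff = datetime.datetime.now(
        tz=datetime.timezone.utc) - datetime.timedelta(
        seconds=delay_consider_metric_inactive)
    # Metrics whose last measure push is older than the cutoff are
    # candidates for raw data truncation.
    return {"<": {"last_measure_push": cutoff}}


print(inactivity_filter(3600))
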
diff --git a/gnocchi/incoming/file.py b/gnocchi/incoming/file.py
index b21a0cb66..794e5325a 100644
--- a/gnocchi/incoming/file.py
+++ b/gnocchi/incoming/file.py
@@ -194,9 +194,15 @@ def process_measures_for_sack(self, sack):
                         sack, metric_id)
             processed_files[metric_id] = files
             m = self._make_measures_array()
+
+            count = 0
+            total_files = len(files)
             for f in files:
+                count += 1
                 abspath = self._build_measure_path(metric_id, f)
                 with open(abspath, "rb") as e:
+                    LOG.debug("(%s/%s) Reading metric file [%s].",
+                              count, total_files, abspath)
                     m = numpy.concatenate((
                         m, self._unserialize_measures(f, e.read())))
             measures[metric_id] = m
diff --git a/gnocchi/indexer/__init__.py b/gnocchi/indexer/__init__.py
index 9704a1f3a..0895691d3 100644
--- a/gnocchi/indexer/__init__.py
+++ b/gnocchi/indexer/__init__.py
@@ -406,6 +406,10 @@ def list_metrics(details=False, status='active',
                      resource_policy_filter=None):
         raise exceptions.NotImplementedError
 
+    @staticmethod
+    def list_metrics_for_cleanup(attribute_filter=None):
+        raise exceptions.NotImplementedError
+
     @staticmethod
     def create_archive_policy(archive_policy):
         raise exceptions.NotImplementedError
@@ -438,6 +442,23 @@ def delete_resources(resource_type='generic',
     def delete_metric(id):
         raise exceptions.NotImplementedError
 
+    @staticmethod
+    def update_metric_last_cleanup_timestamp(metric_id):
+        raise exceptions.NotImplementedError
+
+    @staticmethod
+    def update_backwindow_changed(metric_id, value=False):
+        raise exceptions.NotImplementedError
+
+    @staticmethod
+    def update_backwindow_changed_for_metrics_archive_policy(
+            archive_policy_name):
+        raise exceptions.NotImplementedError
+
+    @staticmethod
+    def update_metric_last_push_timestamp(metric_id):
+        raise exceptions.NotImplementedError
+
     @staticmethod
     def expunge_metric(id):
         raise exceptions.NotImplementedError
diff --git a/gnocchi/indexer/alembic/versions/18fff4509e3e_create_metric_last_push_timestamp_column.py b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_metric_last_push_timestamp_column.py
new file mode 100644
index 000000000..5f05913a1
--- /dev/null
+++ b/gnocchi/indexer/alembic/versions/18fff4509e3e_create_metric_last_push_timestamp_column.py
@@ -0,0 +1,48 @@
+# Copyright 2015 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""create metric last push timestamp column
+
+Revision ID: 18fff4509e3e
+Revises: 04eba72e4f90
+Create Date: 2024-04-24 09:16:00
+
+"""
+
+from alembic import op
+from sqlalchemy.sql import func
+
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = '18fff4509e3e'
+down_revision = '04eba72e4f90'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # Use a server default so the NOT NULL column can be added to a
+    # metric table that already contains rows.
+    op.add_column("metric",
+                  sa.Column("backwindow_changed", sa.Boolean,
+                            nullable=False,
+                            server_default=sa.sql.false()))
+    for column_name in ['last_measure_push', 'last_cleanup_timestamp']:
+        op.add_column("metric", sa.Column(column_name,
+                                          sa.DateTime(timezone=True),
+                                          nullable=False,
+                                          server_default=func.now()))
+        index_name = "idx_%s" % column_name
+        op.create_index(index_name, 'metric',
+                        [column_name], unique=False)
diff --git a/gnocchi/indexer/sqlalchemy.py b/gnocchi/indexer/sqlalchemy.py
index cd67d2fe1..ecc1f4908 100644
--- a/gnocchi/indexer/sqlalchemy.py
+++ b/gnocchi/indexer/sqlalchemy.py
@@ -765,14 +765,25 @@ def create_metric(self, id, creator, archive_policy_name,
                 raise
             return m
 
+    def list_metrics_for_cleanup(self, attribute_filter=None):
+        # A metric needs cleanup if it received measures after its last
+        # cleanup, or if its archive policy's back window was changed.
+        sql_alchemy_filter = sqlalchemy.or_(
+            Metric.last_measure_push >= Metric.last_cleanup_timestamp,
+            Metric.backwindow_changed.is_(True))
+
+        return self.list_metrics(
+            attribute_filter=attribute_filter,
+            sql_alchemy_filter=sql_alchemy_filter)
+
     @retry_on_deadlock
     def list_metrics(self, details=False, status='active',
                      limit=None, marker=None, sorts=None,
                      policy_filter=None, resource_policy_filter=None,
-                     attribute_filter=None):
+                     attribute_filter=None, sql_alchemy_filter=None):
         sorts = sorts or []
         with self.facade.independent_reader() as session:
             q = select(Metric).filter(Metric.status == status)
+            if sql_alchemy_filter is not None:
+                q = q.filter(sql_alchemy_filter)
             if details:
                 q = q.options(sqlalchemy.orm.joinedload(Metric.resource))
             if policy_filter or resource_policy_filter or attribute_filter:
@@ -832,6 +843,7 @@ def list_metrics(self, details=False, status='active',
                     sort_keys=sort_keys, marker=metric_marker,
                     sort_dirs=sort_dirs)
+
         except ValueError as e:
             raise indexer.InvalidPagination(e)
         except exception.InvalidSortKey as e:
@@ -1395,6 +1407,45 @@ def delete_metric(self, id):
             if session.execute(stmt).rowcount == 0:
                 raise indexer.NoSuchMetric(id)
 
+    def update_metric_last_push_timestamp(self, id):
+        with self.facade.writer() as session:
+            # Aware UTC timestamp, to match the TimestampUTC columns.
+            now_timestamp = datetime.datetime.now(tz=datetime.timezone.utc)
+            LOG.debug("Updating last push timestamp for metric [%s] with "
+                      "timestamp [%s].", id, now_timestamp)
+            stmt = update(Metric).filter(
+                Metric.id == id, Metric.status == "active").values(
+                last_measure_push=now_timestamp)
+            if session.execute(stmt).rowcount == 0:
+                raise indexer.NoSuchMetric(id)
+
+    def update_backwindow_changed(self, id, value=False):
+        with self.facade.writer() as session:
+            stmt = update(Metric).filter(Metric.id == id).values(
+                backwindow_changed=value)
+            if session.execute(stmt).rowcount == 0:
+                raise indexer.NoSuchMetric(id)
+
+    def update_backwindow_changed_for_metrics_archive_policy(
+            self, archive_policy_name):
+        with self.facade.writer() as session:
+            stmt = update(Metric).filter(
+                Metric.archive_policy_name == archive_policy_name).values(
+                backwindow_changed=True)
+            if session.execute(stmt).rowcount == 0:
+                LOG.info("No metric was updated for archive_policy [%s]. "
" + "This might indicate that the archive policy is not " + "used by any metric.", archive_policy_name) + + def update_metric_last_cleanup_timestamp(self, id): + with self.facade.writer() as session: + now_timestamp = datetime.datetime.now() + LOG.debug("Updating last cleanup timestamp for metric [%s] " + "with timestamp [%s].", id, now_timestamp) + stmt = update(Metric).filter(Metric.id == id).values( + last_cleanup_timestamp=now_timestamp) + if session.execute(stmt).rowcount == 0: + raise indexer.NoSuchMetric(id) + @staticmethod def _build_sort_keys(sorts, unique_keys): # transform the api-wg representation to the oslo.db one diff --git a/gnocchi/indexer/sqlalchemy_base.py b/gnocchi/indexer/sqlalchemy_base.py index dfc630e3e..fafb19e81 100644 --- a/gnocchi/indexer/sqlalchemy_base.py +++ b/gnocchi/indexer/sqlalchemy_base.py @@ -99,12 +99,24 @@ class Metric(Base, GnocchiBase, indexer.Metric): sqlalchemy.ForeignKey('resource.id', ondelete="SET NULL", name="fk_metric_resource_id_resource_id")) + name = sqlalchemy.Column(sqlalchemy.String(255)) unit = sqlalchemy.Column(sqlalchemy.String(31)) status = sqlalchemy.Column(sqlalchemy.Enum('active', 'delete', name="metric_status_enum"), nullable=False, server_default='active') + last_measure_push = sqlalchemy.Column('last_measure_push', + types.TimestampUTC, nullable=False, + default=lambda: utils.utcnow()) + + last_cleanup_timestamp = sqlalchemy.Column( + 'last_cleanup_timestamp', types.TimestampUTC, + nullable=False, default=lambda: utils.utcnow()) + + backwindow_changed = sqlalchemy.Column( + "archive_policy_updated", sqlalchemy.Boolean, + nullable=False, default=False) def jsonify(self): d = { diff --git a/gnocchi/opts.py b/gnocchi/opts.py index 9b40071ea..cce0b858f 100644 --- a/gnocchi/opts.py +++ b/gnocchi/opts.py @@ -173,6 +173,14 @@ def list_opts(): min=1, help="Number of metrics that should be deleted " "simultaneously by one janitor."), + cfg.IntOpt('delay_consider_metric_inactive', + default=-1, + help="Interval, in seconds, that defines for how long a " + "metric be without receiving new measures before " + "we consider it as inactive and enable the " + "truncation of its raw data. The default is '-1', " + "which means that we do not clean up metrics that " + "are no longer receiving new measures."), )), ("api", ( cfg.StrOpt('paste_config', diff --git a/gnocchi/rest/api.py b/gnocchi/rest/api.py index cbec5b575..b6c75bbf8 100644 --- a/gnocchi/rest/api.py +++ b/gnocchi/rest/api.py @@ -261,7 +261,6 @@ def patch(self): voluptuous.Optional("back_window"): voluptuous.All( voluptuous.Coerce(int), voluptuous.Range(min=0)) })) - # Validate the data try: ap_items = [archive_policy.ArchivePolicyItem(**item) for item in body['definition']] @@ -269,6 +268,21 @@ def patch(self): abort(400, str(e)) try: + original_archive_policy = pecan.request.indexer.get_archive_policy( + self.archive_policy) + if body.get('back_window') and body.get( + 'back_window') != original_archive_policy.back_window: + LOG.info("Backwindow change in the archive policy. Therefore, " + "we will mark all of its metrics to be updated " + "according to it by the Janitor.") + pecan.request.indexer. 
+                    update_backwindow_changed_for_metrics_archive_policy(
+                        self.archive_policy)
+            else:
+                LOG.debug("No need to update the metrics' back window "
+                          "change status, because it did not change for "
+                          "archive policy [%s].", self.archive_policy)
+
             return pecan.request.indexer.update_archive_policy(
                 self.archive_policy, ap_items,
                 back_window=body.get('back_window'))
diff --git a/gnocchi/rest/app.py b/gnocchi/rest/app.py
index 86c3747c3..c99a746c5 100644
--- a/gnocchi/rest/app.py
+++ b/gnocchi/rest/app.py
@@ -177,6 +177,10 @@ def load_app(conf, not_implemented_middleware=True):
     LOG.info("WSGI config used: %s", cfg_path)
 
     appname = "gnocchi+" + conf.api.auth_mode
+
+    LOG.info("Starting application [%s] with configuration [%s].",
+             appname, APPCONFIGS)
+
     app = deploy.loadapp("config:" + cfg_path, name=appname,
                          global_conf={'configkey': configkey})
     return http_proxy_to_wsgi.HTTPProxyToWSGI(
@@ -194,6 +198,9 @@ def _setup_app(root, conf, not_implemented_middleware):
     if not_implemented_middleware:
         app = webob.exc.HTTPExceptionMiddleware(NotImplementedMiddleware(app))
 
+    LOG.info("Application setup for context path [%s] and "
+             "configuration [%s].",
+             root, conf.__dict__ if conf else None)
     return app
diff --git a/gnocchi/rest/auth_helper.py b/gnocchi/rest/auth_helper.py
index d5df68939..003bc5780 100644
--- a/gnocchi/rest/auth_helper.py
+++ b/gnocchi/rest/auth_helper.py
@@ -14,12 +14,17 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+import daiquiri
+
 import webob
 import werkzeug.http
 
 from gnocchi.rest import api
 
+LOG = daiquiri.getLogger(__name__)
+
+
 class KeystoneAuthHelper(object):
     @staticmethod
     def get_current_user(request):
@@ -119,6 +124,8 @@ class BasicAuthHelper(object):
     @staticmethod
     def get_current_user(request):
         hdr = request.headers.get("Authorization")
+        # Do not log the header value itself: it carries credentials.
+        LOG.debug("Processing basic auth request [%s].", request)
         auth_hdr = (hdr.decode('utf-8')
                     if isinstance(hdr, bytes)
                     else hdr)
@@ -134,6 +141,7 @@ def get_current_user(request):
     def get_auth_info(self, request):
         user = self.get_current_user(request)
         roles = []
+
         if user == "admin":
             roles.append("admin")
         return {
@@ -154,6 +162,8 @@ class RemoteUserAuthHelper(object):
     @staticmethod
     def get_current_user(request):
         user = request.remote_user
+        LOG.debug("Processing remote user authentication for request [%s]. "
+                  "The remote user found is [%s].", request, user)
         if user is None:
             api.abort(401)
         return user.decode('iso-8859-1')
@@ -161,6 +171,7 @@ def get_current_user(request):
     def get_auth_info(self, request):
         user = self.get_current_user(request)
         roles = []
+
         if user == "admin":
             roles.append("admin")
         return {
diff --git a/gnocchi/service.py b/gnocchi/service.py
index c3f7dc77c..8a73cecda 100644
--- a/gnocchi/service.py
+++ b/gnocchi/service.py
@@ -78,6 +78,7 @@ def prepare_service(args=None, conf=None,
         logging_level = logging.WARNING
     logging.getLogger("gnocchi").setLevel(logging_level)
 
+    LOG.info("Preparing gnocchi service for configuration [%s].",
+             conf.__dict__ if conf else None)
     # HACK(jd) I'm not happy about that, fix AP class to handle a conf object?
     archive_policy.ArchivePolicy.DEFAULT_AGGREGATION_METHODS = (
         conf.archive_policy.default_aggregation_methods
diff --git a/gnocchi/storage/__init__.py b/gnocchi/storage/__init__.py
index cc3b5fc87..048c7cbf0 100644
--- a/gnocchi/storage/__init__.py
+++ b/gnocchi/storage/__init__.py
@@ -579,7 +579,7 @@ def _delete_metric_splits(self, metrics_keys_aggregations, version=3):
                 in metrics_keys_aggregations.items()
                 for key, aggregation in keys_and_aggregations))
 
-    def add_measures_to_metrics(self, metrics_and_measures):
+    def add_measures_to_metrics(self, metrics_and_measures, indexer_driver):
         """Update a metric with a new measures, computing new aggregations.
 
         :param metrics_and_measures: A dict there keys are `storage.Metric`
@@ -682,6 +682,16 @@ def _map_compute_splits_operations(bound_timeserie):
 
             new_boundts.append((metric, ts.serialize()))
 
+            # At this point, we update the metric with the timestamp of
+            # the last datapoint/measure that was processed. Updating the
+            # timestamp here is not foolproof, because the actual data is
+            # only persisted in the next steps, and an error might happen,
+            # or MetricD might be stopped. However, for the use case this
+            # new column serves, that is acceptable: if the sack is not
+            # processed by this agent, it will be processed by another
+            # one, and the timestamp will then be updated.
+            indexer_driver.update_metric_last_push_timestamp(metric.id)
+
         with self.statistics.time("splits delete"):
             self._delete_metric_splits(splits_to_delete)
             self.statistics["splits delete"] += len(splits_to_delete)
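
Reviewer note (not part of the patch): a self-contained numpy sketch of what the truncation-only path (`ts.set_values(None)` in gnocchi/chef.py) boils down to, mirroring BoundTimeSerie.first_block_timestamp(): everything older than the last `back_window + 1` blocks is dropped. The `truncate` helper and the sample series are made up for illustration.

import numpy


def truncate(timestamps, block_size_s, back_window):
    """Keep only timestamps inside the last (back_window + 1) blocks."""
    block = numpy.timedelta64(block_size_s, 's')
    epoch = numpy.datetime64(0, 's')
    # Round the newest timestamp down to its block boundary, then step
    # back `back_window` whole blocks: that is the first timestamp kept.
    rounded = epoch + ((timestamps[-1] - epoch) // block) * block
    first_kept = rounded - block * back_window
    return timestamps[numpy.searchsorted(timestamps, first_kept):]


start = numpy.datetime64('2024-04-26T10:00:00')
stamps = numpy.arange(start, start + numpy.timedelta64(10, 'm'),
                      numpy.timedelta64(60, 's'))
# With 2-minute blocks and back_window=1, only the last two blocks
# (from 10:06:00 onwards) survive the truncation.
print(truncate(stamps, 120, 1))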