From b3cb4bd44938f13fe75dbd191e280ad8b0738228 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 5 May 2017 10:36:27 -0400 Subject: [PATCH] Add aggregation option based on past partitions so that cleanup can be performed an any straggling log parts prior to partition removal. --- lib/travis/logs/aggregate.rb | 10 +++++--- lib/travis/logs/config.rb | 1 + lib/travis/logs/database.rb | 28 ++++++++++++++++++++++ lib/travis/logs/services/aggregate_logs.rb | 13 +++++++--- 4 files changed, 46 insertions(+), 6 deletions(-) diff --git a/lib/travis/logs/aggregate.rb b/lib/travis/logs/aggregate.rb index 44fd636d..8cf0d9d7 100644 --- a/lib/travis/logs/aggregate.rb +++ b/lib/travis/logs/aggregate.rb @@ -15,7 +15,7 @@ def setup def run loop do - aggregate_logs + aggregate_logs(partitions: partitions?) sleep sleep_interval end end @@ -45,10 +45,10 @@ def run_ranges end end - def aggregate_logs + def aggregate_logs(partitions: false) lock.exclusive do begin - aggregator.run + aggregator.run(partitions: partitions) rescue StandardError => e Travis::Exceptions.handle(e) end @@ -66,6 +66,10 @@ def aggregate_logs private def lock @lock ||= Travis::Logs::Lock.new('logs.aggregate') end + + private def partitions? + Travis.config.logs.aggregate_partitions? + end end end end diff --git a/lib/travis/logs/config.rb b/lib/travis/logs/config.rb index fcfa1f4e..4a0868c7 100644 --- a/lib/travis/logs/config.rb +++ b/lib/travis/logs/config.rb @@ -15,6 +15,7 @@ class Config < Travis::Config logs: { aggregatable_order: nil, aggregate_clean_skip_empty: true, + aggregate_partitions: false, aggregate_pool: { max_queue: 0, max_threads: 20, diff --git a/lib/travis/logs/database.rb b/lib/travis/logs/database.rb index 05078a82..2cb95633 100644 --- a/lib/travis/logs/database.rb +++ b/lib/travis/logs/database.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require 'active_support/core_ext/numeric' require 'logger' require 'sequel' require 'pg' @@ -206,6 +207,33 @@ def aggregatable_logs(regular_interval, force_interval, limit, query.map(:log_id).uniq end + def aggregatable_logs_by_partition(per_partition_limit, + order: :created_at, + cutoff: 1.day) + maint.restrict! + ids = [] + + cutoff = Time.now.utc - cutoff + cutoff_partition = cutoff.strftime('log_parts_p%Y_%m_%d') + + partitions = db[ + %[SELECT partman.show_partitions('public.log_parts')] + ].map do |row| + row.fetch(:show_partitions).gsub(/[()]/, '').split(',').last + end + + partitions.each do |partition| + next if partition >= cutoff_partition + + query = db[partition.to_sym].select(:log_id) + .limit(per_partition_limit) + query = query.order(order.to_sym) unless order.nil? + ids += query.map(:log_id) + end + + ids.uniq + end + def min_log_part_id maint.restrict! db['SELECT min(id) AS id FROM log_parts'].first[:id] diff --git a/lib/travis/logs/services/aggregate_logs.rb b/lib/travis/logs/services/aggregate_logs.rb index d21ce11a..15d2020e 100644 --- a/lib/travis/logs/services/aggregate_logs.rb +++ b/lib/travis/logs/services/aggregate_logs.rb @@ -33,10 +33,10 @@ def initialize(database = nil, pool_config = {}) end end - def run + def run(partitions: false) Travis.logger.info('fetching aggregatable ids') - ids = aggregatable_ids + ids = aggregatable_ids(partitions: partitions) if ids.empty? Travis.logger.info('no aggregatable ids') return @@ -150,7 +150,14 @@ def aggregate_log(log_id) end end - private def aggregatable_ids + private def aggregatable_ids(partitions: false) + if partitions + return database.aggregatable_logs_by_partition( + per_aggregate_limit, + order: aggregatable_order + ) + end + database.aggregatable_logs( intervals[:sweeper], intervals[:force], per_aggregate_limit,