Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aggregation option based on past partitions #121

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/travis/logs/aggregate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def setup

def run
loop do
aggregate_logs
aggregate_logs(partitions: partitions?)
sleep sleep_interval
end
end
Expand Down Expand Up @@ -45,10 +45,10 @@ def run_ranges
end
end

def aggregate_logs
def aggregate_logs(partitions: false)
lock.exclusive do
begin
aggregator.run
aggregator.run(partitions: partitions)
rescue StandardError => e
Travis::Exceptions.handle(e)
end
Expand All @@ -66,6 +66,10 @@ def aggregate_logs
private def lock
@lock ||= Travis::Logs::Lock.new('logs.aggregate')
end

private def partitions?
Travis.config.logs.aggregate_partitions?
end
end
end
end
1 change: 1 addition & 0 deletions lib/travis/logs/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Config < Travis::Config
logs: {
aggregatable_order: nil,
aggregate_clean_skip_empty: true,
aggregate_partitions: false,
aggregate_pool: {
max_queue: 0,
max_threads: 20,
Expand Down
28 changes: 28 additions & 0 deletions lib/travis/logs/database.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'active_support/core_ext/numeric'
require 'logger'
require 'sequel'
require 'pg'
Expand Down Expand Up @@ -205,6 +206,33 @@ def aggregatable_logs(regular_interval, force_interval, limit,
query.map(:log_id).uniq
end

def aggregatable_logs_by_partition(per_partition_limit,
order: :created_at,
cutoff: 1.day)
maint.restrict!
ids = []

cutoff = Time.now.utc - cutoff
cutoff_partition = cutoff.strftime('log_parts_p%Y_%m_%d')

partitions = db[
%[SELECT partman.show_partitions('public.log_parts')]
].map do |row|
row.fetch(:show_partitions).gsub(/[()]/, '').split(',').last

This comment was marked as spam.

end

partitions.each do |partition|
next if partition >= cutoff_partition

query = db[partition.to_sym].select(:log_id)
.limit(per_partition_limit)
query = query.order(order.to_sym) unless order.nil?
ids += query.map(:log_id)
end

ids.uniq
end

def min_log_part_id
maint.restrict!
db['SELECT min(id) AS id FROM log_parts'].first[:id]
Expand Down
13 changes: 10 additions & 3 deletions lib/travis/logs/services/aggregate_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ def initialize(database = nil, pool_config = {})
end
end

def run
def run(partitions: false)
Travis.logger.info('fetching aggregatable ids')

ids = aggregatable_ids
ids = aggregatable_ids(partitions: partitions)
if ids.empty?
Travis.logger.info('no aggregatable ids')
return
Expand Down Expand Up @@ -150,7 +150,14 @@ def aggregate_log(log_id)
end
end

private def aggregatable_ids
private def aggregatable_ids(partitions: false)
if partitions
return database.aggregatable_logs_by_partition(
per_aggregate_limit,
order: aggregatable_order
)
end

database.aggregatable_logs(
intervals[:sweeper], intervals[:force],
per_aggregate_limit,
Expand Down