Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup downloads on timescaledb #4979

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions app/avo/resources/log_download_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Avo admin resource for LogDownload records: exposes the queue of Fastly
# log files waiting to be ingested into the downloads database.
class LogDownloadResource < Avo::BaseResource
  self.title = :id
  self.includes = []

  # Boolean scope filters built from the LogDownload enums; every scope is
  # enabled by default so the admin sees all records until filters are toggled.
  class BackendFilter < ScopeBooleanFilter; end
  class StatusFilter < ScopeBooleanFilter; end

  filter BackendFilter, arguments: { default: LogDownload.backends.transform_values { true } }
  filter StatusFilter, arguments: { default: LogDownload.statuses.transform_values { true } }

  field :id, as: :id
  field :key, as: :text
  field :directory, as: :text
  field :backend, as: :select, enum: LogDownload.backends
  field :status, as: :select, enum: LogDownload.statuses
end
4 changes: 4 additions & 0 deletions app/controllers/avo/log_downloads_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This controller has been generated to enable Rails' resource routes.
# More information on https://docs.avohq.io/2.0/controllers.html
# Intentionally empty: all CRUD behavior is inherited from Avo::ResourcesController
# and configured through LogDownloadResource.
class Avo::LogDownloadsController < Avo::ResourcesController
end
99 changes: 99 additions & 0 deletions app/jobs/fastly_log_downloads_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
require "zlib" # NOTE(review): zlib appears unused here — confirm whether log bodies need gunzipping (cf. FastlyLogProcessor)
require "time" # Time.parse is stdlib "time", not guaranteed loaded outside Rails

# Process log files downloaded from Fastly and insert row by row into the database.
# It works in a similar way to FastlyLogProcessor, but it's optimized for a different
# use case: it processes log files downloaded from Fastly and inserts the raw data into
# the database in batches.
# The counters and other metrics are calculated in a separate job directly in
# the database through the continuous aggregates.
# Check Download::PerMinute, Download::PerHour and other classes as an example.
class FastlyLogDownloadsProcessor
  class LogFileNotFoundError < ::StandardError; end

  extend StatsD::Instrument

  # Rows per Download.insert_all call; bounds memory use and keeps each
  # INSERT statement a manageable size.
  BATCH_SIZE = 5000

  attr_accessor :bucket, :key
  attr_reader :processed_count

  # @param bucket [String] storage directory/bucket holding the log file
  # @param key [String] object key of the log file within the bucket
  def initialize(bucket, key)
    @bucket = bucket
    @key = key
    @processed_count = 0
  end

  # Pops the matching LogDownload, parses its body and bulk-inserts the
  # successful download rows.
  #
  # @return [Integer] number of rows inserted
  # @raise [LogFileNotFoundError] when no log body could be fetched
  def perform
    StatsD.increment("fastly_log_downloads_processor.started")
    raise LogFileNotFoundError if body.nil?

    count = 0
    parse_success_downloads.each_slice(BATCH_SIZE) do |batch|
      Download.insert_all batch
      count += batch.size
    end

    # NOTE(review): a log file with zero successful downloads is marked
    # "failed" — confirm this is intended rather than "processed" with 0.
    if count > 0
      element.update(status: "processed", processed_count: count)
    else
      element.update(status: "failed")
    end

    # This value may diverge from numbers from the fastly_log_processor as totals are
    # not aggregated with the number of downloads but each row represents a download.
    StatsD.gauge("fastly_log_downloads_processor.processed_count", count)
    count
  end

  # Memoized raw log body; nil when no pending LogDownload matched.
  def body
    @body ||= element&.body
  end

  # Pops (and marks as "processing") the pending LogDownload for this
  # bucket/key pair. Returns nil when none is pending.
  def element
    @element ||= LogDownload.pop(directory: @bucket, key: @key)
  end

  # Parses each log line and returns an array of row hashes for the
  # successful download requests, ready for Download.insert_all.
  def parse_success_downloads
    body.each_line.filter_map do |log_line|
      fragments = log_line.split
      path, response_code = fragments[10, 2]
      case response_code.to_i
      # Only count successful downloads
      # NB: we consider a 304 response a download attempt
      when 200, 304
        match = path.match(PATH_PATTERN)
        # Guard against paths that don't match the gem pattern: previously
        # `m[:gem_name]` raised NoMethodError on nil. Fall back to the raw
        # path as the name and leave the version nil.
        gem_name = match ? match[:gem_name] : path
        gem_version = match && match[:gem_version]
        created_at = Time.parse fragments[4..9].join(' ')
        env = parse_env fragments[12..-1]
        payload = {env:}

        {created_at:, gem_name:, gem_version:, payload:}
      end
    end
  end

  # Parse the env into a hash of key value pairs
  # example env = "bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0"
  # output = {bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0"}
  # case it says single word like jruby it appends true as the value
  # example env = "jruby"
  # output = {jruby: "true"}
  # also removes some unwanted characters
  #
  # @return [Hash, nil] nil when no env data was logged
  def parse_env(output)
    env = output.join(' ').gsub(/command.*|\(.*\)|Ruby, /,'').strip
    # Previously `env = nil if env == "(null)"` fell through to `env.split`,
    # raising NoMethodError on nil. Bail out early instead.
    return nil if env == "(null)"
    env.split(' ').map do |info|
      pair = info.split(/\/|-/,2)
      pair << "true" if pair.size == 1
      pair
    end.to_h
  end

  statsd_count_success :perform, "fastly_log_downloads_processor.perform"
  statsd_measure :perform, "fastly_log_downloads_processor.job_performance"

  # Extracts gem name and version from paths like /gems/foo-1.2.3.gem
  PATH_PATTERN = /\/gems\/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem/
  private_constant :PATH_PATTERN
end
21 changes: 21 additions & 0 deletions app/jobs/fastly_log_downloads_processor_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Same as the FastlyLogProcessorJob but for saving it to TimescaleDB
# and the Download table as flat downloads.
class FastlyLogDownloadsProcessorJob < ApplicationJob
  queue_as :default
  queue_with_priority PRIORITIES.fetch(:stats)

  include GoodJob::ActiveJobExtensions::Concurrency
  good_job_control_concurrency_with(
    # Maximum number of jobs with the concurrency key to be
    # concurrently performed (excludes enqueued jobs)
    #
    # Limited to avoid overloading the downloads table with
    # too many concurrent bulk inserts
    perform_limit: good_job_concurrency_perform_limit(default: 5),
    key: name
  )

  # Delegates to FastlyLogDownloadsProcessor, which pops the matching
  # LogDownload record and bulk-inserts its parsed rows.
  def perform(bucket:, key:)
    FastlyLogDownloadsProcessor.new(bucket, key).perform
  end
end
90 changes: 90 additions & 0 deletions app/models/download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Flat download events stored in TimescaleDB (one row per download).
# Aggregations are not computed in Ruby: they come from continuous
# aggregates (materialized views) defined in the downloads migration and
# exposed here as Download::PerMinute, Download::GemsPerHour, etc.
class Download < DownloadsDB
  extend Timescaledb::ActsAsHypertable

  acts_as_hypertable time_column: 'created_at'

  # Buckets rows into fixed time intervals via TimescaleDB's time_bucket().
  # NOTE(review): `range` and `query` are interpolated into raw SQL — safe
  # only for trusted, internal callers; confirm they are never user-supplied.
  scope :time_bucket, -> (range='1m', query="count(*)") do
    time_column = self.hypertable_options[:time_column]
    select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}")
  end

  # Total downloads per minute; group/order by the bucketed time column (position 1).
  scope :per_minute, -> (query="count(*) as downloads") do
    time_bucket('1m', query).group(1).order(1)
  end

  # Per-minute counts broken down by gem name.
  scope :gems_per_minute, -> do
    per_minute("gem_name, count(*) as downloads").group(1,2)
  end

  # Per-minute counts broken down by gem name and version.
  scope :versions_per_minute, -> do
    per_minute("gem_name, gem_version, count(*) as downloads").group(1,2,3)
  end

  # Factory lambda: builds a read-only model class backed by the
  # "downloads_<view_name>" continuous-aggregate view. Invoked below with
  # cagg[...] (lambda #[] call syntax) to define the nested constants.
  cagg = -> (view_name) do
    Class.new(DownloadsDB) do
      self.table_name = "downloads_#{view_name}"

      scope :sum_downloads, -> { select("sum(downloads)::bigint as downloads") }
      scope :avg_downloads, -> { select("avg(downloads)::bigint as avg_downloads") }

      # Re-buckets the aggregate into a coarser interval. `query` may be a
      # symbol naming one of the scopes above, or a raw SQL select fragment.
      scope :rollup, -> (range='1d', query=:sum_downloads) do
        time_column = Download.hypertable_options[:time_column]
        if query.is_a?(Symbol)
          select("time_bucket('#{range}', #{time_column}) as #{time_column}")
            .public_send(query)
            .group(1)
        else
          select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}")
            .group(1)
        end
      end

      scope :per_hour, -> (query=:sum_downloads) do
        rollup('1h', query)
      end

      scope :per_day, -> (query=:sum_downloads) do
        rollup('1d', query)
      end

      scope :per_week, -> (query=:sum_downloads) do
        rollup('1w', query)
      end

      scope :per_month, -> (query=:sum_downloads) do
        rollup('1mon', query)
      end

      scope :per_year, -> (query=:sum_downloads) do
        rollup('1y', query)
      end

      # Continuous-aggregate views are never written through the model.
      def readonly?
        true
      end

      # Manually refreshes the backing continuous aggregate.
      # CALL cannot run inside a transaction, hence the raw connection.
      def self.refresh!
        connection_pool.with_connection do |conn|
          # Fixme: This is a workaround to guarantee we're in a fresh connection
          conn.reset! if conn.transaction_open?
          conn.raw_connection.exec("CALL refresh_continuous_aggregate('#{table_name}', NULL, NULL)")
        end
      end
    end
  end

  # One read-only model per materialized view created by the migration.
  # NOTE(review): per_week/per_year scopes exist above but have no backing
  # views here — confirm that is intentional.
  MaterializedViews = [
    PerMinute = cagg['per_minute'],
    PerHour = cagg['per_hour'],
    PerDay = cagg['per_day'],
    PerMonth = cagg['per_month'],
    GemsPerMinute = cagg['gems_per_minute'],
    GemsPerHour = cagg['gems_per_hour'],
    GemsPerDay = cagg['gems_per_day'],
    GemsPerMonth = cagg['gems_per_month'],
    VersionsPerMinute = cagg['versions_per_minute'],
    VersionsPerHour = cagg['versions_per_hour'],
    VersionsPerDay = cagg['versions_per_day'],
    VersionsPerMonth = cagg['versions_per_month']
  ]
end
5 changes: 5 additions & 0 deletions app/models/downloads_db.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Abstract base class for all models living in the separate "downloads"
# database (TimescaleDB). Subclass this instead of ApplicationRecord to
# route reads/writes to that connection.
class DownloadsDB < ApplicationRecord
  self.abstract_class = true

  connects_to database: { writing: :downloads }
end
31 changes: 31 additions & 0 deletions app/models/log_download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Mimic LogTicket model to store the log files but for the downloads database.
# It will be backfilled with the log files from the main database to the downloads database.
# There will be a background job to process the log files
class LogDownload < DownloadsDB
  enum backend: { s3: 0, local: 1 }
  enum status: %i[pending processing failed processed].index_with(&:to_s)

  # Claims the oldest pending record, optionally narrowed by key and/or
  # directory, under a row lock, and flips its status to "processing".
  #
  # @param key [String, nil] restrict to this log file key
  # @param directory [String, nil] restrict to this bucket/directory
  # @return [LogDownload, nil] the claimed record, or nil when none pending
  def self.pop(key: nil, directory: nil)
    scope = pending.limit(1).order("id ASC")
    scope = scope.where(key: key) if key
    scope = scope.where(directory: directory) if directory
    # update_column intentionally skips validations/callbacks/timestamps
    scope.lock(true).sole.tap do |log_download|
      log_download.update_column(:status, "processing")
    end
  rescue ActiveRecord::RecordNotFound
    nil # no pending LogDownload found by `sole` call
  end

  # Storage adapter for this record's backend enum (S3 bucket or local dir).
  def fs
    @fs ||=
      if s3?
        RubygemFs::S3.new(bucket: directory)
      else
        RubygemFs::Local.new(directory)
      end
  end

  # Raw contents of the log file fetched from the configured backend.
  def body
    fs.get(key)
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

module Maintenance
  # Helper to keep backfilling LogTickets to TimescaleDB downloads table.
  # It will be used to migrate the data from the old LogTicket table to the new LogDownload table.
  # It will be executed in the background and it will be a one time task.
  # Later, after all pending LogTickets are migrated, this job will be removed.
  class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task
    # Every LogDownload still awaiting processing. Uses the enum-generated
    # `pending` scope for consistency with LogDownload.pop.
    def collection
      LogDownload.pending
    end

    # Processes one queued log file; the processor re-pops the record by
    # directory/key, marking it "processing" and then "processed"/"failed".
    #
    # @param element [LogDownload]
    def process(element)
      FastlyLogDownloadsProcessor.new(element.directory, element.key).perform
    end
  end
end
3 changes: 2 additions & 1 deletion config/initializers/zeitwerk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@

# Teach Zeitwerk the acronym-style class names it cannot derive from
# file names (e.g. downloads_db.rb defines DownloadsDB, not DownloadsDb).
Rails.autoloaders.once.inflector.inflect(
  "http" => "HTTP",
  "oidc" => "OIDC",
  "downloads_db" => "DownloadsDB"
)
63 changes: 63 additions & 0 deletions db/downloads_migrate/20240708184547_create_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Creates the TimescaleDB `downloads` hypertable plus a cascade of
# continuous aggregates (per minute -> hour -> day -> month, overall /
# per gem / per gem version) that precompute the download counters.
class CreateDownloads < ActiveRecord::Migration[7.1]

  # Continuous aggregates cannot be created inside a transaction.
  disable_ddl_transaction!

  def self.up
    # Makes the migration re-runnable: tear down any previous attempt first.
    self.down if Download.table_exists?

    hypertable_options = {
      time_column: 'created_at',
      chunk_time_interval: '1 day',
      compress_segmentby: 'gem_name, gem_version',
      compress_orderby: 'created_at DESC',
      compression_interval: '7 days'
    }

    # No primary key (id: false): hypertables are keyed by the time column.
    create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
      t.timestamptz :created_at, null: false
      t.text :gem_name, :gem_version, null: false
      t.jsonb :payload
    end

    # Each coarser view rolls up from the next finer one (hierarchical
    # continuous aggregates), not from the raw table.
    {
      per_minute: Download.per_minute,
      per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1),
      per_day: Download::PerHour.per_day(:sum_downloads).group(1),
      per_month: Download::PerDay.per_month(:sum_downloads).group(1),

      gems_per_minute: Download.gems_per_minute,
      gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads)::bigint as downloads").group(1,2),
      gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads)::bigint as downloads").group(1,2),
      gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1,2),

      versions_per_minute: Download.versions_per_minute,
      versions_per_hour: Download::VersionsPerMinute.sum_downloads.per_hour("gem_name, gem_version").group(1,2,3),
      versions_per_day: Download::VersionsPerHour.sum_downloads.per_day("gem_name, gem_version").group(1,2,3),
      versions_per_month: Download::VersionsPerDay.sum_downloads.per_month("gem_name, gem_version").group(1,2,3)
    }.each do |name, scope|
      # "gems_per_hour" -> "hour"; used to size the refresh policy windows.
      frame = name.to_s.split('per_').last
      create_continuous_aggregate(
        "downloads_#{name}",
        scope.to_sql,
        refresh_policies: {
          schedule_interval: "INTERVAL '1 #{frame}'",
          start_offset: "INTERVAL '3 #{frame}'",
          end_offset: "INTERVAL '1 minute'"
        })
    end
  end
  # Drops views coarsest-first; CASCADE also removes their refresh policies.
  def self.down
    %w[month day hour minute].each do |frame|
      ["downloads_per_#{frame}",
        "downloads_gems_per_#{frame}",
        "downloads_versions_per_#{frame}",
      ].each do |view|
        safety_assured do
          execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade")
        end
      end
    end

    drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists?
  end
end
16 changes: 16 additions & 0 deletions db/downloads_migrate/20240823181725_create_log_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Mimic LogTicket table to store the log files but for the downloads database
# It will be used to store the log files to be processed during the migration
class CreateLogDownloads < ActiveRecord::Migration[7.1]
def change
create_table :log_downloads do |t|
t.string :key
t.string :directory
t.integer :backend
t.string :status, default: "pending"
t.integer :processed_count, default: 0
t.timestamps
end

add_index :log_downloads, [:key, :directory], unique: true
end
end
Loading
Loading