diff --git a/app/avo/resources/log_download_resource.rb b/app/avo/resources/log_download_resource.rb new file mode 100644 index 00000000000..b3613efc4d0 --- /dev/null +++ b/app/avo/resources/log_download_resource.rb @@ -0,0 +1,15 @@ +class LogDownloadResource < Avo::BaseResource + self.title = :id + self.includes = [] + + class BackendFilter < ScopeBooleanFilter; end + filter BackendFilter, arguments: {default: LogDownload.backends.transform_values { true } } + class StatusFilter < ScopeBooleanFilter; end + filter StatusFilter, arguments: {default: LogDownload.statuses.transform_values { true } } + + field :id, as: :id + field :key, as: :text + field :directory, as: :text + field :backend, as: :select, enum: LogDownload.backends + field :status, as: :select, enum: LogDownload.statuses +end diff --git a/app/controllers/avo/log_downloads_controller.rb b/app/controllers/avo/log_downloads_controller.rb new file mode 100644 index 00000000000..a763031854f --- /dev/null +++ b/app/controllers/avo/log_downloads_controller.rb @@ -0,0 +1,4 @@ +# This controller has been generated to enable Rails' resource routes. +# More information on https://docs.avohq.io/2.0/controllers.html +class Avo::LogDownloadsController < Avo::ResourcesController +end diff --git a/app/jobs/fastly_log_downloads_processor.rb b/app/jobs/fastly_log_downloads_processor.rb new file mode 100644 index 00000000000..767e7babb8f --- /dev/null +++ b/app/jobs/fastly_log_downloads_processor.rb @@ -0,0 +1,99 @@ +require "zlib" + +# Process log files downloaded from Fastly and insert row by row into the database. +# It works in a similar way to FastlyLogProcessor, but it's optimized for a different +# use case: it processes log files downloaded from Fastly and inserts the raw data into +# the database in batches. +# The counters and other metrics are calculated in a separate job directly in +# the database through the continuous aggregates. +# Check Download::PerMinute, Download::PerHour and other classes as an example. +class FastlyLogDownloadsProcessor + class LogFileNotFoundError < ::StandardError; end + + extend StatsD::Instrument + + BATCH_SIZE = 5000 + + attr_accessor :bucket, :key + attr_reader :processed_count + + def initialize(bucket, key) + @bucket = bucket + @key = key + @processed_count = 0 + end + + def perform + StatsD.increment("fastly_log_downloads_processor.started") + raise LogFileNotFoundError if body.nil? + + count = 0 + parse_success_downloads.each_slice(BATCH_SIZE) do |batch| + Download.insert_all batch + count += batch.size + end + + if count > 0 + element.update(status: "processed", processed_count: count) + else + element.update(status: "failed") + end + + # This value may diverge from numbers from the fastly_log_processor as totals are + # not aggregated with the number of downloads but each row represents a download. + StatsD.gauge("fastly_log_downloads_processor.processed_count", count) + count + end + + def body + @body ||= element&.body + end + + def element + @element ||= LogDownload.pop(directory: @bucket, key: @key) + end + + def parse_success_downloads + body.each_line.map do |log_line| + fragments = log_line.split + path, response_code = fragments[10, 2] + case response_code.to_i + # Only count successful downloads + # NB: we consider a 304 response a download attempt + when 200, 304 + m = path.match(PATH_PATTERN) + gem_name = m[:gem_name] || path + gem_version = m[:gem_version] + created_at = Time.parse fragments[4..9].join(' ') + env = parse_env fragments[12..-1] + payload = {env:} + + {created_at:, gem_name:, gem_version:, payload:} + end + end.compact + end + + + # Parse the env into a hash of key value pairs + # example env = "bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0" + # output = {bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0"} + # case it says single word like jruby it appends true as the value + # example env = "jruby" + # output = {jruby: "true"} + # also removes some unwanted characters + def parse_env(output) + env = output.join(' ').gsub(/command.*|\(.*\)|Ruby, /,'').strip + env = nil if env == "(null)" + env = env.split(' ').map do |info| + pair = info.split(/\/|-/,2) + pair << "true" if pair.size == 1 + pair + end.to_h + end + + statsd_count_success :perform, "fastly_log_downloads_processor.perform" + statsd_measure :perform, "fastly_log_downloads_processor.job_performance" + + PATH_PATTERN = /\/gems\/(?.*)-(?\d+.*)\.gem/ + private_constant :PATH_PATTERN +end diff --git a/app/jobs/fastly_log_downloads_processor_job.rb b/app/jobs/fastly_log_downloads_processor_job.rb new file mode 100644 index 00000000000..9f0b74031e9 --- /dev/null +++ b/app/jobs/fastly_log_downloads_processor_job.rb @@ -0,0 +1,21 @@ +# Same as the FastlyLogProcessorJob but for saving it to TimescaleDB +# and the Download table as flat downloads. +class FastlyLogDownloadsProcessorJob < ApplicationJob + queue_as :default + queue_with_priority PRIORITIES.fetch(:stats) + + include GoodJob::ActiveJobExtensions::Concurrency + good_job_control_concurrency_with( + # Maximum number of jobs with the concurrency key to be + # concurrently performed (excludes enqueued jobs) + # + # Limited to avoid overloading the gem_download table with + # too many concurrent conflicting updates + perform_limit: good_job_concurrency_perform_limit(default: 5), + key: name + ) + + def perform(bucket:, key:) + FastlyLogDownloadsProcessor.new(bucket, key).perform + end +end diff --git a/app/models/download.rb b/app/models/download.rb new file mode 100644 index 00000000000..70f7f6e26b6 --- /dev/null +++ b/app/models/download.rb @@ -0,0 +1,90 @@ +class Download < DownloadsDB + extend Timescaledb::ActsAsHypertable + + acts_as_hypertable time_column: 'created_at' + + scope :time_bucket, -> (range='1m', query="count(*)") do + time_column = self.hypertable_options[:time_column] + select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") + end + + scope :per_minute, -> (query="count(*) as downloads") do + time_bucket('1m', query).group(1).order(1) + end + + scope :gems_per_minute, -> do + per_minute("gem_name, count(*) as downloads").group(1,2) + end + + scope :versions_per_minute, -> do + per_minute("gem_name, gem_version, count(*) as downloads").group(1,2,3) + end + + cagg = -> (view_name) do + Class.new(DownloadsDB) do + self.table_name = "downloads_#{view_name}" + + scope :sum_downloads, -> { select("sum(downloads)::bigint as downloads") } + scope :avg_downloads, -> { select("avg(downloads)::bigint as avg_downloads") } + + scope :rollup, -> (range='1d', query=:sum_downloads) do + time_column = Download.hypertable_options[:time_column] + if query.is_a?(Symbol) + select("time_bucket('#{range}', #{time_column}) as #{time_column}") + .public_send(query) + .group(1) + else + select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") + .group(1) + end + end + + scope :per_hour, -> (query=:sum_downloads) do + rollup('1h', query) + end + + scope :per_day, -> (query=:sum_downloads) do + rollup('1d', query) + end + + scope :per_week, -> (query=:sum_downloads) do + rollup('1w', query) + end + + scope :per_month, -> (query=:sum_downloads) do + rollup('1mon', query) + end + + scope :per_year, -> (query=:sum_downloads) do + rollup('1y', query) + end + + def readonly? + true + end + + def self.refresh! + connection_pool.with_connection do |conn| + # Fixme: This is a workaround to guarantee we're in a fresh connection + conn.reset! if conn.transaction_open? + conn.raw_connection.exec("CALL refresh_continuous_aggregate('#{table_name}', NULL, NULL)") + end + end + end + end + + MaterializedViews = [ + PerMinute = cagg['per_minute'], + PerHour = cagg['per_hour'], + PerDay = cagg['per_day'], + PerMonth = cagg['per_month'], + GemsPerMinute = cagg['gems_per_minute'], + GemsPerHour = cagg['gems_per_hour'], + GemsPerDay = cagg['gems_per_day'], + GemsPerMonth = cagg['gems_per_month'], + VersionsPerMinute = cagg['versions_per_minute'], + VersionsPerHour = cagg['versions_per_hour'], + VersionsPerDay = cagg['versions_per_day'], + VersionsPerMonth = cagg['versions_per_month'] + ] +end diff --git a/app/models/downloads_db.rb b/app/models/downloads_db.rb new file mode 100644 index 00000000000..6c448398c97 --- /dev/null +++ b/app/models/downloads_db.rb @@ -0,0 +1,5 @@ +class DownloadsDB < ApplicationRecord + self.abstract_class = true + + connects_to database: { writing: :downloads } +end diff --git a/app/models/log_download.rb b/app/models/log_download.rb new file mode 100644 index 00000000000..2d1f6d87029 --- /dev/null +++ b/app/models/log_download.rb @@ -0,0 +1,31 @@ +# Mimic LogTicket model to store the log files but for the downloads database. +# It will be backfilled with the log files from the main database to the downloads database. +# There will be a background job to process the log files +class LogDownload < DownloadsDB + enum backend: { s3: 0, local: 1 } + enum status: %i[pending processing failed processed].index_with(&:to_s) + + def self.pop(key: nil, directory: nil) + scope = pending.limit(1).order("id ASC") + scope = scope.where(key: key) if key + scope = scope.where(directory: directory) if directory + scope.lock(true).sole.tap do |log_download| + log_download.update_column(:status, "processing") + end + rescue ActiveRecord::RecordNotFound + nil # no ticket in queue found by `sole` call + end + + def fs + @fs ||= + if s3? + RubygemFs::S3.new(bucket: directory) + else + RubygemFs::Local.new(directory) + end + end + + def body + fs.get(key) + end +end diff --git a/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb new file mode 100644 index 00000000000..0750ad0b594 --- /dev/null +++ b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Maintenance + # Helper to keep backfilling LogTickets to TimescaleDB downloads table. + # It will be used to migrate the data from the old LogTicket table to the new LogDownload table. + # It will be executed in the background and it will be a one time task. + # Later, after all pending LogTickets are migrated, this job will be removed. + class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task + def collection + LogDownload.where(status: "pending") + end + + def process(element) + FastlyLogDownloadsProcessor.new(element.directory, element.key).perform + end + end +end diff --git a/config/initializers/zeitwerk.rb b/config/initializers/zeitwerk.rb index cba98d517fc..896e8817c1b 100644 --- a/config/initializers/zeitwerk.rb +++ b/config/initializers/zeitwerk.rb @@ -11,5 +11,6 @@ Rails.autoloaders.once.inflector.inflect( "http" => "HTTP", - "oidc" => "OIDC" + "oidc" => "OIDC", + "downloads_db" => "DownloadsDB" ) diff --git a/db/downloads_migrate/20240708184547_create_downloads.rb b/db/downloads_migrate/20240708184547_create_downloads.rb new file mode 100644 index 00000000000..1ac3998ceea --- /dev/null +++ b/db/downloads_migrate/20240708184547_create_downloads.rb @@ -0,0 +1,63 @@ +class CreateDownloads < ActiveRecord::Migration[7.1] + + disable_ddl_transaction! + + def self.up + self.down if Download.table_exists? + + hypertable_options = { + time_column: 'created_at', + chunk_time_interval: '1 day', + compress_segmentby: 'gem_name, gem_version', + compress_orderby: 'created_at DESC', + compression_interval: '7 days' + } + + create_table(:downloads, id: false, hypertable: hypertable_options) do |t| + t.timestamptz :created_at, null: false + t.text :gem_name, :gem_version, null: false + t.jsonb :payload + end + + { + per_minute: Download.per_minute, + per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1), + per_day: Download::PerHour.per_day(:sum_downloads).group(1), + per_month: Download::PerDay.per_month(:sum_downloads).group(1), + + gems_per_minute: Download.gems_per_minute, + gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads)::bigint as downloads").group(1,2), + gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads)::bigint as downloads").group(1,2), + gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1,2), + + versions_per_minute: Download.versions_per_minute, + versions_per_hour: Download::VersionsPerMinute.sum_downloads.per_hour("gem_name, gem_version").group(1,2,3), + versions_per_day: Download::VersionsPerHour.sum_downloads.per_day("gem_name, gem_version").group(1,2,3), + versions_per_month: Download::VersionsPerDay.sum_downloads.per_month("gem_name, gem_version").group(1,2,3) + }.each do |name, scope| + frame = name.to_s.split('per_').last + create_continuous_aggregate( + "downloads_#{name}", + scope.to_sql, + refresh_policies: { + schedule_interval: "INTERVAL '1 #{frame}'", + start_offset: "INTERVAL '3 #{frame}'", + end_offset: "INTERVAL '1 minute'" + }) + end + end + def self.down + %w[month day hour minute].each do |frame| + ["downloads_per_#{frame}", + "downloads_gems_per_#{frame}", + "downloads_versions_per_#{frame}", + ].each do |view| + safety_assured do + execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade") + end + end + end + + drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists? + end +end diff --git a/db/downloads_migrate/20240823181725_create_log_downloads.rb b/db/downloads_migrate/20240823181725_create_log_downloads.rb new file mode 100644 index 00000000000..21725c43ca8 --- /dev/null +++ b/db/downloads_migrate/20240823181725_create_log_downloads.rb @@ -0,0 +1,16 @@ +# Mimic LogTicket table to store the log files but for the downloads database +# It will be used to store the log files to be processed during the migration +class CreateLogDownloads < ActiveRecord::Migration[7.1] + def change + create_table :log_downloads do |t| + t.string :key + t.string :directory + t.integer :backend + t.string :status, default: "pending" + t.integer :processed_count, default: 0 + t.timestamps + end + + add_index :log_downloads, [:key, :directory], unique: true + end +end diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index c390619acd2..d1613ada846 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -10,9 +10,128 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 0) do +ActiveRecord::Schema[7.1].define(version: 2024_08_23_181725) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" enable_extension "timescaledb" + create_table "downloads", id: false, force: :cascade do |t| + t.timestamptz "created_at", null: false + t.text "gem_name", null: false + t.text "gem_version", null: false + t.jsonb "payload" + t.index ["created_at"], name: "downloads_created_at_idx", order: :desc + end + + create_table "log_downloads", force: :cascade do |t| + t.string "key" + t.string "directory" + t.integer "backend" + t.string "status", default: "pending" + t.integer "processed_count", default: 0 + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key", "directory"], name: "index_log_downloads_on_key_and_directory", unique: true + end + + create_hypertable "downloads", time_column: "created_at", chunk_time_interval: "1 day", compress_segmentby: "gem_name, gem_version", compress_orderby: "created_at DESC", compression_interval: "P7D" + create_continuous_aggregate("downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)) + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_gems_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + gem_name, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_gems_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + gem_name, + (sum(downloads))::bigint AS downloads + FROM downloads_gems_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_gems_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + gem_name, + (sum(downloads))::bigint AS downloads + FROM downloads_gems_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_gems_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + gem_name, + (sum(downloads))::bigint AS downloads + FROM downloads_gems_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_versions_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + gem_name, + gem_version, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name, gem_version + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_versions_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + gem_name, + gem_version, + (sum(downloads))::bigint AS downloads + FROM downloads_versions_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name, gem_version + SQL + + create_continuous_aggregate("downloads_versions_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + gem_name, + gem_version, + (sum(downloads))::bigint AS downloads + FROM downloads_versions_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name, gem_version + SQL + + create_continuous_aggregate("downloads_versions_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + gem_name, + gem_version, + (sum(downloads))::bigint AS downloads + FROM downloads_versions_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name, gem_version + SQL + end diff --git a/lib/shoryuken/sqs_worker.rb b/lib/shoryuken/sqs_worker.rb index e12dbc834dc..996eec6c732 100644 --- a/lib/shoryuken/sqs_worker.rb +++ b/lib/shoryuken/sqs_worker.rb @@ -24,6 +24,11 @@ def perform(_sqs_msg, body) StatsD.increment("fastly_log_processor.duplicated") else FastlyLogProcessorJob.perform_later(bucket:, key:) + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + StatsD.increment("fastly_log_downloads_processor.enqueued") + LogDownload.create!(backend: "s3", key: key, directory: bucket) + FastlyLogDownloadsProcessorJob.perform_later(bucket:, key:) + end end end end diff --git a/test/factories/downloads.rb b/test/factories/downloads.rb new file mode 100644 index 00000000000..a47ccb8e5b0 --- /dev/null +++ b/test/factories/downloads.rb @@ -0,0 +1,8 @@ +FactoryBot.define do + factory :download do + gem_name { "example" } + gem_version { "0.0.1" } + payload { { env: { bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0" } } } + created_at { Time.now } + end +end diff --git a/test/factories/log_downloads.rb b/test/factories/log_downloads.rb new file mode 100644 index 00000000000..0d010c9b75a --- /dev/null +++ b/test/factories/log_downloads.rb @@ -0,0 +1,8 @@ +FactoryBot.define do + factory :log_download do + sequence(:key) { "key-#{_1}" } + sequence(:directory) { "directory-#{_1}" } + status { :pending } + backend { 0 } + end +end diff --git a/test/helpers/timescaledb_helpers.rb b/test/helpers/timescaledb_helpers.rb new file mode 100644 index 00000000000..87402ccdeb1 --- /dev/null +++ b/test/helpers/timescaledb_helpers.rb @@ -0,0 +1,12 @@ +module TimescaledbHelpers + extend ActiveSupport::Concern + + included do + def refresh_all_caggs! + Download.connection.commit_db_transaction + Download::MaterializedViews.each do |cagg| + cagg.refresh! + end + end + end +end diff --git a/test/jobs/fastly_log_downloads_processor_job_test.rb b/test/jobs/fastly_log_downloads_processor_job_test.rb new file mode 100644 index 00000000000..7875cd38011 --- /dev/null +++ b/test/jobs/fastly_log_downloads_processor_job_test.rb @@ -0,0 +1,51 @@ +require "test_helper" + +class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase + setup do + @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read + + @sample_log_counts = { + "bundler-1.10.6" => 2, + "json-1.8.3-java" => 2, + "json-1.8.3" => 1, + "json-1.8.2" => 4, + "no-such-gem-1.2.3" => 1 + } + + @log_download = LogDownload.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") + + Aws.config[:s3] = { + stub_responses: { get_object: { body: @sample_log } } + } + @processor = FastlyLogDownloadsProcessor.new("test-bucket", "fastly-fake.log") + @job = FastlyLogDownloadsProcessorJob.new(bucket: "test-bucket", key: "fastly-fake.log") + Download.connection.execute("truncate table downloads") + end + + teardown do + # Remove stubbed response + Aws.config.delete(:s3) + end + + def perform_and_refresh + count = @processor.perform + refresh_all_caggs! + count + end + + context "#perform" do + should "process file" do + assert_equal 10, perform_and_refresh + summary_counts = Download::VersionsPerHour.all.each_with_object({}){|e,summary|summary[e.gem_name+"-"+e.gem_version] = e.downloads} + assert_equal @sample_log_counts, summary_counts + end + + should "fail if dont find the file" do + @log_download.update(backend: "local", directory: "foobar") + assert_raises FastlyLogDownloadsProcessor::LogFileNotFoundError do + perform_and_refresh + end + end + end +end + diff --git a/test/models/download_test.rb b/test/models/download_test.rb new file mode 100644 index 00000000000..87f4bbec008 --- /dev/null +++ b/test/models/download_test.rb @@ -0,0 +1,192 @@ +require "test_helper" + +class DownloadTest < ActiveSupport::TestCase + include ActiveSupport::Testing::TimeHelpers + + # This will run once before all tests in this class + setup do + # Truncate the table because we are using a transaction + Download.connection.execute("truncate table downloads") + end + + teardown do + travel_back + # Truncate the table because we are using a transaction + Download.connection.execute("truncate table downloads") + end + + def refresh_all! + # We need to commit the transaction to make sure the continuous aggregates are refreshed with latest data + Download.connection.commit_db_transaction + Download::MaterializedViews.each do |view| + view.refresh! + end + end + + context ".per_minute" do + should "return downloads per minute" do + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.per_minute + + create(:download, created_at: 2.minutes.ago) + create(:download, created_at: 1.minute.ago) + create(:download, created_at: 1.minute.ago) + + assert_equal [1, 2], Download.per_minute.map(&:downloads) + end + end + end + + context ".gems_per_minute" do + should "return gems downloads per minute" do + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.gems_per_minute + + create(:download, gem_name: "example", created_at: 2.minutes.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example2", created_at: 1.minute.ago) + + assert_equal [1, 2], Download.gems_per_minute.where(gem_name: "example").map(&:downloads) + assert_equal [1], Download.gems_per_minute.where(gem_name: "example2").map(&:downloads) + end + end + end + + context ".versions_per_minute" do + should "return versions downloads per minute" do + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.versions_per_minute + + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 2.minutes.ago) + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) + + assert_equal [1, 1], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) + assert_equal [2], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) + assert_equal [1], Download.versions_per_minute.where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) + end + end + end + + context "Continuous Aggregate" do + should "materialized views by minute, hour, day and month" do + assert Download::PerMinute.table_exists? + assert Download::PerHour.table_exists? + assert Download::PerDay.table_exists? + assert Download::PerMonth.table_exists? + + assert Download::GemsPerMinute.table_exists? + assert Download::GemsPerHour.table_exists? + assert Download::GemsPerDay.table_exists? + assert Download::GemsPerMonth.table_exists? + + assert Download::VersionsPerMinute.table_exists? + assert Download::VersionsPerHour.table_exists? + assert Download::VersionsPerDay.table_exists? + assert Download::VersionsPerMonth.table_exists? + + assert Download::MaterializedViews == [ + Download::PerMinute, + Download::PerHour, + Download::PerDay, + Download::PerMonth, + Download::GemsPerMinute, + Download::GemsPerHour, + Download::GemsPerDay, + Download::GemsPerMonth, + Download::VersionsPerMinute, + Download::VersionsPerHour, + Download::VersionsPerDay, + Download::VersionsPerMonth + ] + end + + should "refresh materialized views" do + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + [1.year.ago, 11.months.ago, 10.months.ago, 3.months.ago, 1.month.ago, 1.day.ago, 1.hour.ago, 1.minute.ago, 30.seconds.ago].each do |created_at| + create(:download, gem_name: "alpha", gem_version: "0.0.1", created_at: created_at) + end + + [3.months.ago, 1.month.ago, 1.day.ago, 1.hour.ago, 1.minute.ago, 45.seconds.ago].each do |created_at| + create(:download, gem_name: "beta", gem_version: "0.0.1", created_at: created_at) + end + + refresh_all! + + assert_equal 15, Download.count + assert_equal 8, Download::PerMinute.count + assert_equal 7, Download::PerHour.count + assert_equal 7, Download::PerDay.count + assert_equal 6, Download::PerMonth.count + + expected_per_minute = [ + {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T12:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:59:00.000Z","downloads":4} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_minute, Download::PerMinute.all.map{_1.attributes.symbolize_keys} + + expected_per_hour = [ + {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:00:00.000Z","downloads":6}, + {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T12:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T12:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_hour, Download::PerHour.all.map{_1.attributes.symbolize_keys} + + expected_per_day = [ + {"created_at":"2024-08-23T00:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T00:00:00.000Z","downloads":6}, + {"created_at":"2023-08-24T00:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T00:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T00:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T00:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T00:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_day, Download::PerDay.all.map{_1.attributes.symbolize_keys} + + expected_per_month = [ + {"created_at":"2024-08-01T00:00:00.000Z","downloads":8}, + {"created_at":"2023-08-01T00:00:00.000Z","downloads":1}, + {"created_at":"2023-09-01T00:00:00.000Z","downloads":1}, + {"created_at":"2023-10-01T00:00:00.000Z","downloads":1}, + {"created_at":"2024-05-01T00:00:00.000Z","downloads":2}, + {"created_at":"2024-07-01T00:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + assert_equal expected_per_month, Download::PerMonth.all.map{_1.attributes.symbolize_keys} + + assert_equal [1, 1, 1, 2, 2, 2, 2, 4], Download::PerMinute.all.map{_1.attributes["downloads"]} + + downloads_per_minute = -> (gem_name) { Download::GemsPerMinute.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 1, 1, 1, 1, 1, 1, 2], downloads_per_minute.("alpha") + assert_equal [1,1,1,1,2], downloads_per_minute.("beta") + + downloads_per_hour = -> (gem_name) { Download::GemsPerHour.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_hour.("alpha") + assert_equal [1, 3, 1, 1], downloads_per_hour.("beta") + + downloads_per_day = -> (gem_name) { Download::GemsPerDay.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_day.("alpha") + assert_equal [1, 3, 1, 1], downloads_per_day.("beta") + + downloads_per_month = -> (gem_name) { Download::GemsPerMonth.where('gem_name': gem_name).pluck('downloads')} + assert_equal [4, 1, 1, 1, 1, 1], downloads_per_month.("alpha") + assert_equal [4, 1, 1], downloads_per_month.("beta") + end + end + end +end \ No newline at end of file diff --git a/test/models/log_download_test.rb b/test/models/log_download_test.rb new file mode 100644 index 00000000000..a389a1e60d3 --- /dev/null +++ b/test/models/log_download_test.rb @@ -0,0 +1,63 @@ +require "test_helper" + +class LogDownloadTest < ActiveSupport::TestCase + setup do + @log_download = LogDownload.create!(directory: "test", key: "fake.log") + end + + should "not allow duplicate directory and key" do + assert_raises(ActiveRecord::RecordNotUnique) do + LogDownload.create!(directory: "test", key: "fake.log") + end + end + + should "allow diffent keys in same directory" do + LogDownload.create!(directory: "test", key: "fake2.log") + end + + context "#pop" do + setup do + @log_download = LogDownload.create!(directory: "test/2", key: "bar", status: "pending") + end + + context "without any keys" do + setup do + LogDownload.create!(directory: "test", key: "fake3.log") + end + + should "return the first download" do + assert_equal "fake.log", LogDownload.pop.key + end + end + + context "with a key" do + setup do + LogDownload.create!(directory: "test", key: "fake4.log") + end + + should "return the first download" do + assert_equal "fake4.log", LogDownload.pop(key: "fake4.log").key + end + end + + should "change the status" do + one = LogDownload.pop + assert_equal "processing", one.status + end + + should "return nil in case no download is available" do + 2.times { LogDownload.pop } + assert_nil LogDownload.pop + end + + context "with a directory" do + setup do + LogDownload.create!(directory: "test/dir", key: "fake5.log") + end + + should "return the first download" do + assert_equal "fake5.log", LogDownload.pop(directory: "test/dir").key + end + end + end +end diff --git a/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb b/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb new file mode 100644 index 00000000000..d5b7c5dba9a --- /dev/null +++ b/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "test_helper" + +module Maintenance + class BackfillLogTicketsToTimescaleDownloadsTaskTest < ActiveSupport::TestCase + setup do + @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read + + @sample_log_counts = { + "bundler-1.10.6" => 2, + "json-1.8.3-java" => 2, + "json-1.8.3" => 1, + "json-1.8.2" => 4, + "no-such-gem-1.2.3" => 1 + } + + @log_download = LogDownload.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") + + Aws.config[:s3] = { + stub_responses: { get_object: { body: @sample_log } } + } + Download.connection.execute("truncate table downloads") + end + + test "process" do + Maintenance::BackfillLogTicketsToTimescaleDownloadsTask.process(@log_download) + refresh_all_caggs! + @log_download.reload + assert_equal 10, @log_download.processed_count + assert_equal "processed", @log_download.status + assert_equal 10, Download.count + summary_counts = Download::VersionsPerHour.all.each_with_object({}){|e,summary|summary[e.gem_name+"-"+e.gem_version] = e.downloads} + assert_equal @sample_log_counts, summary_counts + end + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index f77de05f3c0..9c4f5fc3775 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -31,6 +31,7 @@ require "helpers/password_helpers" require "helpers/webauthn_helpers" require "helpers/oauth_helpers" +require 'helpers/timescaledb_helpers' require "webmock/minitest" require "phlex/testing/rails/view_helper" @@ -79,6 +80,7 @@ class ActiveSupport::TestCase include GemHelpers include EmailHelpers include PasswordHelpers + include TimescaledbHelpers parallelize_setup do |_worker| SemanticLogger.reopen diff --git a/test/unit/sqs_worker_test.rb b/test/unit/sqs_worker_test.rb index 05f394ece57..aba121cd81e 100644 --- a/test/unit/sqs_worker_test.rb +++ b/test/unit/sqs_worker_test.rb @@ -55,8 +55,13 @@ class SqsWorkerTest < ActiveSupport::TestCase should "create Logticket" do StatsD.expects(:increment).with("fastly_log_processor.s3_entry_fetched") StatsD.expects(:increment).with("fastly_log_processor.enqueued") + StatsD.expects(:increment).with("fastly_log_downloads_processor.enqueued") if ENV['DOWNLOADS_DB_ENABLED'] == 'true' StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogDownloadsProcessorJob.name))) + end assert_enqueued_jobs 1, only: FastlyLogProcessorJob do @sqs_worker.perform(nil, @body) end @@ -66,6 +71,12 @@ class SqsWorkerTest < ActiveSupport::TestCase assert_equal "bucket-name", log_ticket.directory assert_equal "object-key", log_ticket.key assert_equal "pending", log_ticket.status + + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + log_download = LogDownload.last + assert_equal "bucket-name", log_download.directory + assert_equal "object-key", log_download.key + end end should "not create duplicate LogTicket" do @@ -76,8 +87,17 @@ class SqsWorkerTest < ActiveSupport::TestCase StatsD.expects(:increment).with("fastly_log_processor.enqueued").twice StatsD.expects(:increment).with("fastly_log_processor.duplicated") StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, - has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) - assert_enqueued_jobs 1, only: FastlyLogProcessorJob do + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) + jobs = [FastlyLogProcessorJob] + + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + StatsD.expects(:increment).with("fastly_log_downloads_processor.enqueued").once + StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogDownloadsProcessorJob.name))) + jobs << FastlyLogDownloadsProcessorJob + end + + assert_enqueued_jobs jobs.size, only: jobs do @sqs_worker.perform(nil, @body) end end