From 0c38acd66ec4053ffb0a90608c5bae3fe837e5d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 14:08:01 -0300 Subject: [PATCH 01/14] Setup model tests --- app/jobs/fastly_log_downloads_processor.rb | 85 ++++++++++ .../fastly_log_downloads_processor_job.rb | 21 +++ app/models/download.rb | 90 +++++++++++ app/models/download_record.rb | 6 + .../20240708184547_create_downloads.rb | 64 ++++++++ db/downloads_schema.rb | 110 ++++++++++++- lib/shoryuken/sqs_worker.rb | 4 + test/factories/downloads.rb | 8 + test/helpers/timescaledb_helpers.rb | 11 ++ ...fastly_log_downloads_processor_job_test.rb | 151 ++++++++++++++++++ test/models/download_test.rb | 138 ++++++++++++++++ test/test_helper.rb | 2 + 12 files changed, 689 insertions(+), 1 deletion(-) create mode 100644 app/jobs/fastly_log_downloads_processor.rb create mode 100644 app/jobs/fastly_log_downloads_processor_job.rb create mode 100644 app/models/download.rb create mode 100644 app/models/download_record.rb create mode 100644 db/downloads_migrate/20240708184547_create_downloads.rb create mode 100644 test/factories/downloads.rb create mode 100644 test/helpers/timescaledb_helpers.rb create mode 100644 test/jobs/fastly_log_downloads_processor_job_test.rb create mode 100644 test/models/download_test.rb diff --git a/app/jobs/fastly_log_downloads_processor.rb b/app/jobs/fastly_log_downloads_processor.rb new file mode 100644 index 00000000000..ef69f3d095d --- /dev/null +++ b/app/jobs/fastly_log_downloads_processor.rb @@ -0,0 +1,85 @@ +require "zlib" + +# Process log files downloaded from Fastly and insert row by row into the database. +# It works in a similar way to FastlyLogProcessor, but it's optimized for a different +# use case: it processes log files downloaded from Fastly and inserts the raw data into +# the database in batches. +# The counters and other metrics are calculated in a separate job directly in +# the database through the continuous aggregates. +# Check Download::PerMinute, Download::PerHour and other classes as an example. +class FastlyLogDownloadsProcessor + class LogFileNotFoundError < ::StandardError; end + + extend StatsD::Instrument + + BATCH_SIZE = 5000 + + attr_accessor :bucket, :key + + def initialize(bucket, key) + @bucket = bucket + @key = key + @processed_count = 0 + @batch = [] + end + + def perform + StatsD.increment("fastly_log_downloads_processor.started") + raise LogFileNotFoundError if body.nil? + + downloads = parse_success_downloads + downloads.each_slice(BATCH_SIZE) do |batch| + Download.insert_all batch + end + + StatsD.gauge("fastly_log_downloads_processor.processed_count", downloads.size) + end + + def body + @body ||= LogTicket.find_by(key: key, directory: bucket)&.body + end + + def parse_success_downloads + body.each_line.map do |log_line| + fragments = log_line.split + path, response_code = fragments[10, 2] + case response_code.to_i + # Only count successful downloads + # NB: we consider a 304 response a download attempt + when 200, 304 + m = path.match(PATH_PATTERN) + gem_name = m[:gem_name] || path + gem_version = m[:gem_version] + ts = Time.parse fragments[4..9].join(' ') + env = parse_env fragments[12..-1] + payload = {env:} + + {ts:, gem_name:, gem_version:, payload:} + end + end.compact + end + + + # Parse the env into a hash of key value pairs + # example env = "bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0" + # output = {bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0"} + # case it says single word like jruby it appends true as the value + # example env = "jruby" + # output = {jruby: "true"} + # also removes some unwanted characters + def parse_env(output) + env = output.join(' ').gsub(/command.*|\(.*\)|Ruby, /,'').strip + env = nil if env == "(null)" + env = env.split(' ').map do |info| + pair = info.split(/\/|-/,2) + pair << "true" if pair.size == 1 + pair + end.to_h + end + + statsd_count_success :perform, "fastly_log_downloads_processor.perform" + statsd_measure :perform, "fastly_log_downloads_processor.job_performance" + + PATH_PATTERN = /\/gems\/(?.*)-(?\d+.*)\.gem/ + private_constant :PATH_PATTERN +end diff --git a/app/jobs/fastly_log_downloads_processor_job.rb b/app/jobs/fastly_log_downloads_processor_job.rb new file mode 100644 index 00000000000..9f0b74031e9 --- /dev/null +++ b/app/jobs/fastly_log_downloads_processor_job.rb @@ -0,0 +1,21 @@ +# Same as the FastlyLogProcessorJob but for saving it to TimescaleDB +# and the Download table as flat downloads. +class FastlyLogDownloadsProcessorJob < ApplicationJob + queue_as :default + queue_with_priority PRIORITIES.fetch(:stats) + + include GoodJob::ActiveJobExtensions::Concurrency + good_job_control_concurrency_with( + # Maximum number of jobs with the concurrency key to be + # concurrently performed (excludes enqueued jobs) + # + # Limited to avoid overloading the gem_download table with + # too many concurrent conflicting updates + perform_limit: good_job_concurrency_perform_limit(default: 5), + key: name + ) + + def perform(bucket:, key:) + FastlyLogDownloadsProcessor.new(bucket, key).perform + end +end diff --git a/app/models/download.rb b/app/models/download.rb new file mode 100644 index 00000000000..51390afa568 --- /dev/null +++ b/app/models/download.rb @@ -0,0 +1,90 @@ +require 'timescaledb' +class Download < DownloadRecord + + acts_as_hypertable time_column: 'created_at' + + scope :time_bucket, -> (range='1m', query="count(*)") do + time_column = self.hypertable_options[:time_column] + select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") + end + + scope :per_minute, -> (query="count(*) as downloads") do + time_bucket('1m', query).group(1).order(1) + end + + scope :gems_per_minute, -> do + per_minute("gem_name, count(*) as downloads").group(1,2) + end + + scope :versions_per_minute, -> do + per_minute("gem_name, gem_version, count(*) as downloads").group(1,2,3) + end + + cagg = -> (view_name) do + Class.new(DownloadRecord) do + self.table_name = "downloads_#{view_name}" + + scope :sum_downloads, -> { select("sum(downloads)::bigint as downloads") } + scope :avg_downloads, -> { select("avg(downloads)::bigint as avg_downloads") } + + scope :rollup, -> (range='1d', query=:sum_downloads) do + time_column = Download.hypertable_options[:time_column] + if query.is_a?(Symbol) + select("time_bucket('#{range}', #{time_column}) as #{time_column}") + .public_send(query) + .group(1) + else + select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") + .group(1) + end + end + + scope :per_hour, -> (query=:sum_downloads) do + rollup('1h', query) + end + + scope :per_day, -> (query=:sum_downloads) do + rollup('1d', query) + end + + scope :per_week, -> (query=:sum_downloads) do + rollup('1w', query) + end + + scope :per_month, -> (query=:sum_downloads) do + rollup('1mon', query) + end + + scope :per_year, -> (query=:sum_downloads) do + rollup('1y', query) + end + + def readonly? + true + end + + def self.refresh! + connection_pool.with_connection do |conn| + # Fixme: This is a workaround to guarantee we're in a fresh connection + conn.reset! if conn.transaction_open? + conn.raw_connection.exec("CALL refresh_continuous_aggregate('#{table_name}', NULL, NULL)") + end + end + end + end + + MaterializedViews = [ + PerMinute = cagg['per_minute'], + PerHour = cagg['per_hour'], + PerDay = cagg['per_day'], + PerMonth = cagg['per_month'], + GemsPerMinute = cagg['gems_per_minute'], + GemsPerHour = cagg['gems_per_hour'], + GemsPerDay = cagg['gems_per_day'], + GemsPerMonth = cagg['gems_per_month'], + VersionsPerMinute = cagg['versions_per_minute'], + VersionsPerHour = cagg['versions_per_hour'], + VersionsPerDay = cagg['versions_per_day'], + VersionsPerMonth = cagg['versions_per_month'] + ] +end diff --git a/app/models/download_record.rb b/app/models/download_record.rb new file mode 100644 index 00000000000..7965d4722b0 --- /dev/null +++ b/app/models/download_record.rb @@ -0,0 +1,6 @@ +# This model is used to connect to the downloads database +class DownloadRecord < ApplicationRecord + self.abstract_class = true + extend Timescaledb::ActsAsHypertable + connects_to database: { writing: :downloads, reading: :downloads } +end diff --git a/db/downloads_migrate/20240708184547_create_downloads.rb b/db/downloads_migrate/20240708184547_create_downloads.rb new file mode 100644 index 00000000000..bf52d58712d --- /dev/null +++ b/db/downloads_migrate/20240708184547_create_downloads.rb @@ -0,0 +1,64 @@ +class CreateDownloads < ActiveRecord::Migration[7.1] + + disable_ddl_transaction! + + def self.up + self.down if Download.table_exists? + + hypertable_options = { + time_column: 'created_at', + chunk_time_interval: '1 day', + compress_segmentby: 'gem_name, gem_version', + compress_orderby: 'created_at DESC', + compression_interval: '7 days' + } + + create_table(:downloads, id: false, hypertable: hypertable_options) do |t| + t.timestamptz :created_at, null: false + t.text :gem_name, :gem_version, null: false + t.jsonb :payload + end + + { + per_minute: Download.per_minute, + per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1), + per_day: Download::PerHour.per_day(:sum_downloads).group(1), + per_month: Download::PerDay.per_month(:sum_downloads).group(1), + + gems_per_minute: Download.gems_per_minute, + gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads) as downloads").group(1,2), + gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads) as downloads").group(1,2), + gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads) as downloads").group(1,2), + + versions_per_minute: Download.versions_per_minute, + versions_per_hour: Download::VersionsPerMinute.per_hour("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3), + versions_per_day: Download::VersionsPerHour.per_day("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3), + versions_per_month: Download::VersionsPerDay.per_month("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3) + }.each do |name, scope| + puts "Creating continuous aggregate #{name}", scope.to_sql + frame = name.to_s.split('per_').last + create_continuous_aggregate( + "downloads_#{name}", + scope.to_sql, + refresh_policies: { + schedule_interval: "INTERVAL '1 #{frame}'", + start_offset: "INTERVAL '3 #{frame}'", + end_offset: "INTERVAL '1 minute'" + }) + end + end + def self.down + %w[month day hour minute].each do |frame| + ["downloads_per_#{frame}", + "downloads_gems_per_#{frame}", + "downloads_versions_per_#{frame}", + ].each do |view| + safety_assured do + execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade") + end + end + end + + drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists? + end +end diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index c390619acd2..c351e17f371 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -10,9 +10,117 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 0) do +ActiveRecord::Schema[7.1].define(version: 2024_07_08_184547) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" enable_extension "timescaledb" + create_table "downloads", id: false, force: :cascade do |t| + t.timestamptz "created_at", null: false + t.text "gem_name", null: false + t.text "gem_version", null: false + t.jsonb "payload" + t.index ["created_at"], name: "downloads_created_at_idx", order: :desc + end + + create_hypertable "downloads", time_column: "created_at", chunk_time_interval: "1 day", compress_segmentby: "gem_name, gem_version", compress_orderby: "created_at DESC", compression_interval: "P7D" + create_continuous_aggregate("downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)) + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + (sum(downloads))::bigint AS downloads + FROM downloads_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_gems_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + gem_name, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_gems_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + gem_name, + sum(downloads) AS downloads + FROM downloads_gems_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_gems_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + gem_name, + sum(downloads) AS downloads + FROM downloads_gems_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_gems_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + gem_name, + sum(downloads) AS downloads + FROM downloads_gems_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name + SQL + + create_continuous_aggregate("downloads_versions_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1M'::interval, created_at) AS created_at, + gem_name, + gem_version, + count(*) AS downloads + FROM downloads + GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name, gem_version + ORDER BY (time_bucket('PT1M'::interval, created_at)) + SQL + + create_continuous_aggregate("downloads_versions_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + SELECT time_bucket('PT1H'::interval, created_at) AS created_at, + gem_name, + gem_version, + sum(downloads) AS downloads + FROM downloads_versions_per_minute + GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name, gem_version + SQL + + create_continuous_aggregate("downloads_versions_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1D'::interval, created_at) AS created_at, + gem_name, + gem_version, + sum(downloads) AS downloads + FROM downloads_versions_per_hour + GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name, gem_version + SQL + + create_continuous_aggregate("downloads_versions_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + SELECT time_bucket('P1M'::interval, created_at) AS created_at, + gem_name, + gem_version, + sum(downloads) AS downloads + FROM downloads_versions_per_day + GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name, gem_version + SQL + end diff --git a/lib/shoryuken/sqs_worker.rb b/lib/shoryuken/sqs_worker.rb index e12dbc834dc..6cc269d42cc 100644 --- a/lib/shoryuken/sqs_worker.rb +++ b/lib/shoryuken/sqs_worker.rb @@ -24,6 +24,10 @@ def perform(_sqs_msg, body) StatsD.increment("fastly_log_processor.duplicated") else FastlyLogProcessorJob.perform_later(bucket:, key:) + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' && Download.table_exists? + StatsD.increment("fastly_log_downloads_processor.enqueued") + FastlyLogDownloadsProcessorJob.perform_later(bucket:, key:) + end end end end diff --git a/test/factories/downloads.rb b/test/factories/downloads.rb new file mode 100644 index 00000000000..a47ccb8e5b0 --- /dev/null +++ b/test/factories/downloads.rb @@ -0,0 +1,8 @@ +FactoryBot.define do + factory :download do + gem_name { "example" } + gem_version { "0.0.1" } + payload { { env: { bundler: "2.5.9", rubygems: "3.3.25", ruby: "3.1.0" } } } + created_at { Time.now } + end +end diff --git a/test/helpers/timescaledb_helpers.rb b/test/helpers/timescaledb_helpers.rb new file mode 100644 index 00000000000..16719a80aac --- /dev/null +++ b/test/helpers/timescaledb_helpers.rb @@ -0,0 +1,11 @@ +module TimescaledbHelpers + extend ActiveSupport::Concern + + included do + def refresh_all_caggs! + Download::MaterializedViews.each do |cagg| + cagg.refresh! + end + end + end +end diff --git a/test/jobs/fastly_log_downloads_processor_job_test.rb b/test/jobs/fastly_log_downloads_processor_job_test.rb new file mode 100644 index 00000000000..baa143ad165 --- /dev/null +++ b/test/jobs/fastly_log_downloads_processor_job_test.rb @@ -0,0 +1,151 @@ +require "test_helper" + +class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase + include SearchKickHelper + + setup do + @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read + + @sample_log_counts = { + "bundler-1.10.6" => 2, + "json-1.8.3-java" => 2, + "json-1.8.3" => 1, + "json-1.8.2" => 4, + "no-such-gem-1.2.3" => 1 + } + @log_ticket = LogTicket.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") + + Aws.config[:s3] = { + stub_responses: { get_object: { body: @sample_log } } + } + @processor = FastlyLogDownloadsProcessor.new("test-bucket", "fastly-fake.log") + @job = FastlyLogDownloadsProcessorJob.new(bucket: "test-bucket", key: "fastly-fake.log") + refresh_all_caggs! + end + + teardown do + # Remove stubbed response + Aws.config.delete(:s3) + end + + context "#download_counts" do + should "process file from s3" do + + require "pry";binding.pry + assert_equal @sample_log_counts, @processor.download_counts(@log_ticket) + end + + should "process file from local fs" do + @log_ticket.update(backend: "local", directory: "test/sample_logs") + + assert_equal @sample_log_counts, @processor.download_counts(@log_ticket) + end + + should "fail if dont find the file" do + @log_ticket.update(backend: "local", directory: "foobar") + assert_raises FastlyLogDownloadsProcessor::LogFileNotFoundError do + @processor.download_counts(@log_ticket) + end + end + end + + context "with gem data" do + setup do + # Create some gems to match the values in the sample log + bundler = create(:rubygem, name: "bundler") + json = create(:rubygem, name: "json") + + create(:version, rubygem: bundler, number: "1.10.6") + create(:version, rubygem: json, number: "1.8.3", platform: "java") + create(:version, rubygem: json, number: "1.8.3") + create(:version, rubygem: json, number: "1.8.2") + + import_and_refresh + end + + context "#perform" do + should "not double count" do + json = Rubygem.find_by_name("json") + + assert_equal 0, GemDownload.count_for_rubygem(json.id) + 3.times { @job.perform_now } + + assert_equal 7, es_downloads(json.id) + assert_equal 7, GemDownload.count_for_rubygem(json.id) + end + + should "update download counts" do + @job.perform_now + @sample_log_counts + .each do |name, expected_count| + version = Version.find_by(full_name: name) + next unless version + count = GemDownload.find_by(rubygem_id: version.rubygem.id, version_id: version.id).count + + assert_equal expected_count, count, "invalid value for #{name}" + end + + json = Rubygem.find_by_name("json") + + assert_equal 7, GemDownload.count_for_rubygem(json.id) + assert_equal 7, es_downloads(json.id) + assert_equal "processed", @log_ticket.reload.status + end + + should "not run if already processed" do + json = Rubygem.find_by_name("json") + + assert_equal 0, json.downloads + assert_equal 0, es_downloads(json.id) + @log_ticket.update(status: "processed") + @job.perform_now + + assert_equal 0, es_downloads(json.id) + assert_equal 0, json.downloads + end + + should "not mark as processed if anything fails" do + @processor.class.any_instance.stubs(:download_counts).raises("woops") + + assert_kind_of RuntimeError, @job.perform_now + + refute_equal "processed", @log_ticket.reload.status + assert_equal "failed", @log_ticket.reload.status + end + + should "not re-process if it failed" do + @processor.class.any_instance.stubs(:download_counts).raises("woops") + + assert_kind_of RuntimeError, @job.perform_now + + @job.perform_now + json = Rubygem.find_by_name("json") + + assert_equal 0, json.downloads + assert_equal 0, es_downloads(json.id) + end + + should "only process the right file" do + ticket = LogTicket.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.2.log", status: "pending") + + @job.perform_now + + assert_equal "pending", ticket.reload.status + assert_equal "processed", @log_ticket.reload.status + end + + should "update the processed count" do + @job.perform_now + + assert_equal 10, @log_ticket.reload.processed_count + end + + should "update the total gem count" do + assert_equal 0, GemDownload.total_count + @job.perform_now + + assert_equal 9, GemDownload.total_count + end + end + end +end diff --git a/test/models/download_test.rb b/test/models/download_test.rb new file mode 100644 index 00000000000..f639a4731dd --- /dev/null +++ b/test/models/download_test.rb @@ -0,0 +1,138 @@ +require "test_helper" + +class DownloadTest < ActiveSupport::TestCase + # This will run once before all tests in this class + setup do + # Truncate the table because we are using a transaction + Download.connection.execute("truncate table downloads") + end + + teardown do + # Truncate the table because we are using a transaction + Download.connection.execute("truncate table downloads") + end + + def refresh_all! + # We need to commit the transaction to make sure the continuous aggregates are refreshed with latest data + Download.connection.commit_db_transaction + Download::MaterializedViews.each do |view| + view.refresh! + end + end + + context ".per_minute" do + should "return downloads per minute" do + + assert_equal [], Download.per_minute + + create(:download, created_at: 2.minutes.ago) + create(:download, created_at: 1.minute.ago) + create(:download, created_at: 1.minute.ago) + + assert_equal [1, 2], Download.per_minute.map(&:downloads) + end + end + + context ".gems_per_minute" do + should "return gems downloads per minute" do + create(:download, gem_name: "example", created_at: 2.minutes.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example2", created_at: 1.minute.ago) + + assert_equal [1, 2], Download.gems_per_minute.where(gem_name: "example").map(&:downloads) + assert_equal [1], Download.gems_per_minute.where(gem_name: "example2").map(&:downloads) + end + end + + context ".versions_per_minute" do + should "return versions downloads per minute" do + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 2.minutes.ago) + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) + + assert_equal [1, 1], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) + assert_equal [2], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) + assert_equal [1], Download.versions_per_minute.where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) + end + end + + context "Continuous Aggregate" do + should "materialized views by minute, hour, day and month" do + assert Download::PerMinute.table_exists? + assert Download::PerHour.table_exists? + assert Download::PerDay.table_exists? + assert Download::PerMonth.table_exists? + + assert Download::GemsPerMinute.table_exists? + assert Download::GemsPerHour.table_exists? + assert Download::GemsPerDay.table_exists? + assert Download::GemsPerMonth.table_exists? + + assert Download::VersionsPerMinute.table_exists? + assert Download::VersionsPerHour.table_exists? + assert Download::VersionsPerDay.table_exists? + assert Download::VersionsPerMonth.table_exists? + + assert Download::MaterializedViews == [ + Download::PerMinute, + Download::PerHour, + Download::PerDay, + Download::PerMonth, + Download::GemsPerMinute, + Download::GemsPerHour, + Download::GemsPerDay, + Download::GemsPerMonth, + Download::VersionsPerMinute, + Download::VersionsPerHour, + Download::VersionsPerDay, + Download::VersionsPerMonth + ] + end + + should "refresh materialized views" do + + create(:download, created_at: 2.minutes.ago) + create(:download, created_at: 1.minute.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) + create(:download, gem_name: "example2", gem_version: "0.0.2", created_at: 1.minute.ago) + + assert Download.count == 7 + + refresh_all! + + assert Download::PerMinute.count == 2 + assert Download::PerMinute.all.map{_1.attributes["downloads"]} == [1, 6] + + previous = Download::PerMinute + %w[PerHour PerDay PerMonth].each do |view| + cagg = "Download::#{view}".constantize + scope = previous.send(view.underscore) + assert cagg.count == 1, "Expected Download::#{view}.count to be 1, got #{cagg.count}" + assert cagg.all.map(&:attributes) == scope.map(&:attributes) + previous = cagg + end + + downloads_per_minute = -> (gem_name) { Download::GemsPerMinute.where('gem_name': gem_name).pluck('downloads')} + assert downloads_per_minute.("example") == [1, 4] + assert downloads_per_minute.("example2") == [2] + + downloads_per_hour = -> (gem_name) { Download::GemsPerHour.where('gem_name': gem_name).pluck('downloads')} + assert downloads_per_hour.("example") == [5] + assert downloads_per_hour.("example2") == [2] + + downloads_per_day = -> (gem_name) { Download::GemsPerDay.where('gem_name': gem_name).pluck('downloads')} + assert downloads_per_day.("example") == [5] + assert downloads_per_day.("example2") == [2] + + downloads_per_month = -> (gem_name) { Download::GemsPerMonth.where('gem_name': gem_name).pluck('downloads')} + assert downloads_per_month.("example") == [5] + assert downloads_per_month.("example2") == [2] + end + end +end \ No newline at end of file diff --git a/test/test_helper.rb b/test/test_helper.rb index f77de05f3c0..9c4f5fc3775 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -31,6 +31,7 @@ require "helpers/password_helpers" require "helpers/webauthn_helpers" require "helpers/oauth_helpers" +require 'helpers/timescaledb_helpers' require "webmock/minitest" require "phlex/testing/rails/view_helper" @@ -79,6 +80,7 @@ class ActiveSupport::TestCase include GemHelpers include EmailHelpers include PasswordHelpers + include TimescaledbHelpers parallelize_setup do |_worker| SemanticLogger.reopen From 7335a926457d12787b44db4692d54b3b59916e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 14:50:58 -0300 Subject: [PATCH 02/14] Fix hierarchical values to use integer --- .../20240708184547_create_downloads.rb | 12 +- db/downloads_schema.rb | 12 +- ...fastly_log_downloads_processor_job_test.rb | 124 ++---------------- 3 files changed, 24 insertions(+), 124 deletions(-) diff --git a/db/downloads_migrate/20240708184547_create_downloads.rb b/db/downloads_migrate/20240708184547_create_downloads.rb index bf52d58712d..618ddf76562 100644 --- a/db/downloads_migrate/20240708184547_create_downloads.rb +++ b/db/downloads_migrate/20240708184547_create_downloads.rb @@ -26,14 +26,14 @@ def self.up per_month: Download::PerDay.per_month(:sum_downloads).group(1), gems_per_minute: Download.gems_per_minute, - gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads) as downloads").group(1,2), - gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads) as downloads").group(1,2), - gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads) as downloads").group(1,2), + gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads)::bigint as downloads").group(1,2), + gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads)::bigint as downloads").group(1,2), + gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1,2), versions_per_minute: Download.versions_per_minute, - versions_per_hour: Download::VersionsPerMinute.per_hour("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3), - versions_per_day: Download::VersionsPerHour.per_day("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3), - versions_per_month: Download::VersionsPerDay.per_month("gem_name, gem_version, sum(downloads) as downloads").group(1,2,3) + versions_per_hour: Download::VersionsPerMinute.per_hour("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3), + versions_per_day: Download::VersionsPerHour.per_day("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3), + versions_per_month: Download::VersionsPerDay.per_month("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3) }.each do |name, scope| puts "Creating continuous aggregate #{name}", scope.to_sql frame = name.to_s.split('per_').last diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index c351e17f371..e7fa6a899ff 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -65,7 +65,7 @@ create_continuous_aggregate("downloads_gems_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_gems_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name SQL @@ -73,7 +73,7 @@ create_continuous_aggregate("downloads_gems_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_gems_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name SQL @@ -81,7 +81,7 @@ create_continuous_aggregate("downloads_gems_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_gems_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name SQL @@ -100,7 +100,7 @@ SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, gem_version, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_versions_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name, gem_version SQL @@ -109,7 +109,7 @@ SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, gem_version, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_versions_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name, gem_version SQL @@ -118,7 +118,7 @@ SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, gem_version, - sum(downloads) AS downloads + (sum(downloads))::bigint AS downloads FROM downloads_versions_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name, gem_version SQL diff --git a/test/jobs/fastly_log_downloads_processor_job_test.rb b/test/jobs/fastly_log_downloads_processor_job_test.rb index baa143ad165..4b95fcba552 100644 --- a/test/jobs/fastly_log_downloads_processor_job_test.rb +++ b/test/jobs/fastly_log_downloads_processor_job_test.rb @@ -20,7 +20,6 @@ class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase } @processor = FastlyLogDownloadsProcessor.new("test-bucket", "fastly-fake.log") @job = FastlyLogDownloadsProcessorJob.new(bucket: "test-bucket", key: "fastly-fake.log") - refresh_all_caggs! end teardown do @@ -28,124 +27,25 @@ class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase Aws.config.delete(:s3) end - context "#download_counts" do - should "process file from s3" do - - require "pry";binding.pry - assert_equal @sample_log_counts, @processor.download_counts(@log_ticket) - end - - should "process file from local fs" do - @log_ticket.update(backend: "local", directory: "test/sample_logs") + def perform_and_refresh + count = @processor.perform + refresh_all_caggs! + count + end - assert_equal @sample_log_counts, @processor.download_counts(@log_ticket) + context "#perform" do + should "process file" do + assert_equal 10, perform_and_refresh + summary_counts = Download::VersionsPerHour.all.each_with_object({}){|e,summary|summary[e.gem_name+"-"+e.gem_version] = e.downloads} + assert_equal @sample_log_counts, summary_counts end should "fail if dont find the file" do @log_ticket.update(backend: "local", directory: "foobar") assert_raises FastlyLogDownloadsProcessor::LogFileNotFoundError do - @processor.download_counts(@log_ticket) - end - end - end - - context "with gem data" do - setup do - # Create some gems to match the values in the sample log - bundler = create(:rubygem, name: "bundler") - json = create(:rubygem, name: "json") - - create(:version, rubygem: bundler, number: "1.10.6") - create(:version, rubygem: json, number: "1.8.3", platform: "java") - create(:version, rubygem: json, number: "1.8.3") - create(:version, rubygem: json, number: "1.8.2") - - import_and_refresh - end - - context "#perform" do - should "not double count" do - json = Rubygem.find_by_name("json") - - assert_equal 0, GemDownload.count_for_rubygem(json.id) - 3.times { @job.perform_now } - - assert_equal 7, es_downloads(json.id) - assert_equal 7, GemDownload.count_for_rubygem(json.id) - end - - should "update download counts" do - @job.perform_now - @sample_log_counts - .each do |name, expected_count| - version = Version.find_by(full_name: name) - next unless version - count = GemDownload.find_by(rubygem_id: version.rubygem.id, version_id: version.id).count - - assert_equal expected_count, count, "invalid value for #{name}" - end - - json = Rubygem.find_by_name("json") - - assert_equal 7, GemDownload.count_for_rubygem(json.id) - assert_equal 7, es_downloads(json.id) - assert_equal "processed", @log_ticket.reload.status - end - - should "not run if already processed" do - json = Rubygem.find_by_name("json") - - assert_equal 0, json.downloads - assert_equal 0, es_downloads(json.id) - @log_ticket.update(status: "processed") - @job.perform_now - - assert_equal 0, es_downloads(json.id) - assert_equal 0, json.downloads - end - - should "not mark as processed if anything fails" do - @processor.class.any_instance.stubs(:download_counts).raises("woops") - - assert_kind_of RuntimeError, @job.perform_now - - refute_equal "processed", @log_ticket.reload.status - assert_equal "failed", @log_ticket.reload.status - end - - should "not re-process if it failed" do - @processor.class.any_instance.stubs(:download_counts).raises("woops") - - assert_kind_of RuntimeError, @job.perform_now - - @job.perform_now - json = Rubygem.find_by_name("json") - - assert_equal 0, json.downloads - assert_equal 0, es_downloads(json.id) - end - - should "only process the right file" do - ticket = LogTicket.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.2.log", status: "pending") - - @job.perform_now - - assert_equal "pending", ticket.reload.status - assert_equal "processed", @log_ticket.reload.status - end - - should "update the processed count" do - @job.perform_now - - assert_equal 10, @log_ticket.reload.processed_count - end - - should "update the total gem count" do - assert_equal 0, GemDownload.total_count - @job.perform_now - - assert_equal 9, GemDownload.total_count + perform_and_refresh end end end end + From 5e23a5045923c333ca8559d939143e355f235e96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 14:52:59 -0300 Subject: [PATCH 03/14] Fix attribute name --- app/jobs/fastly_log_downloads_processor.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/jobs/fastly_log_downloads_processor.rb b/app/jobs/fastly_log_downloads_processor.rb index ef69f3d095d..2bbeda12a54 100644 --- a/app/jobs/fastly_log_downloads_processor.rb +++ b/app/jobs/fastly_log_downloads_processor.rb @@ -31,8 +31,10 @@ def perform downloads.each_slice(BATCH_SIZE) do |batch| Download.insert_all batch end + @processed_count = downloads.size - StatsD.gauge("fastly_log_downloads_processor.processed_count", downloads.size) + StatsD.gauge("fastly_log_downloads_processor.processed_count", @processed_count) + @processed_count end def body @@ -50,11 +52,11 @@ def parse_success_downloads m = path.match(PATH_PATTERN) gem_name = m[:gem_name] || path gem_version = m[:gem_version] - ts = Time.parse fragments[4..9].join(' ') + created_at = Time.parse fragments[4..9].join(' ') env = parse_env fragments[12..-1] payload = {env:} - {ts:, gem_name:, gem_version:, payload:} + {created_at:, gem_name:, gem_version:, payload:} end end.compact end From 19df3b9b0682f09c4012575db056653655697906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 18:29:20 -0300 Subject: [PATCH 04/14] Add LogDownload to mimic LogTicket on downloads db --- app/avo/resources/log_download_resource.rb | 18 ++++++ .../avo/log_downloads_controller.rb | 4 ++ app/jobs/fastly_log_downloads_processor.rb | 26 +++++--- app/models/downloads_record.rb | 5 ++ app/models/log_download.rb | 31 +++++++++ ...log_tickets_to_timescale_downloads_task.rb | 13 ++++ .../20240823181725_create_log_downloads.rb | 16 +++++ db/downloads_schema.rb | 13 +++- test/factories/log_downloads.rb | 8 +++ test/helpers/timescaledb_helpers.rb | 1 + ...fastly_log_downloads_processor_job_test.rb | 8 +-- test/models/log_download_test.rb | 63 +++++++++++++++++++ ...ickets_to_timescale_downloads_task_test.rb | 37 +++++++++++ 13 files changed, 231 insertions(+), 12 deletions(-) create mode 100644 app/avo/resources/log_download_resource.rb create mode 100644 app/controllers/avo/log_downloads_controller.rb create mode 100644 app/models/downloads_record.rb create mode 100644 app/models/log_download.rb create mode 100644 app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb create mode 100644 db/downloads_migrate/20240823181725_create_log_downloads.rb create mode 100644 test/factories/log_downloads.rb create mode 100644 test/models/log_download_test.rb create mode 100644 test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb diff --git a/app/avo/resources/log_download_resource.rb b/app/avo/resources/log_download_resource.rb new file mode 100644 index 00000000000..90241805684 --- /dev/null +++ b/app/avo/resources/log_download_resource.rb @@ -0,0 +1,18 @@ +class LogDownloadResource < Avo::BaseResource + self.title = :id + self.includes = [] + # self.search_query = -> do + # scope.ransack(id_eq: params[:q], m: "or").result(distinct: false) + # end + class BackendFilter < ScopeBooleanFilter; end + filter BackendFilter, arguments: {default: LogDownload.backends.transform_values { true } } + class StatusFilter < ScopeBooleanFilter; end + filter StatusFilter, arguments: {default: LogDownload.statuses.transform_values { true } } + field :id, as: :id + # Fields generated from the model + field :key, as: :text + field :directory, as: :text + field :backend, as: :select, enum: LogDownload.backends + field :status, as: :select, enum: LogDownload.statuses + # add fields here +end diff --git a/app/controllers/avo/log_downloads_controller.rb b/app/controllers/avo/log_downloads_controller.rb new file mode 100644 index 00000000000..a763031854f --- /dev/null +++ b/app/controllers/avo/log_downloads_controller.rb @@ -0,0 +1,4 @@ +# This controller has been generated to enable Rails' resource routes. +# More information on https://docs.avohq.io/2.0/controllers.html +class Avo::LogDownloadsController < Avo::ResourcesController +end diff --git a/app/jobs/fastly_log_downloads_processor.rb b/app/jobs/fastly_log_downloads_processor.rb index 2bbeda12a54..767e7babb8f 100644 --- a/app/jobs/fastly_log_downloads_processor.rb +++ b/app/jobs/fastly_log_downloads_processor.rb @@ -15,30 +15,42 @@ class LogFileNotFoundError < ::StandardError; end BATCH_SIZE = 5000 attr_accessor :bucket, :key + attr_reader :processed_count def initialize(bucket, key) @bucket = bucket @key = key @processed_count = 0 - @batch = [] end def perform StatsD.increment("fastly_log_downloads_processor.started") raise LogFileNotFoundError if body.nil? - downloads = parse_success_downloads - downloads.each_slice(BATCH_SIZE) do |batch| + count = 0 + parse_success_downloads.each_slice(BATCH_SIZE) do |batch| Download.insert_all batch + count += batch.size end - @processed_count = downloads.size - StatsD.gauge("fastly_log_downloads_processor.processed_count", @processed_count) - @processed_count + if count > 0 + element.update(status: "processed", processed_count: count) + else + element.update(status: "failed") + end + + # This value may diverge from numbers from the fastly_log_processor as totals are + # not aggregated with the number of downloads but each row represents a download. + StatsD.gauge("fastly_log_downloads_processor.processed_count", count) + count end def body - @body ||= LogTicket.find_by(key: key, directory: bucket)&.body + @body ||= element&.body + end + + def element + @element ||= LogDownload.pop(directory: @bucket, key: @key) end def parse_success_downloads diff --git a/app/models/downloads_record.rb b/app/models/downloads_record.rb new file mode 100644 index 00000000000..c63898013c4 --- /dev/null +++ b/app/models/downloads_record.rb @@ -0,0 +1,5 @@ +class DownloadsRecord < ApplicationRecord + self.abstract_class = true + + connects_to database: { writing: :downloads } +end diff --git a/app/models/log_download.rb b/app/models/log_download.rb new file mode 100644 index 00000000000..6316bd1de1f --- /dev/null +++ b/app/models/log_download.rb @@ -0,0 +1,31 @@ +# Mimic LogTicket model to store the log files but for the downloads database. +# It will be backfilled with the log files from the main database to the downloads database. +# There will be a background job to process the log files +class LogDownload < DownloadsRecord + enum backend: { s3: 0, local: 1 } + enum status: %i[pending processing failed processed].index_with(&:to_s) + + def self.pop(key: nil, directory: nil) + scope = pending.limit(1).lock(true).order("id ASC") + scope = scope.where(key: key) if key + scope = scope.where(directory: directory) if directory + scope.sole.tap do |ticket| + ticket.update_column(:status, "processing") + end + rescue ActiveRecord::RecordNotFound + nil # no ticket in queue found by `sole` call + end + + def fs + @fs ||= + if s3? + RubygemFs::S3.new(bucket: directory) + else + RubygemFs::Local.new(directory) + end + end + + def body + fs.get(key) + end +end diff --git a/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb new file mode 100644 index 00000000000..bd65083c767 --- /dev/null +++ b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Maintenance + class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task + def collection + LogDownload.where(status: "pending") + end + + def process(element) + FastlyLogDownloadsProcessor.new(element.directory, element.key).perform + end + end +end diff --git a/db/downloads_migrate/20240823181725_create_log_downloads.rb b/db/downloads_migrate/20240823181725_create_log_downloads.rb new file mode 100644 index 00000000000..9f536945c03 --- /dev/null +++ b/db/downloads_migrate/20240823181725_create_log_downloads.rb @@ -0,0 +1,16 @@ +# Mimic LogTicket table to store the log files but for the downloads database +# It will be used to store the log files to be processed during the migration +class CreateLogDownloads < ActiveRecord::Migration[7.1] + def change + create_table :log_downloads do |t| + t.string :key + t.string :directory + t.integer :backend + t.string :status, default: "pending" + t.integer :processed_count, default: 0 + t.timestamps + end + + add_index :log_downloads, [:key, :directory], unique: true + end +end \ No newline at end of file diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index e7fa6a899ff..d1613ada846 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_07_08_184547) do +ActiveRecord::Schema[7.1].define(version: 2024_08_23_181725) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" enable_extension "timescaledb" @@ -23,6 +23,17 @@ t.index ["created_at"], name: "downloads_created_at_idx", order: :desc end + create_table "log_downloads", force: :cascade do |t| + t.string "key" + t.string "directory" + t.integer "backend" + t.string "status", default: "pending" + t.integer "processed_count", default: 0 + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key", "directory"], name: "index_log_downloads_on_key_and_directory", unique: true + end + create_hypertable "downloads", time_column: "created_at", chunk_time_interval: "1 day", compress_segmentby: "gem_name, gem_version", compress_orderby: "created_at DESC", compression_interval: "P7D" create_continuous_aggregate("downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, diff --git a/test/factories/log_downloads.rb b/test/factories/log_downloads.rb new file mode 100644 index 00000000000..0d010c9b75a --- /dev/null +++ b/test/factories/log_downloads.rb @@ -0,0 +1,8 @@ +FactoryBot.define do + factory :log_download do + sequence(:key) { "key-#{_1}" } + sequence(:directory) { "directory-#{_1}" } + status { :pending } + backend { 0 } + end +end diff --git a/test/helpers/timescaledb_helpers.rb b/test/helpers/timescaledb_helpers.rb index 16719a80aac..87402ccdeb1 100644 --- a/test/helpers/timescaledb_helpers.rb +++ b/test/helpers/timescaledb_helpers.rb @@ -3,6 +3,7 @@ module TimescaledbHelpers included do def refresh_all_caggs! + Download.connection.commit_db_transaction Download::MaterializedViews.each do |cagg| cagg.refresh! end diff --git a/test/jobs/fastly_log_downloads_processor_job_test.rb b/test/jobs/fastly_log_downloads_processor_job_test.rb index 4b95fcba552..7875cd38011 100644 --- a/test/jobs/fastly_log_downloads_processor_job_test.rb +++ b/test/jobs/fastly_log_downloads_processor_job_test.rb @@ -1,8 +1,6 @@ require "test_helper" class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase - include SearchKickHelper - setup do @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read @@ -13,13 +11,15 @@ class FastlyLogDownloadsProcessorJobTest < ActiveJob::TestCase "json-1.8.2" => 4, "no-such-gem-1.2.3" => 1 } - @log_ticket = LogTicket.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") + + @log_download = LogDownload.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") Aws.config[:s3] = { stub_responses: { get_object: { body: @sample_log } } } @processor = FastlyLogDownloadsProcessor.new("test-bucket", "fastly-fake.log") @job = FastlyLogDownloadsProcessorJob.new(bucket: "test-bucket", key: "fastly-fake.log") + Download.connection.execute("truncate table downloads") end teardown do @@ -41,7 +41,7 @@ def perform_and_refresh end should "fail if dont find the file" do - @log_ticket.update(backend: "local", directory: "foobar") + @log_download.update(backend: "local", directory: "foobar") assert_raises FastlyLogDownloadsProcessor::LogFileNotFoundError do perform_and_refresh end diff --git a/test/models/log_download_test.rb b/test/models/log_download_test.rb new file mode 100644 index 00000000000..a389a1e60d3 --- /dev/null +++ b/test/models/log_download_test.rb @@ -0,0 +1,63 @@ +require "test_helper" + +class LogDownloadTest < ActiveSupport::TestCase + setup do + @log_download = LogDownload.create!(directory: "test", key: "fake.log") + end + + should "not allow duplicate directory and key" do + assert_raises(ActiveRecord::RecordNotUnique) do + LogDownload.create!(directory: "test", key: "fake.log") + end + end + + should "allow diffent keys in same directory" do + LogDownload.create!(directory: "test", key: "fake2.log") + end + + context "#pop" do + setup do + @log_download = LogDownload.create!(directory: "test/2", key: "bar", status: "pending") + end + + context "without any keys" do + setup do + LogDownload.create!(directory: "test", key: "fake3.log") + end + + should "return the first download" do + assert_equal "fake.log", LogDownload.pop.key + end + end + + context "with a key" do + setup do + LogDownload.create!(directory: "test", key: "fake4.log") + end + + should "return the first download" do + assert_equal "fake4.log", LogDownload.pop(key: "fake4.log").key + end + end + + should "change the status" do + one = LogDownload.pop + assert_equal "processing", one.status + end + + should "return nil in case no download is available" do + 2.times { LogDownload.pop } + assert_nil LogDownload.pop + end + + context "with a directory" do + setup do + LogDownload.create!(directory: "test/dir", key: "fake5.log") + end + + should "return the first download" do + assert_equal "fake5.log", LogDownload.pop(directory: "test/dir").key + end + end + end +end diff --git a/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb b/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb new file mode 100644 index 00000000000..d5b7c5dba9a --- /dev/null +++ b/test/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task_test.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +require "test_helper" + +module Maintenance + class BackfillLogTicketsToTimescaleDownloadsTaskTest < ActiveSupport::TestCase + setup do + @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read + + @sample_log_counts = { + "bundler-1.10.6" => 2, + "json-1.8.3-java" => 2, + "json-1.8.3" => 1, + "json-1.8.2" => 4, + "no-such-gem-1.2.3" => 1 + } + + @log_download = LogDownload.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") + + Aws.config[:s3] = { + stub_responses: { get_object: { body: @sample_log } } + } + Download.connection.execute("truncate table downloads") + end + + test "process" do + Maintenance::BackfillLogTicketsToTimescaleDownloadsTask.process(@log_download) + refresh_all_caggs! + @log_download.reload + assert_equal 10, @log_download.processed_count + assert_equal "processed", @log_download.status + assert_equal 10, Download.count + summary_counts = Download::VersionsPerHour.all.each_with_object({}){|e,summary|summary[e.gem_name+"-"+e.gem_version] = e.downloads} + assert_equal @sample_log_counts, summary_counts + end + end +end From 189c9a0fd4286c85d5dbb0efb41b985057e73dde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 19:43:42 -0300 Subject: [PATCH 05/14] Add LogDownloads to sqs in case it's enabled --- app/avo/resources/log_download_resource.rb | 7 ++----- lib/shoryuken/sqs_worker.rb | 3 ++- test/unit/sqs_worker_test.rb | 24 ++++++++++++++++++++-- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/app/avo/resources/log_download_resource.rb b/app/avo/resources/log_download_resource.rb index 90241805684..b3613efc4d0 100644 --- a/app/avo/resources/log_download_resource.rb +++ b/app/avo/resources/log_download_resource.rb @@ -1,18 +1,15 @@ class LogDownloadResource < Avo::BaseResource self.title = :id self.includes = [] - # self.search_query = -> do - # scope.ransack(id_eq: params[:q], m: "or").result(distinct: false) - # end + class BackendFilter < ScopeBooleanFilter; end filter BackendFilter, arguments: {default: LogDownload.backends.transform_values { true } } class StatusFilter < ScopeBooleanFilter; end filter StatusFilter, arguments: {default: LogDownload.statuses.transform_values { true } } + field :id, as: :id - # Fields generated from the model field :key, as: :text field :directory, as: :text field :backend, as: :select, enum: LogDownload.backends field :status, as: :select, enum: LogDownload.statuses - # add fields here end diff --git a/lib/shoryuken/sqs_worker.rb b/lib/shoryuken/sqs_worker.rb index 6cc269d42cc..996eec6c732 100644 --- a/lib/shoryuken/sqs_worker.rb +++ b/lib/shoryuken/sqs_worker.rb @@ -24,8 +24,9 @@ def perform(_sqs_msg, body) StatsD.increment("fastly_log_processor.duplicated") else FastlyLogProcessorJob.perform_later(bucket:, key:) - if ENV['DOWNLOADS_DB_ENABLED'] == 'true' && Download.table_exists? + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' StatsD.increment("fastly_log_downloads_processor.enqueued") + LogDownload.create!(backend: "s3", key: key, directory: bucket) FastlyLogDownloadsProcessorJob.perform_later(bucket:, key:) end end diff --git a/test/unit/sqs_worker_test.rb b/test/unit/sqs_worker_test.rb index 05f394ece57..aba121cd81e 100644 --- a/test/unit/sqs_worker_test.rb +++ b/test/unit/sqs_worker_test.rb @@ -55,8 +55,13 @@ class SqsWorkerTest < ActiveSupport::TestCase should "create Logticket" do StatsD.expects(:increment).with("fastly_log_processor.s3_entry_fetched") StatsD.expects(:increment).with("fastly_log_processor.enqueued") + StatsD.expects(:increment).with("fastly_log_downloads_processor.enqueued") if ENV['DOWNLOADS_DB_ENABLED'] == 'true' StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogDownloadsProcessorJob.name))) + end assert_enqueued_jobs 1, only: FastlyLogProcessorJob do @sqs_worker.perform(nil, @body) end @@ -66,6 +71,12 @@ class SqsWorkerTest < ActiveSupport::TestCase assert_equal "bucket-name", log_ticket.directory assert_equal "object-key", log_ticket.key assert_equal "pending", log_ticket.status + + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + log_download = LogDownload.last + assert_equal "bucket-name", log_download.directory + assert_equal "object-key", log_download.key + end end should "not create duplicate LogTicket" do @@ -76,8 +87,17 @@ class SqsWorkerTest < ActiveSupport::TestCase StatsD.expects(:increment).with("fastly_log_processor.enqueued").twice StatsD.expects(:increment).with("fastly_log_processor.duplicated") StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, - has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) - assert_enqueued_jobs 1, only: FastlyLogProcessorJob do + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogProcessorJob.name))) + jobs = [FastlyLogProcessorJob] + + if ENV['DOWNLOADS_DB_ENABLED'] == 'true' + StatsD.expects(:increment).with("fastly_log_downloads_processor.enqueued").once + StatsD.expects(:increment).with("rails.enqueue.active_job.success", 1, + has_entry(tags: has_entries(queue: "default", priority: 4, job_class: FastlyLogDownloadsProcessorJob.name))) + jobs << FastlyLogDownloadsProcessorJob + end + + assert_enqueued_jobs jobs.size, only: jobs do @sqs_worker.perform(nil, @body) end end From 983111522751908b30277dfd7a7c933486601a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 20:01:38 -0300 Subject: [PATCH 06/14] Lock as final step --- app/models/log_download.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/log_download.rb b/app/models/log_download.rb index 6316bd1de1f..c763fb2290c 100644 --- a/app/models/log_download.rb +++ b/app/models/log_download.rb @@ -6,11 +6,11 @@ class LogDownload < DownloadsRecord enum status: %i[pending processing failed processed].index_with(&:to_s) def self.pop(key: nil, directory: nil) - scope = pending.limit(1).lock(true).order("id ASC") + scope = pending.limit(1).order("id ASC") scope = scope.where(key: key) if key scope = scope.where(directory: directory) if directory - scope.sole.tap do |ticket| - ticket.update_column(:status, "processing") + scope.lock(true).sole.tap do |log_download| + log_download.update_column(:status, "processing") end rescue ActiveRecord::RecordNotFound nil # no ticket in queue found by `sole` call From 8d94fab3f3e3bece290d2b85c79045bd2e5b7f5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 23 Aug 2024 20:12:14 -0300 Subject: [PATCH 07/14] Fix model names --- app/models/download.rb | 2 +- app/models/download_record.rb | 6 ------ app/models/{downloads_record.rb => downloads_db.rb} | 2 +- app/models/log_download.rb | 4 ++-- 4 files changed, 4 insertions(+), 10 deletions(-) delete mode 100644 app/models/download_record.rb rename app/models/{downloads_record.rb => downloads_db.rb} (66%) diff --git a/app/models/download.rb b/app/models/download.rb index 51390afa568..af4a5c0af3a 100644 --- a/app/models/download.rb +++ b/app/models/download.rb @@ -1,5 +1,5 @@ -require 'timescaledb' class Download < DownloadRecord + extend Timescaledb::ActsAsHypertable acts_as_hypertable time_column: 'created_at' diff --git a/app/models/download_record.rb b/app/models/download_record.rb deleted file mode 100644 index 7965d4722b0..00000000000 --- a/app/models/download_record.rb +++ /dev/null @@ -1,6 +0,0 @@ -# This model is used to connect to the downloads database -class DownloadRecord < ApplicationRecord - self.abstract_class = true - extend Timescaledb::ActsAsHypertable - connects_to database: { writing: :downloads, reading: :downloads } -end diff --git a/app/models/downloads_record.rb b/app/models/downloads_db.rb similarity index 66% rename from app/models/downloads_record.rb rename to app/models/downloads_db.rb index c63898013c4..6c448398c97 100644 --- a/app/models/downloads_record.rb +++ b/app/models/downloads_db.rb @@ -1,4 +1,4 @@ -class DownloadsRecord < ApplicationRecord +class DownloadsDB < ApplicationRecord self.abstract_class = true connects_to database: { writing: :downloads } diff --git a/app/models/log_download.rb b/app/models/log_download.rb index c763fb2290c..d8b50e15bd6 100644 --- a/app/models/log_download.rb +++ b/app/models/log_download.rb @@ -1,7 +1,7 @@ # Mimic LogTicket model to store the log files but for the downloads database. # It will be backfilled with the log files from the main database to the downloads database. # There will be a background job to process the log files -class LogDownload < DownloadsRecord +class LogDownload < DownloadsDB enum backend: { s3: 0, local: 1 } enum status: %i[pending processing failed processed].index_with(&:to_s) @@ -28,4 +28,4 @@ def fs def body fs.get(key) end -end +end \ No newline at end of file From 8643fa198fcebdcffbf459c45ff2bdf3c73f26ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Sat, 24 Aug 2024 17:52:26 -0300 Subject: [PATCH 08/14] Test cases for continuous aggregates with multi month context --- app/models/download.rb | 4 +- app/models/log_download.rb | 2 +- ...log_tickets_to_timescale_downloads_task.rb | 4 + config/initializers/zeitwerk.rb | 3 +- .../20240708184547_create_downloads.rb | 7 +- .../20240823181725_create_log_downloads.rb | 2 +- test/models/download_test.rb | 174 ++++++++++++------ 7 files changed, 127 insertions(+), 69 deletions(-) diff --git a/app/models/download.rb b/app/models/download.rb index af4a5c0af3a..70f7f6e26b6 100644 --- a/app/models/download.rb +++ b/app/models/download.rb @@ -1,4 +1,4 @@ -class Download < DownloadRecord +class Download < DownloadsDB extend Timescaledb::ActsAsHypertable acts_as_hypertable time_column: 'created_at' @@ -21,7 +21,7 @@ class Download < DownloadRecord end cagg = -> (view_name) do - Class.new(DownloadRecord) do + Class.new(DownloadsDB) do self.table_name = "downloads_#{view_name}" scope :sum_downloads, -> { select("sum(downloads)::bigint as downloads") } diff --git a/app/models/log_download.rb b/app/models/log_download.rb index d8b50e15bd6..2d1f6d87029 100644 --- a/app/models/log_download.rb +++ b/app/models/log_download.rb @@ -28,4 +28,4 @@ def fs def body fs.get(key) end -end \ No newline at end of file +end diff --git a/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb index bd65083c767..0750ad0b594 100644 --- a/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb +++ b/app/tasks/maintenance/backfill_log_tickets_to_timescale_downloads_task.rb @@ -1,6 +1,10 @@ # frozen_string_literal: true module Maintenance + # Helper to keep backfilling LogTickets to TimescaleDB downloads table. + # It will be used to migrate the data from the old LogTicket table to the new LogDownload table. + # It will be executed in the background and it will be a one time task. + # Later, after all pending LogTickets are migrated, this job will be removed. class BackfillLogTicketsToTimescaleDownloadsTask < MaintenanceTasks::Task def collection LogDownload.where(status: "pending") diff --git a/config/initializers/zeitwerk.rb b/config/initializers/zeitwerk.rb index cba98d517fc..896e8817c1b 100644 --- a/config/initializers/zeitwerk.rb +++ b/config/initializers/zeitwerk.rb @@ -11,5 +11,6 @@ Rails.autoloaders.once.inflector.inflect( "http" => "HTTP", - "oidc" => "OIDC" + "oidc" => "OIDC", + "downloads_db" => "DownloadsDB" ) diff --git a/db/downloads_migrate/20240708184547_create_downloads.rb b/db/downloads_migrate/20240708184547_create_downloads.rb index 618ddf76562..1ac3998ceea 100644 --- a/db/downloads_migrate/20240708184547_create_downloads.rb +++ b/db/downloads_migrate/20240708184547_create_downloads.rb @@ -31,11 +31,10 @@ def self.up gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1,2), versions_per_minute: Download.versions_per_minute, - versions_per_hour: Download::VersionsPerMinute.per_hour("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3), - versions_per_day: Download::VersionsPerHour.per_day("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3), - versions_per_month: Download::VersionsPerDay.per_month("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1,2,3) + versions_per_hour: Download::VersionsPerMinute.sum_downloads.per_hour("gem_name, gem_version").group(1,2,3), + versions_per_day: Download::VersionsPerHour.sum_downloads.per_day("gem_name, gem_version").group(1,2,3), + versions_per_month: Download::VersionsPerDay.sum_downloads.per_month("gem_name, gem_version").group(1,2,3) }.each do |name, scope| - puts "Creating continuous aggregate #{name}", scope.to_sql frame = name.to_s.split('per_').last create_continuous_aggregate( "downloads_#{name}", diff --git a/db/downloads_migrate/20240823181725_create_log_downloads.rb b/db/downloads_migrate/20240823181725_create_log_downloads.rb index 9f536945c03..21725c43ca8 100644 --- a/db/downloads_migrate/20240823181725_create_log_downloads.rb +++ b/db/downloads_migrate/20240823181725_create_log_downloads.rb @@ -13,4 +13,4 @@ def change add_index :log_downloads, [:key, :directory], unique: true end -end \ No newline at end of file +end diff --git a/test/models/download_test.rb b/test/models/download_test.rb index f639a4731dd..87f4bbec008 100644 --- a/test/models/download_test.rb +++ b/test/models/download_test.rb @@ -1,6 +1,8 @@ require "test_helper" class DownloadTest < ActiveSupport::TestCase + include ActiveSupport::Testing::TimeHelpers + # This will run once before all tests in this class setup do # Truncate the table because we are using a transaction @@ -8,6 +10,7 @@ class DownloadTest < ActiveSupport::TestCase end teardown do + travel_back # Truncate the table because we are using a transaction Download.connection.execute("truncate table downloads") end @@ -22,40 +25,49 @@ def refresh_all! context ".per_minute" do should "return downloads per minute" do + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.per_minute - assert_equal [], Download.per_minute - - create(:download, created_at: 2.minutes.ago) - create(:download, created_at: 1.minute.ago) - create(:download, created_at: 1.minute.ago) + create(:download, created_at: 2.minutes.ago) + create(:download, created_at: 1.minute.ago) + create(:download, created_at: 1.minute.ago) - assert_equal [1, 2], Download.per_minute.map(&:downloads) + assert_equal [1, 2], Download.per_minute.map(&:downloads) + end end end context ".gems_per_minute" do should "return gems downloads per minute" do - create(:download, gem_name: "example", created_at: 2.minutes.ago) - create(:download, gem_name: "example", created_at: 1.minute.ago) - create(:download, gem_name: "example", created_at: 1.minute.ago) - create(:download, gem_name: "example2", created_at: 1.minute.ago) + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.gems_per_minute + + create(:download, gem_name: "example", created_at: 2.minutes.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example", created_at: 1.minute.ago) + create(:download, gem_name: "example2", created_at: 1.minute.ago) - assert_equal [1, 2], Download.gems_per_minute.where(gem_name: "example").map(&:downloads) - assert_equal [1], Download.gems_per_minute.where(gem_name: "example2").map(&:downloads) + assert_equal [1, 2], Download.gems_per_minute.where(gem_name: "example").map(&:downloads) + assert_equal [1], Download.gems_per_minute.where(gem_name: "example2").map(&:downloads) + end end end context ".versions_per_minute" do should "return versions downloads per minute" do - create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 2.minutes.ago) - create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) - create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) - create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) - create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) - - assert_equal [1, 1], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) - assert_equal [2], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) - assert_equal [1], Download.versions_per_minute.where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + assert_equal [], Download.versions_per_minute + + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 2.minutes.ago) + create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) + create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) + + assert_equal [1, 1], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) + assert_equal [2], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) + assert_equal [1], Download.versions_per_minute.where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) + end end end @@ -93,46 +105,88 @@ def refresh_all! end should "refresh materialized views" do - - create(:download, created_at: 2.minutes.ago) - create(:download, created_at: 1.minute.ago) - create(:download, gem_name: "example", created_at: 1.minute.ago) - create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) - create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) - create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) - create(:download, gem_name: "example2", gem_version: "0.0.2", created_at: 1.minute.ago) - - assert Download.count == 7 - - refresh_all! - - assert Download::PerMinute.count == 2 - assert Download::PerMinute.all.map{_1.attributes["downloads"]} == [1, 6] - - previous = Download::PerMinute - %w[PerHour PerDay PerMonth].each do |view| - cagg = "Download::#{view}".constantize - scope = previous.send(view.underscore) - assert cagg.count == 1, "Expected Download::#{view}.count to be 1, got #{cagg.count}" - assert cagg.all.map(&:attributes) == scope.map(&:attributes) - previous = cagg + travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do + [1.year.ago, 11.months.ago, 10.months.ago, 3.months.ago, 1.month.ago, 1.day.ago, 1.hour.ago, 1.minute.ago, 30.seconds.ago].each do |created_at| + create(:download, gem_name: "alpha", gem_version: "0.0.1", created_at: created_at) + end + + [3.months.ago, 1.month.ago, 1.day.ago, 1.hour.ago, 1.minute.ago, 45.seconds.ago].each do |created_at| + create(:download, gem_name: "beta", gem_version: "0.0.1", created_at: created_at) + end + + refresh_all! + + assert_equal 15, Download.count + assert_equal 8, Download::PerMinute.count + assert_equal 7, Download::PerHour.count + assert_equal 7, Download::PerDay.count + assert_equal 6, Download::PerMonth.count + + expected_per_minute = [ + {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T12:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:59:00.000Z","downloads":4} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_minute, Download::PerMinute.all.map{_1.attributes.symbolize_keys} + + expected_per_hour = [ + {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:00:00.000Z","downloads":6}, + {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T12:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T12:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T12:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_hour, Download::PerHour.all.map{_1.attributes.symbolize_keys} + + expected_per_day = [ + {"created_at":"2024-08-23T00:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T00:00:00.000Z","downloads":6}, + {"created_at":"2023-08-24T00:00:00.000Z","downloads":1}, + {"created_at":"2023-09-24T00:00:00.000Z","downloads":1}, + {"created_at":"2023-10-24T00:00:00.000Z","downloads":1}, + {"created_at":"2024-05-24T00:00:00.000Z","downloads":2}, + {"created_at":"2024-07-24T00:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + + assert_equal expected_per_day, Download::PerDay.all.map{_1.attributes.symbolize_keys} + + expected_per_month = [ + {"created_at":"2024-08-01T00:00:00.000Z","downloads":8}, + {"created_at":"2023-08-01T00:00:00.000Z","downloads":1}, + {"created_at":"2023-09-01T00:00:00.000Z","downloads":1}, + {"created_at":"2023-10-01T00:00:00.000Z","downloads":1}, + {"created_at":"2024-05-01T00:00:00.000Z","downloads":2}, + {"created_at":"2024-07-01T00:00:00.000Z","downloads":2} + ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} + assert_equal expected_per_month, Download::PerMonth.all.map{_1.attributes.symbolize_keys} + + assert_equal [1, 1, 1, 2, 2, 2, 2, 4], Download::PerMinute.all.map{_1.attributes["downloads"]} + + downloads_per_minute = -> (gem_name) { Download::GemsPerMinute.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 1, 1, 1, 1, 1, 1, 2], downloads_per_minute.("alpha") + assert_equal [1,1,1,1,2], downloads_per_minute.("beta") + + downloads_per_hour = -> (gem_name) { Download::GemsPerHour.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_hour.("alpha") + assert_equal [1, 3, 1, 1], downloads_per_hour.("beta") + + downloads_per_day = -> (gem_name) { Download::GemsPerDay.where('gem_name': gem_name).pluck('downloads')} + assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_day.("alpha") + assert_equal [1, 3, 1, 1], downloads_per_day.("beta") + + downloads_per_month = -> (gem_name) { Download::GemsPerMonth.where('gem_name': gem_name).pluck('downloads')} + assert_equal [4, 1, 1, 1, 1, 1], downloads_per_month.("alpha") + assert_equal [4, 1, 1], downloads_per_month.("beta") end - - downloads_per_minute = -> (gem_name) { Download::GemsPerMinute.where('gem_name': gem_name).pluck('downloads')} - assert downloads_per_minute.("example") == [1, 4] - assert downloads_per_minute.("example2") == [2] - - downloads_per_hour = -> (gem_name) { Download::GemsPerHour.where('gem_name': gem_name).pluck('downloads')} - assert downloads_per_hour.("example") == [5] - assert downloads_per_hour.("example2") == [2] - - downloads_per_day = -> (gem_name) { Download::GemsPerDay.where('gem_name': gem_name).pluck('downloads')} - assert downloads_per_day.("example") == [5] - assert downloads_per_day.("example2") == [2] - - downloads_per_month = -> (gem_name) { Download::GemsPerMonth.where('gem_name': gem_name).pluck('downloads')} - assert downloads_per_month.("example") == [5] - assert downloads_per_month.("example2") == [2] end end end \ No newline at end of file From 6eca343a4df8f012044322b8a86d2d01438a7ba2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 25 Oct 2024 11:46:00 -0300 Subject: [PATCH 09/14] Use continuous_aggregate macro --- app/models/download.rb | 98 +++---------------- app/models/downloads_db.rb | 3 + .../20240708184547_create_downloads.rb | 38 +------ db/downloads_schema.rb | 69 +++++++------ 4 files changed, 51 insertions(+), 157 deletions(-) diff --git a/app/models/download.rb b/app/models/download.rb index 70f7f6e26b6..0d5afe7dec7 100644 --- a/app/models/download.rb +++ b/app/models/download.rb @@ -1,90 +1,18 @@ class Download < DownloadsDB - extend Timescaledb::ActsAsHypertable acts_as_hypertable time_column: 'created_at' - scope :time_bucket, -> (range='1m', query="count(*)") do - time_column = self.hypertable_options[:time_column] - select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") - end - - scope :per_minute, -> (query="count(*) as downloads") do - time_bucket('1m', query).group(1).order(1) - end - - scope :gems_per_minute, -> do - per_minute("gem_name, count(*) as downloads").group(1,2) - end - - scope :versions_per_minute, -> do - per_minute("gem_name, gem_version, count(*) as downloads").group(1,2,3) - end - - cagg = -> (view_name) do - Class.new(DownloadsDB) do - self.table_name = "downloads_#{view_name}" - - scope :sum_downloads, -> { select("sum(downloads)::bigint as downloads") } - scope :avg_downloads, -> { select("avg(downloads)::bigint as avg_downloads") } - - scope :rollup, -> (range='1d', query=:sum_downloads) do - time_column = Download.hypertable_options[:time_column] - if query.is_a?(Symbol) - select("time_bucket('#{range}', #{time_column}) as #{time_column}") - .public_send(query) - .group(1) - else - select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}") - .group(1) - end - end - - scope :per_hour, -> (query=:sum_downloads) do - rollup('1h', query) - end - - scope :per_day, -> (query=:sum_downloads) do - rollup('1d', query) - end - - scope :per_week, -> (query=:sum_downloads) do - rollup('1w', query) - end - - scope :per_month, -> (query=:sum_downloads) do - rollup('1mon', query) - end - - scope :per_year, -> (query=:sum_downloads) do - rollup('1y', query) - end - - def readonly? - true - end - - def self.refresh! - connection_pool.with_connection do |conn| - # Fixme: This is a workaround to guarantee we're in a fresh connection - conn.reset! if conn.transaction_open? - conn.raw_connection.exec("CALL refresh_continuous_aggregate('#{table_name}', NULL, NULL)") - end - end - end - end - - MaterializedViews = [ - PerMinute = cagg['per_minute'], - PerHour = cagg['per_hour'], - PerDay = cagg['per_day'], - PerMonth = cagg['per_month'], - GemsPerMinute = cagg['gems_per_minute'], - GemsPerHour = cagg['gems_per_hour'], - GemsPerDay = cagg['gems_per_day'], - GemsPerMonth = cagg['gems_per_month'], - VersionsPerMinute = cagg['versions_per_minute'], - VersionsPerHour = cagg['versions_per_hour'], - VersionsPerDay = cagg['versions_per_day'], - VersionsPerMonth = cagg['versions_per_month'] - ] + scope :total_downloads, -> { select("count(*) as total") } + scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) } + scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) } + + continuous_aggregates( + timeframes: [:minute, :hour, :day, :month], + scopes: [:total_downloads, :downloads_by_gem, :downloads_by_version], + refresh_policy: { + minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" }, + hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" }, + day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 day" }, + month: { start_offset: "3 month", end_offset: "1 day", schedule_interval: "1 day" } + }) end diff --git a/app/models/downloads_db.rb b/app/models/downloads_db.rb index 6c448398c97..59da28a0b3b 100644 --- a/app/models/downloads_db.rb +++ b/app/models/downloads_db.rb @@ -2,4 +2,7 @@ class DownloadsDB < ApplicationRecord self.abstract_class = true connects_to database: { writing: :downloads } + + extend Timescaledb::ActsAsHypertable + include Timescaledb::ContinuousAggregatesHelper end diff --git a/db/downloads_migrate/20240708184547_create_downloads.rb b/db/downloads_migrate/20240708184547_create_downloads.rb index 1ac3998ceea..86d9afa82a3 100644 --- a/db/downloads_migrate/20240708184547_create_downloads.rb +++ b/db/downloads_migrate/20240708184547_create_downloads.rb @@ -19,44 +19,10 @@ def self.up t.jsonb :payload end - { - per_minute: Download.per_minute, - per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1), - per_day: Download::PerHour.per_day(:sum_downloads).group(1), - per_month: Download::PerDay.per_month(:sum_downloads).group(1), - - gems_per_minute: Download.gems_per_minute, - gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads)::bigint as downloads").group(1,2), - gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads)::bigint as downloads").group(1,2), - gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1,2), - - versions_per_minute: Download.versions_per_minute, - versions_per_hour: Download::VersionsPerMinute.sum_downloads.per_hour("gem_name, gem_version").group(1,2,3), - versions_per_day: Download::VersionsPerHour.sum_downloads.per_day("gem_name, gem_version").group(1,2,3), - versions_per_month: Download::VersionsPerDay.sum_downloads.per_month("gem_name, gem_version").group(1,2,3) - }.each do |name, scope| - frame = name.to_s.split('per_').last - create_continuous_aggregate( - "downloads_#{name}", - scope.to_sql, - refresh_policies: { - schedule_interval: "INTERVAL '1 #{frame}'", - start_offset: "INTERVAL '3 #{frame}'", - end_offset: "INTERVAL '1 minute'" - }) - end + Download.create_continuous_aggregates end def self.down - %w[month day hour minute].each do |frame| - ["downloads_per_#{frame}", - "downloads_gems_per_#{frame}", - "downloads_versions_per_#{frame}", - ].each do |view| - safety_assured do - execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade") - end - end - end + Download.drop_continuous_aggregates drop_table(:downloads, force: :cascade, if_exists: true) if Download.table_exists? end diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index d1613ada846..f69670f802b 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -35,102 +35,99 @@ end create_hypertable "downloads", time_column: "created_at", chunk_time_interval: "1 day", compress_segmentby: "gem_name, gem_version", compress_orderby: "created_at DESC", compression_interval: "P7D" - create_continuous_aggregate("downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("total_downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT10M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, - count(*) AS downloads + count(*) AS total FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)) - ORDER BY (time_bucket('PT1M'::interval, created_at)) SQL - create_continuous_aggregate("downloads_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("total_downloads_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT4H'", end_offset: "INTERVAL 'PT1H'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, - (sum(downloads))::bigint AS downloads - FROM downloads_per_minute + sum(total) AS total + FROM total_downloads_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)) SQL - create_continuous_aggregate("downloads_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("total_downloads_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, - (sum(downloads))::bigint AS downloads - FROM downloads_per_hour + sum(total) AS total + FROM total_downloads_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)) SQL - create_continuous_aggregate("downloads_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("total_downloads_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, - (sum(downloads))::bigint AS downloads - FROM downloads_per_day + sum(total) AS total + FROM total_downloads_per_day GROUP BY (time_bucket('P1M'::interval, created_at)) SQL - create_continuous_aggregate("downloads_gems_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_gem_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT10M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, gem_name, - count(*) AS downloads + count(*) AS total FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name - ORDER BY (time_bucket('PT1M'::interval, created_at)) SQL - create_continuous_aggregate("downloads_gems_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_gem_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT4H'", end_offset: "INTERVAL 'PT1H'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, - (sum(downloads))::bigint AS downloads - FROM downloads_gems_per_minute + sum(total) AS total + FROM downloads_by_gem_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name SQL - create_continuous_aggregate("downloads_gems_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_gem_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, - (sum(downloads))::bigint AS downloads - FROM downloads_gems_per_hour + sum(total) AS total + FROM downloads_by_gem_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name SQL - create_continuous_aggregate("downloads_gems_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_gem_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, - (sum(downloads))::bigint AS downloads - FROM downloads_gems_per_day + sum(total) AS total + FROM downloads_by_gem_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name SQL - create_continuous_aggregate("downloads_versions_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_version_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT10M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, gem_name, gem_version, - count(*) AS downloads + count(*) AS total FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name, gem_version - ORDER BY (time_bucket('PT1M'::interval, created_at)) SQL - create_continuous_aggregate("downloads_versions_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT3H'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_version_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT4H'", end_offset: "INTERVAL 'PT1H'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, gem_version, - (sum(downloads))::bigint AS downloads - FROM downloads_versions_per_minute + sum(total) AS total + FROM downloads_by_version_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name, gem_version SQL - create_continuous_aggregate("downloads_versions_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_version_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, gem_version, - (sum(downloads))::bigint AS downloads - FROM downloads_versions_per_hour + sum(total) AS total + FROM downloads_by_version_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name, gem_version SQL - create_continuous_aggregate("downloads_versions_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '2629746'"}, materialized_only: true, finalized: true) + create_continuous_aggregate("downloads_by_version_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, gem_version, - (sum(downloads))::bigint AS downloads - FROM downloads_versions_per_day + sum(total) AS total + FROM downloads_by_version_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name, gem_version SQL From 9ae442e8ee1e8120a3fc2bf915bad6ea07a00613 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 25 Oct 2024 12:28:57 -0300 Subject: [PATCH 10/14] Refactor scopes --- app/models/download.rb | 2 ++ app/models/downloads_db.rb | 3 --- app/models/log_download.rb | 6 ++++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/app/models/download.rb b/app/models/download.rb index 0d5afe7dec7..678453f7c5e 100644 --- a/app/models/download.rb +++ b/app/models/download.rb @@ -1,4 +1,6 @@ class Download < DownloadsDB + extend Timescaledb::ActsAsHypertable + include Timescaledb::ContinuousAggregatesHelper acts_as_hypertable time_column: 'created_at' diff --git a/app/models/downloads_db.rb b/app/models/downloads_db.rb index 59da28a0b3b..6c448398c97 100644 --- a/app/models/downloads_db.rb +++ b/app/models/downloads_db.rb @@ -2,7 +2,4 @@ class DownloadsDB < ApplicationRecord self.abstract_class = true connects_to database: { writing: :downloads } - - extend Timescaledb::ActsAsHypertable - include Timescaledb::ContinuousAggregatesHelper end diff --git a/app/models/log_download.rb b/app/models/log_download.rb index 2d1f6d87029..cc10c96c611 100644 --- a/app/models/log_download.rb +++ b/app/models/log_download.rb @@ -4,9 +4,11 @@ class LogDownload < DownloadsDB enum backend: { s3: 0, local: 1 } enum status: %i[pending processing failed processed].index_with(&:to_s) - + + scope :latest_created_at, -> { order(created_at: :desc).select(:created_at).pluck(:created_at).first } + def self.pop(key: nil, directory: nil) - scope = pending.limit(1).order("id ASC") + scope = pending.limit(1).order("created_at ASC") scope = scope.where(key: key) if key scope = scope.where(directory: directory) if directory scope.lock(true).sole.tap do |log_download| From a8bf2b5de03af80827d106db96346b3899761a2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Fri, 25 Oct 2024 20:52:58 -0300 Subject: [PATCH 11/14] Fix tests --- app/models/download.rb | 8 +-- test/models/download_test.rb | 127 +++++++++++++++-------------------- 2 files changed, 60 insertions(+), 75 deletions(-) diff --git a/app/models/download.rb b/app/models/download.rb index 678453f7c5e..25900dd3973 100644 --- a/app/models/download.rb +++ b/app/models/download.rb @@ -2,11 +2,11 @@ class Download < DownloadsDB extend Timescaledb::ActsAsHypertable include Timescaledb::ContinuousAggregatesHelper - acts_as_hypertable time_column: 'created_at' + acts_as_hypertable time_column: 'created_at', segment_by: [:gem_name, :gem_version] - scope :total_downloads, -> { select("count(*) as total") } - scope :downloads_by_gem, -> { select("gem_name, count(*) as total").group(:gem_name) } - scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as total").group(:gem_name, :gem_version) } + scope :total_downloads, -> { select("count(*) as downloads").order(:created_at) } + scope :downloads_by_gem, -> { select("gem_name, count(*) as downloads").group(:gem_name).order(:created_at) } + scope :downloads_by_version, -> { select("gem_name, gem_version, count(*) as downloads").group(:gem_name, :gem_version).order(:created_at) } continuous_aggregates( timeframes: [:minute, :hour, :day, :month], diff --git a/test/models/download_test.rb b/test/models/download_test.rb index 87f4bbec008..25af38f1ed3 100644 --- a/test/models/download_test.rb +++ b/test/models/download_test.rb @@ -18,45 +18,43 @@ class DownloadTest < ActiveSupport::TestCase def refresh_all! # We need to commit the transaction to make sure the continuous aggregates are refreshed with latest data Download.connection.commit_db_transaction - Download::MaterializedViews.each do |view| - view.refresh! - end + Download.refresh_aggregates end - context ".per_minute" do + context ".rollup(:minute)" do should "return downloads per minute" do travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do - assert_equal [], Download.per_minute + assert_equal 0, Download.count create(:download, created_at: 2.minutes.ago) create(:download, created_at: 1.minute.ago) create(:download, created_at: 1.minute.ago) - assert_equal [1, 2], Download.per_minute.map(&:downloads) + assert_equal [1, 2], Download.total_downloads.rollup(:minute).map(&:downloads) end end end - context ".gems_per_minute" do + context ".downloads_by_gem.rollup(:minute)" do should "return gems downloads per minute" do travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do - assert_equal [], Download.gems_per_minute + assert_equal [], Download.downloads_by_gem.rollup(:minute).map(&:downloads) create(:download, gem_name: "example", created_at: 2.minutes.ago) create(:download, gem_name: "example", created_at: 1.minute.ago) create(:download, gem_name: "example", created_at: 1.minute.ago) create(:download, gem_name: "example2", created_at: 1.minute.ago) - assert_equal [1, 2], Download.gems_per_minute.where(gem_name: "example").map(&:downloads) - assert_equal [1], Download.gems_per_minute.where(gem_name: "example2").map(&:downloads) + assert_equal [1, 2], Download.downloads_by_gem.rollup(:minute).where(gem_name: "example").map(&:downloads) + assert_equal [1], Download.downloads_by_gem.rollup(:minute).where(gem_name: "example2").map(&:downloads) end end end - context ".versions_per_minute" do + context ".downloads_by_version.rollup(:minute)" do should "return versions downloads per minute" do travel_to Time.zone.local(2024, 8, 24, 12, 0, 0) do - assert_equal [], Download.versions_per_minute + assert_equal 0, Download.count create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 2.minutes.ago) create(:download, gem_name: "example", gem_version: "0.0.1", created_at: 1.minute.ago) @@ -64,44 +62,30 @@ def refresh_all! create(:download, gem_name: "example", gem_version: "0.0.2", created_at: 1.minute.ago) create(:download, gem_name: "example2", gem_version: "0.0.1", created_at: 1.minute.ago) - assert_equal [1, 1], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) - assert_equal [2], Download.versions_per_minute.where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) - assert_equal [1], Download.versions_per_minute.where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) + assert_equal [1, 1], Download.downloads_by_version.rollup(:minute).where(gem_name: "example", gem_version: "0.0.1").map(&:downloads) + assert_equal [2], Download.downloads_by_version.rollup(:minute).where(gem_name: "example", gem_version: "0.0.2").map(&:downloads) + assert_equal [1], Download.downloads_by_version.rollup(:minute).where(gem_name: "example2", gem_version: "0.0.1").map(&:downloads) end end end context "Continuous Aggregate" do should "materialized views by minute, hour, day and month" do - assert Download::PerMinute.table_exists? - assert Download::PerHour.table_exists? - assert Download::PerDay.table_exists? - assert Download::PerMonth.table_exists? - - assert Download::GemsPerMinute.table_exists? - assert Download::GemsPerHour.table_exists? - assert Download::GemsPerDay.table_exists? - assert Download::GemsPerMonth.table_exists? - - assert Download::VersionsPerMinute.table_exists? - assert Download::VersionsPerHour.table_exists? - assert Download::VersionsPerDay.table_exists? - assert Download::VersionsPerMonth.table_exists? - - assert Download::MaterializedViews == [ - Download::PerMinute, - Download::PerHour, - Download::PerDay, - Download::PerMonth, - Download::GemsPerMinute, - Download::GemsPerHour, - Download::GemsPerDay, - Download::GemsPerMonth, - Download::VersionsPerMinute, - Download::VersionsPerHour, - Download::VersionsPerDay, - Download::VersionsPerMonth - ] + assert Download::TotalDownloadsPerMinute.table_exists? + assert Download::TotalDownloadsPerHour.table_exists? + assert Download::TotalDownloadsPerDay.table_exists? + assert Download::TotalDownloadsPerMonth.table_exists? + + assert Download::DownloadsByGemPerMinute.table_exists? + assert Download::DownloadsByGemPerHour.table_exists? + assert Download::DownloadsByGemPerDay.table_exists? + assert Download::DownloadsByGemPerMonth.table_exists? + + assert Download::DownloadsByVersionPerMinute.table_exists? + assert Download::DownloadsByVersionPerHour.table_exists? + assert Download::DownloadsByVersionPerDay.table_exists? + assert Download::DownloadsByVersionPerMonth.table_exists? + end should "refresh materialized views" do @@ -117,10 +101,10 @@ def refresh_all! refresh_all! assert_equal 15, Download.count - assert_equal 8, Download::PerMinute.count - assert_equal 7, Download::PerHour.count - assert_equal 7, Download::PerDay.count - assert_equal 6, Download::PerMonth.count + assert_equal 8, Download::TotalDownloadsPerMinute.count + assert_equal 7, Download::TotalDownloadsPerHour.count + assert_equal 7, Download::TotalDownloadsPerDay.count + assert_equal 6, Download::TotalDownloadsPerMonth.count expected_per_minute = [ {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, @@ -133,59 +117,60 @@ def refresh_all! {"created_at":"2024-08-24T11:59:00.000Z","downloads":4} ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} - assert_equal expected_per_minute, Download::PerMinute.all.map{_1.attributes.symbolize_keys} + assert_equal expected_per_minute, Download::TotalDownloadsPerMinute.order(:created_at).all.map{_1.attributes.symbolize_keys} expected_per_hour = [ - {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, - {"created_at":"2024-08-24T11:00:00.000Z","downloads":6}, {"created_at":"2023-08-24T12:00:00.000Z","downloads":1}, {"created_at":"2023-09-24T12:00:00.000Z","downloads":1}, {"created_at":"2023-10-24T12:00:00.000Z","downloads":1}, {"created_at":"2024-05-24T12:00:00.000Z","downloads":2}, - {"created_at":"2024-07-24T12:00:00.000Z","downloads":2} + {"created_at":"2024-07-24T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-23T12:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T11:00:00.000Z","downloads":6}, ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} - assert_equal expected_per_hour, Download::PerHour.all.map{_1.attributes.symbolize_keys} + assert_equal expected_per_hour, Download::TotalDownloadsPerHour.order(created_at: :asc).all.map{_1.attributes.symbolize_keys} expected_per_day = [ - {"created_at":"2024-08-23T00:00:00.000Z","downloads":2}, - {"created_at":"2024-08-24T00:00:00.000Z","downloads":6}, {"created_at":"2023-08-24T00:00:00.000Z","downloads":1}, {"created_at":"2023-09-24T00:00:00.000Z","downloads":1}, {"created_at":"2023-10-24T00:00:00.000Z","downloads":1}, {"created_at":"2024-05-24T00:00:00.000Z","downloads":2}, - {"created_at":"2024-07-24T00:00:00.000Z","downloads":2} + {"created_at":"2024-07-24T00:00:00.000Z","downloads":2}, + {"created_at":"2024-08-23T00:00:00.000Z","downloads":2}, + {"created_at":"2024-08-24T00:00:00.000Z","downloads":6}, ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} - assert_equal expected_per_day, Download::PerDay.all.map{_1.attributes.symbolize_keys} + assert_equal expected_per_day, Download::TotalDownloadsPerDay.order(:created_at).all.map{_1.attributes.symbolize_keys} expected_per_month = [ - {"created_at":"2024-08-01T00:00:00.000Z","downloads":8}, {"created_at":"2023-08-01T00:00:00.000Z","downloads":1}, {"created_at":"2023-09-01T00:00:00.000Z","downloads":1}, {"created_at":"2023-10-01T00:00:00.000Z","downloads":1}, {"created_at":"2024-05-01T00:00:00.000Z","downloads":2}, - {"created_at":"2024-07-01T00:00:00.000Z","downloads":2} + {"created_at":"2024-07-01T00:00:00.000Z","downloads":2}, + {"created_at":"2024-08-01T00:00:00.000Z","downloads":8}, ].map{|h| h[:created_at] = Time.zone.parse(h[:created_at]); h} - assert_equal expected_per_month, Download::PerMonth.all.map{_1.attributes.symbolize_keys} - assert_equal [1, 1, 1, 2, 2, 2, 2, 4], Download::PerMinute.all.map{_1.attributes["downloads"]} + assert_equal expected_per_month, Download::TotalDownloadsPerMonth.order(:created_at).all.map{_1.attributes.symbolize_keys} + + assert_equal [1, 1, 1, 2, 2, 2, 2, 4], Download::TotalDownloadsPerMinute.order(:created_at).all.map{_1.attributes["downloads"]} - downloads_per_minute = -> (gem_name) { Download::GemsPerMinute.where('gem_name': gem_name).pluck('downloads')} + downloads_per_minute = -> (gem_name) { Download::DownloadsByGemPerMinute.where('gem_name': gem_name).order(:created_at).pluck('downloads')} assert_equal [1, 1, 1, 1, 1, 1, 1, 2], downloads_per_minute.("alpha") assert_equal [1,1,1,1,2], downloads_per_minute.("beta") - downloads_per_hour = -> (gem_name) { Download::GemsPerHour.where('gem_name': gem_name).pluck('downloads')} - assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_hour.("alpha") - assert_equal [1, 3, 1, 1], downloads_per_hour.("beta") + downloads_per_hour = -> (gem_name) { Download::DownloadsByGemPerHour.where('gem_name': gem_name).order(:created_at).pluck('downloads')} + assert_equal [1, 1, 1, 1, 1, 1, 3], downloads_per_hour.("alpha") + assert_equal [1, 1, 1, 3], downloads_per_hour.("beta") - downloads_per_day = -> (gem_name) { Download::GemsPerDay.where('gem_name': gem_name).pluck('downloads')} - assert_equal [1, 3, 1, 1, 1, 1, 1], downloads_per_day.("alpha") - assert_equal [1, 3, 1, 1], downloads_per_day.("beta") + downloads_per_day = -> (gem_name) { Download::DownloadsByGemPerDay.where('gem_name': gem_name).order(:created_at).pluck('downloads')} + assert_equal [1, 1, 1, 1, 1, 1, 3], downloads_per_day.("alpha") + assert_equal [1, 1, 1, 3], downloads_per_day.("beta") - downloads_per_month = -> (gem_name) { Download::GemsPerMonth.where('gem_name': gem_name).pluck('downloads')} - assert_equal [4, 1, 1, 1, 1, 1], downloads_per_month.("alpha") - assert_equal [4, 1, 1], downloads_per_month.("beta") + downloads_per_month = -> (gem_name) { Download::DownloadsByGemPerMonth.where('gem_name': gem_name).order(:created_at).pluck('downloads')} + assert_equal [1, 1, 1, 1, 1, 4], downloads_per_month.("alpha") + assert_equal [1, 1, 4], downloads_per_month.("beta") end end end From 0cde4537e2dc79c4be757218131e46381df60f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Mon, 18 Nov 2024 17:14:35 -0300 Subject: [PATCH 12/14] Update schema --- db/downloads_schema.rb | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb index f69670f802b..2e2c67ef86f 100644 --- a/db/downloads_schema.rb +++ b/db/downloads_schema.rb @@ -37,28 +37,28 @@ create_hypertable "downloads", time_column: "created_at", chunk_time_interval: "1 day", compress_segmentby: "gem_name, gem_version", compress_orderby: "created_at DESC", compression_interval: "P7D" create_continuous_aggregate("total_downloads_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT10M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, - count(*) AS total + count(*) AS downloads FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)) SQL create_continuous_aggregate("total_downloads_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT4H'", end_offset: "INTERVAL 'PT1H'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, - sum(total) AS total + sum(downloads) AS downloads FROM total_downloads_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)) SQL create_continuous_aggregate("total_downloads_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, - sum(total) AS total + sum(downloads) AS downloads FROM total_downloads_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)) SQL create_continuous_aggregate("total_downloads_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, - sum(total) AS total + sum(downloads) AS downloads FROM total_downloads_per_day GROUP BY (time_bucket('P1M'::interval, created_at)) SQL @@ -66,7 +66,7 @@ create_continuous_aggregate("downloads_by_gem_per_minute", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT10M'", end_offset: "INTERVAL 'PT1M'", schedule_interval: "INTERVAL '60'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1M'::interval, created_at) AS created_at, gem_name, - count(*) AS total + count(*) AS downloads FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name SQL @@ -74,7 +74,7 @@ create_continuous_aggregate("downloads_by_gem_per_hour", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'PT4H'", end_offset: "INTERVAL 'PT1H'", schedule_interval: "INTERVAL '3600'"}, materialized_only: true, finalized: true) SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_gem_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name SQL @@ -82,7 +82,7 @@ create_continuous_aggregate("downloads_by_gem_per_day", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3D'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_gem_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name SQL @@ -90,7 +90,7 @@ create_continuous_aggregate("downloads_by_gem_per_month", <<-SQL, refresh_policies: { start_offset: "INTERVAL 'P3M'", end_offset: "INTERVAL 'P1D'", schedule_interval: "INTERVAL '86400'"}, materialized_only: true, finalized: true) SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_gem_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name SQL @@ -99,7 +99,7 @@ SELECT time_bucket('PT1M'::interval, created_at) AS created_at, gem_name, gem_version, - count(*) AS total + count(*) AS downloads FROM downloads GROUP BY (time_bucket('PT1M'::interval, created_at)), gem_name, gem_version SQL @@ -108,7 +108,7 @@ SELECT time_bucket('PT1H'::interval, created_at) AS created_at, gem_name, gem_version, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_version_per_minute GROUP BY (time_bucket('PT1H'::interval, created_at)), gem_name, gem_version SQL @@ -117,7 +117,7 @@ SELECT time_bucket('P1D'::interval, created_at) AS created_at, gem_name, gem_version, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_version_per_hour GROUP BY (time_bucket('P1D'::interval, created_at)), gem_name, gem_version SQL @@ -126,7 +126,7 @@ SELECT time_bucket('P1M'::interval, created_at) AS created_at, gem_name, gem_version, - sum(total) AS total + sum(downloads) AS downloads FROM downloads_by_version_per_day GROUP BY (time_bucket('P1M'::interval, created_at)), gem_name, gem_version SQL From 81eb03d625add362037f396ef391db572766dbc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Mon, 18 Nov 2024 17:54:10 -0300 Subject: [PATCH 13/14] Add task to backfill log downloads --- ...ill_log_downloads_from_log_tickets_task.rb | 25 +++++++++++++++++++ ...og_downloads_from_log_tickets_task_test.rb | 22 ++++++++++++++++ 2 files changed, 47 insertions(+) create mode 100644 app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb create mode 100644 test/tasks/maintenance/backfill_log_downloads_from_log_tickets_task_test.rb diff --git a/app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb b/app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb new file mode 100644 index 00000000000..6e0534e9d5f --- /dev/null +++ b/app/tasks/maintenance/backfill_log_downloads_from_log_tickets_task.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module Maintenance + # This task is used to backfill log downloads from log tickets. + # It is used to migrate from past to present using created_at date to order + # limit 500 per iteration and use latest created_at date to get the next 500 + # later union with pending tickets. + class BackfillLogDownloadsFromLogTicketsTask < MaintenanceTasks::Task + def collection + # migrate from past to present using created_at date to order + # limit 500 per iteration and use latest created_at date to get the next 500 + # later union with pending tickets + scope = LogTicket.processed.order(created_at: :asc) + last_created_at = LogDownload.latest_created_at + scope = scope.where("created_at < ?", last_created_at) if last_created_at + scope + .limit(500) + .union(LogTicket.pending.order(created_at: :asc).limit(500)) + end + + def process(batch) + LogDownload.insert_all(batch.select(:id, :status, :directory, :key, :created_at).to_a.map(&:attributes)) + end + end +end diff --git a/test/tasks/maintenance/backfill_log_downloads_from_log_tickets_task_test.rb b/test/tasks/maintenance/backfill_log_downloads_from_log_tickets_task_test.rb new file mode 100644 index 00000000000..bd3933660b2 --- /dev/null +++ b/test/tasks/maintenance/backfill_log_downloads_from_log_tickets_task_test.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +require "test_helper" + +module Maintenance + class BackfillLogDownloadsFromLogTicketsTaskTest < ActiveSupport::TestCase + setup do + # create a list of log ticket statuses + @log_ticket_statuses = %w[pending processed] + @log_ticket_statuses.each do |status| + 3.times { create(:log_ticket, status: status) } + end + end + + test "#process performs a task iteration" do + assert_equal LogTicket.count, 6 + assert_equal LogDownload.count, 0 + Maintenance::BackfillLogDownloadsFromLogTicketsTask.process(LogTicket.all) + assert_equal LogDownload.count, 6 + end + end +end From 718c6a40760d56af319cc2edfad2f5e506bc8aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B4natas=20Davi=20Paganini?= Date: Mon, 18 Nov 2024 18:13:48 -0300 Subject: [PATCH 14/14] Update timescaledb version --- Gemfile | 2 +- Gemfile.lock | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Gemfile b/Gemfile index 5cc75dda44b..6fd4d845d5d 100644 --- a/Gemfile +++ b/Gemfile @@ -57,7 +57,7 @@ gem "phlex-rails", "~> 1.2" gem "discard", "~> 1.3" gem "user_agent_parser", "~> 2.18" gem "pghero", "~> 3.5" -gem "timescaledb", "~> 0.2" +gem "timescaledb", "~> 0.3.0" # Admin dashboard gem "avo", "~> 2.51" diff --git a/Gemfile.lock b/Gemfile.lock index 18526310b42..2dc46337f37 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -710,7 +710,7 @@ GEM thor (1.3.1) tilt (2.3.0) timeout (0.4.1) - timescaledb (0.2.9) + timescaledb (0.3.0) activerecord activesupport pg (~> 1.2) @@ -877,7 +877,7 @@ DEPENDENCIES strong_migrations (~> 2.0) tailwindcss-rails (~> 2.6) terser (~> 1.2) - timescaledb (~> 0.2) + timescaledb (~> 0.3.0) toxiproxy (~> 2.0) unpwn (~> 1.0) user_agent_parser (~> 2.18) @@ -1155,7 +1155,7 @@ CHECKSUMS thor (1.3.1) sha256=fa7e3471d4f6a27138e3d9c9b0d4daac9c3d7383927667ae83e9ab42ae7401ef tilt (2.3.0) sha256=82dd903d61213c63679d28e404ee8e10d1b0fdf5270f1ad0898ec314cc3e745c timeout (0.4.1) sha256=6f1f4edd4bca28cffa59501733a94215407c6960bd2107331f0280d4abdebb9a - timescaledb (0.2.9) sha256=1d1382d5b889bf8ec6bcdcbf061c8e28234bc343911c81d48a9dfb22d9d5cfdc + timescaledb (0.3.0) sha256=9ce2b39417d30544054cb609fbd84e18e304c7b7952a793846b8f4489551a28f toxiproxy (2.0.2) sha256=2e3b53604fb921d40da3db8f78a52b3133fcae33e93d440725335b15974e440a tpm-key_attestation (0.12.0) sha256=e133d80cf24fef0e7a7dfad00fd6aeff01fc79875fbfc66cd8537bbd622b1e6d turbo-rails (1.5.0) sha256=b426cc762fb0940277729b3f1751a9f0bd269f5613c1d62ac73e5f0be7c7a83e