From 10de755b186385f3f4eb02dff22636332de7ffde Mon Sep 17 00:00:00 2001 From: Samuel Giddins Date: Thu, 9 Mar 2023 12:44:35 -0800 Subject: [PATCH] [WIP] Store downloads in timescale --- Gemfile | 1 + Gemfile.lock | 3 + app/avo/resources/download_resource.rb | 17 ++ app/controllers/avo/downloads_controller.rb | 4 + app/jobs/fastly_log_processor.rb | 57 ++++++- app/models/download.rb | 33 ++++ app/models/download_record.rb | 7 + config/database.yml.sample | 84 ++++++--- .../20230308211125_create_downloads.rb | 14 ++ ..._create_downloads_continuous_aggregates.rb | 107 ++++++++++++ db/downloads_schema.rb | 160 ++++++++++++++++++ docker-compose.yml | 6 + test/factories.rb | 8 + test/jobs/fastly_log_processor_job_test.rb | 90 +++++++++- test/models/download_test.rb | 7 + test/sample_logs/fastly-fake.log | 6 +- test/test_helper.rb | 7 + 17 files changed, 574 insertions(+), 37 deletions(-) create mode 100644 app/avo/resources/download_resource.rb create mode 100644 app/controllers/avo/downloads_controller.rb create mode 100644 app/models/download.rb create mode 100644 app/models/download_record.rb create mode 100644 db/downloads_migrate/20230308211125_create_downloads.rb create mode 100644 db/downloads_migrate/20230308224729_create_downloads_continuous_aggregates.rb create mode 100644 db/downloads_schema.rb create mode 100644 test/models/download_test.rb diff --git a/Gemfile b/Gemfile index 43d29cc2940..bf03cab8739 100644 --- a/Gemfile +++ b/Gemfile @@ -48,6 +48,7 @@ gem "rqrcode" gem "rotp" gem "unpwn" gem "webauthn" +gem "timescaledb-rails" # Admin dashboard gem "avo" diff --git a/Gemfile.lock b/Gemfile.lock index ec22bdb8bab..71b792b8582 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -538,6 +538,8 @@ GEM thor (1.2.1) tilt (2.0.10) timeout (0.3.1) + timescaledb-rails (0.1.5) + rails (>= 6.0) toxiproxy (2.0.2) tpm-key_attestation (0.12.0) bindata (~> 2.4) @@ -663,6 +665,7 @@ DEPENDENCIES sprockets-rails statsd-instrument (~> 2.3.0) terser + timescaledb-rails toxiproxy (~> 2.0.0) unpwn validates_formatting_of diff --git a/app/avo/resources/download_resource.rb b/app/avo/resources/download_resource.rb new file mode 100644 index 00000000000..c2f115ad662 --- /dev/null +++ b/app/avo/resources/download_resource.rb @@ -0,0 +1,17 @@ +class DownloadResource < Avo::BaseResource + self.title = :id + self.includes = [] + # self.search_query = -> do + # scope.ransack(id_eq: params[:q], m: "or").result(distinct: false) + # end + + field :id, as: :id + # Fields generated from the model + field :rubygem_id, as: :number + field :version_id, as: :number + field :downloads, as: :number + field :key, as: :text + field :bucket, as: :text + field :occurred_at, as: :date_time + # add fields here +end diff --git a/app/controllers/avo/downloads_controller.rb b/app/controllers/avo/downloads_controller.rb new file mode 100644 index 00000000000..fc3832ea2ce --- /dev/null +++ b/app/controllers/avo/downloads_controller.rb @@ -0,0 +1,4 @@ +# This controller has been generated to enable Rails' resource routes. +# More information on https://docs.avohq.io/2.0/controllers.html +class Avo::DownloadsController < Avo::ResourcesController +end diff --git a/app/jobs/fastly_log_processor.rb b/app/jobs/fastly_log_processor.rb index c633dca9af9..0572c4d0ef6 100644 --- a/app/jobs/fastly_log_processor.rb +++ b/app/jobs/fastly_log_processor.rb @@ -18,13 +18,18 @@ def perform log_ticket = LogTicket.pop(key: key, directory: bucket) if log_ticket.nil? StatsD.increment("fastly_log_processor.extra") + Rails.logger.debug { "No log ticket for key=#{key} directory=#{bucket}, ignoring" } return end - counts = download_counts(log_ticket) + counts_by_bucket = download_counts(log_ticket) + counts = counts_by_bucket.transform_values { |h| h.sum { |_, v| v } } StatsD.gauge("fastly_log_processor.processed_versions_count", counts.count) Rails.logger.info "Processed Fastly log counts: #{counts.inspect}" + # insert into timescale first since it is idempotent and GemDownload is not + record_downloads(counts_by_bucket, log_ticket.id) + processed_count = counts.sum { |_, v| v } ActiveRecord::Base.connection.transaction do GemDownload.bulk_update(counts) @@ -54,16 +59,60 @@ def download_counts(log_ticket) ok_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:ok] not_modified_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:not_modified] - file.each_line.with_object(Hash.new(0)) do |log_line, accum| - path, response_code = log_line.split[10, 2] + file.each_line.with_object(Hash.new { |h, k| h[k] = Hash.new(0) }) do |log_line, accum| + parts = log_line.split + path, response_code = parts[10, 2] case response_code.to_i # Only count successful downloads # NB: we consider a 304 response a download attempt when ok_status, not_modified_status if (match = PATH_PATTERN.match(path)) - accum[match[:path]] += 1 + timestamp = parts[0].sub!(/\A<\d+>/, "") + accum[match[:path]][truncated_timestamp(timestamp)] += 1 end end end end + + def truncated_timestamp(timestamp) + return unless timestamp + time = Time.iso8601(timestamp) + time.change(min: time.min / 15 * 15) + rescue Date::Error + nil + end + + VERSION_PLUCK_ID_LIMIT = 1000 + DOWNLOAD_UPSERT_LIMIT = 1000 + + def record_downloads(counts_by_bucket, log_ticket_id) + ids_by_full_name = counts_by_bucket.each_key.each_slice(VERSION_PLUCK_ID_LIMIT).each_with_object({}) do |full_names, hash| + Version.where(full_name: full_names).pluck(:full_name, :id, :rubygem_id).each { |full_name, *ids| hash[full_name] = ids } + end + + counts_by_bucket + .lazy + .flat_map do |path, buckets| + versions = ids_by_full_name[path] + if versions.nil? + Rails.logger.debug { "No version found for path=#{path}" } + next + end + + version_id, rubygem_id = versions + + buckets.map do |occurred_at, downloads| + { version_id:, rubygem_id:, occurred_at:, downloads:, log_ticket_id: } + end + end # rubocop:disable Style/MultilineBlockChain + .compact + .each_slice(DOWNLOAD_UPSERT_LIMIT) do |inserts| + next unless Download.hypertable? + Download.upsert_all( + inserts, + update_only: %i[downloads], + unique_by: %i[rubygem_id version_id occurred_at log_ticket_id] + ) + end + end end diff --git a/app/models/download.rb b/app/models/download.rb new file mode 100644 index 00000000000..e57fda667e6 --- /dev/null +++ b/app/models/download.rb @@ -0,0 +1,33 @@ +class Download < DownloadRecord + belongs_to :rubygem + belongs_to :version + + def self.suffix = nil + def self.time_period = nil + + def self.class_name_for(suffix:, time_period:) + raise ArgumentError if suffix && !time_period + "#{time_period.iso8601}#{"_#{suffix}" if suffix}".classify + end + + [15.minutes, 1.day, 1.month, 1.year].each do |time_period| + (%w[all_versions all_gems] << nil).each do |suffix| + table_name = "#{Download.table_name}_#{time_period.inspect.parameterize(separator: '_')}#{"_#{suffix}" if suffix}" + ::Download.class_eval(<<~RUBY, __FILE__, __LINE__ + 1) # rubocop:disable Style/DocumentDynamicEvalDefinition + class #{class_name_for(suffix:, time_period:)} < Download + attribute :downloads, :integer + + def readonly? + true + end + + def self.table_name = #{table_name.dump} + + def self.suffix = #{suffix&.dump || :nil} + + def self.time_period = #{time_period.inspect.parameterize(separator: '_').dump} + end + RUBY + end + end +end diff --git a/app/models/download_record.rb b/app/models/download_record.rb new file mode 100644 index 00000000000..bc168e13318 --- /dev/null +++ b/app/models/download_record.rb @@ -0,0 +1,7 @@ +class DownloadRecord < ApplicationRecord + include Timescaledb::Rails::Model + + self.abstract_class = true + + connects_to database: { writing: :downloads, reading: :downloads } +end diff --git a/config/database.yml.sample b/config/database.yml.sample index bfc241051bd..eed6ab5a86f 100644 --- a/config/database.yml.sample +++ b/config/database.yml.sample @@ -3,34 +3,70 @@ default: &default encoding: utf8 username: postgres -development: - <<: *default - database: rubygems_development - host: localhost - password: devpassword - pool: 5 - timeout: 5000 +timescale: ×cale + adapter: postgresql + encoding: utf8 + username: postgres + migrations_paths: db/downloads_migrate +development: + primary: + <<: *default + database: rubygems_development + host: localhost + password: devpassword + pool: 5 + timeout: 5000 + downloads: + <<: *timescale + database: rubygems_tsdb_development + host: localhost + port: 5434 + pool: 5 + timeout: 5000 test: - <<: *default - database: rubygems_test - host: localhost - min_messages: warning - password: testpassword - pool: 5 - timeout: 5000 + primary: + <<: *default + database: rubygems_test + host: localhost + min_messages: warning + password: testpassword + pool: 5 + timeout: 5000 + downloads: + <<: *timescale + database: rubygems_tsdb_test + host: localhost + min_messages: warning + port: 5434 + pool: 5 + timeout: 5000 staging: - <<: *default - database: rubygems_staging - min_messages: error - pool: 30 - reconnect: true + primary: + <<: *default + database: rubygems_staging + min_messages: error + pool: 30 + reconnect: true + downloads: + <<: *timescale + database: rubygems_tsdb_staging + min_messages: error + pool: 30 + reconnect: true production: - <<: *default - database: rubygems_production - min_messages: error - pool: 30 - reconnect: true + primary: + <<: *default + database: rubygems_production + min_messages: error + pool: 30 + reconnect: true + downloads: + <<: *timescale + database: rubygems_tsdb_production + min_messages: error + pool: 30 + reconnect: true diff --git a/db/downloads_migrate/20230308211125_create_downloads.rb b/db/downloads_migrate/20230308211125_create_downloads.rb new file mode 100644 index 00000000000..4e79c2618b9 --- /dev/null +++ b/db/downloads_migrate/20230308211125_create_downloads.rb @@ -0,0 +1,14 @@ +class CreateDownloads < ActiveRecord::Migration[7.0] + def change + create_table :downloads, id: false do |t| + t.integer :rubygem_id, null: false + t.integer :version_id, null: false + t.integer :downloads, null: false + t.integer :log_ticket_id, null: true + t.timestamptz :occurred_at, null: false + end + + add_index :downloads, [:rubygem_id, :version_id, :occurred_at, :log_ticket_id], unique: true, name: 'idx_downloads_by_version_log_ticket' + create_hypertable :downloads, :occurred_at, chunk_time_interval: '7 days', partitioning_column: 'rubygem_id' + end +end diff --git a/db/downloads_migrate/20230308224729_create_downloads_continuous_aggregates.rb b/db/downloads_migrate/20230308224729_create_downloads_continuous_aggregates.rb new file mode 100644 index 00000000000..e25d28964ef --- /dev/null +++ b/db/downloads_migrate/20230308224729_create_downloads_continuous_aggregates.rb @@ -0,0 +1,107 @@ +class CreateDownloadsContinuousAggregates < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + + def execute(...) + puts(...) + super + end + + def create_continuous_aggregate(name, ...) + execute "DROP MATERIALIZED VIEW IF EXISTS #{connection.quote_table_name(name)} CASCADE;" + super + rescue ActiveRecord::StatementInvalid => e + raise unless e.cause.is_a?(PG::DatetimeFieldOverflow) + say "WARNING: #{e}" + nil + end + + def continuous_aggregate( + duration:, + from: Download.table_name, + start_offset:, + end_offset:, + schedule_window:, + retention: start_offset + ) + name = "downloads_" + duration.inspect.parameterize(separator: '_') + start_offset ||= 20.years + + create_continuous_aggregate( + name, + Download. + time_bucket(duration.iso8601, select_alias: :occurred_at). + group(:rubygem_id, :version_id). + select(:rubygem_id, :version_id). + sum(:downloads, :downloads). + reorder(nil). + from(from). + to_sql + ) + add_continuous_aggregate_policy(name, start_offset&.iso8601, end_offset&.iso8601, schedule_window) + add_hypertable_retention_policy(name, retention.iso8601) if retention + + all_versions_name = name + "_all_versions" + create_continuous_aggregate( + all_versions_name, + Download. + time_bucket(duration.iso8601, select_alias: :occurred_at). + group(:rubygem_id). + select(:rubygem_id). + sum(:downloads, :downloads). + from(name). + to_sql + ) + add_continuous_aggregate_policy(all_versions_name, start_offset&.iso8601, end_offset&.iso8601, schedule_window) + add_hypertable_retention_policy(all_versions_name, retention.iso8601) if retention + + all_gems_name = name + "_all_gems" + create_continuous_aggregate( + all_gems_name, + Download. + time_bucket(duration.iso8601, select_alias: :occurred_at). + sum(:downloads, :downloads). + from(all_versions_name). + to_sql + ) + add_continuous_aggregate_policy(all_gems_name, start_offset&.iso8601, end_offset&.iso8601, schedule_window) + add_hypertable_retention_policy(all_gems_name, retention.iso8601) if retention + + name + end + + def change + ActiveRecord::Base.logger = Logger.new(STDERR) + from = continuous_aggregate( + duration: 15.minutes, + start_offset: 1.week, + end_offset: 1.hour, + schedule_window: 1.hour + ) + + from = continuous_aggregate( + duration: 1.day, + start_offset: 2.years, + end_offset: 1.day, + schedule_window: 12.hours, + from: + ) + + from = continuous_aggregate( + duration: 1.month, + start_offset: nil, + end_offset: 1.month, + schedule_window: 1.day, + retention: nil, + from: + ) + + from = continuous_aggregate( + duration: 1.year, + start_offset: nil, + end_offset: 1.year, + schedule_window: 1.month, + retention: nil, + from: + ) + end +end diff --git a/db/downloads_schema.rb b/db/downloads_schema.rb new file mode 100644 index 00000000000..f7ab246758b --- /dev/null +++ b/db/downloads_schema.rb @@ -0,0 +1,160 @@ +# This file is auto-generated from the current state of the database. Instead +# of editing this file, please use the migrations feature of Active Record to +# incrementally modify your database, and then regenerate this schema definition. +# +# This file is the source Rails uses to define your schema when running `bin/rails +# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to +# be faster and is potentially less error prone than running all of your +# migrations from scratch. Old migrations may fail to apply correctly if those +# migrations use external dependencies or application code. +# +# It's strongly recommended that you check this file into your version control system. + +ActiveRecord::Schema[7.0].define(version: 2023_03_08_224729) do + # These are extensions that must be enabled in order to support this database + enable_extension "pg_stat_statements" + enable_extension "plpgsql" + enable_extension "timescaledb" + enable_extension "timescaledb_toolkit" + + create_table "downloads", id: false, force: :cascade do |t| + t.integer "rubygem_id", null: false + t.integer "version_id", null: false + t.integer "downloads", null: false + t.integer "log_ticket_id" + t.timestamptz "occurred_at", null: false + t.index ["occurred_at"], name: "downloads_occurred_at_idx", order: :desc + t.index ["rubygem_id", "version_id", "occurred_at", "log_ticket_id"], name: "idx_downloads_by_version_log_ticket", unique: true + end + + create_hypertable "downloads", "occurred_at", chunk_time_interval: "7 days" + + create_continuous_aggregate "downloads_15_minutes", <<-SQL + SELECT time_bucket('PT15M'::interval, downloads.occurred_at) AS occurred_at, + downloads.rubygem_id, + downloads.version_id, + sum(downloads.downloads) AS downloads + FROM downloads + GROUP BY (time_bucket('PT15M'::interval, downloads.occurred_at)), downloads.rubygem_id, downloads.version_id; + SQL + + add_continuous_aggregate_policy "downloads_15_minutes", "1 week", "1 hour", "1 hour" + + create_continuous_aggregate "downloads_15_minutes_all_versions", <<-SQL + SELECT time_bucket('PT15M'::interval, downloads_15_minutes.occurred_at) AS occurred_at, + downloads_15_minutes.rubygem_id, + sum(downloads_15_minutes.downloads) AS downloads + FROM downloads_15_minutes + GROUP BY (time_bucket('PT15M'::interval, downloads_15_minutes.occurred_at)), downloads_15_minutes.rubygem_id + ORDER BY (time_bucket('PT15M'::interval, downloads_15_minutes.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_15_minutes_all_versions", "1 week", "1 hour", "1 hour" + + create_continuous_aggregate "downloads_15_minutes_all_gems", <<-SQL + SELECT time_bucket('PT15M'::interval, downloads_15_minutes_all_versions.occurred_at) AS occurred_at, + sum(downloads_15_minutes_all_versions.downloads) AS downloads + FROM downloads_15_minutes_all_versions + GROUP BY (time_bucket('PT15M'::interval, downloads_15_minutes_all_versions.occurred_at)) + ORDER BY (time_bucket('PT15M'::interval, downloads_15_minutes_all_versions.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_15_minutes_all_gems", "1 week", "1 hour", "1 hour" + + create_continuous_aggregate "downloads_1_day", <<-SQL + SELECT time_bucket('P1D'::interval, downloads_15_minutes.occurred_at) AS occurred_at, + downloads_15_minutes.rubygem_id, + downloads_15_minutes.version_id, + sum(downloads_15_minutes.downloads) AS downloads + FROM downloads_15_minutes + GROUP BY (time_bucket('P1D'::interval, downloads_15_minutes.occurred_at)), downloads_15_minutes.rubygem_id, downloads_15_minutes.version_id; + SQL + + add_continuous_aggregate_policy "downloads_1_day", "2 years", "1 day", "12 hours" + + create_continuous_aggregate "downloads_1_day_all_versions", <<-SQL + SELECT time_bucket('P1D'::interval, downloads_1_day.occurred_at) AS occurred_at, + downloads_1_day.rubygem_id, + sum(downloads_1_day.downloads) AS downloads + FROM downloads_1_day + GROUP BY (time_bucket('P1D'::interval, downloads_1_day.occurred_at)), downloads_1_day.rubygem_id + ORDER BY (time_bucket('P1D'::interval, downloads_1_day.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_day_all_versions", "2 years", "1 day", "12 hours" + + create_continuous_aggregate "downloads_1_day_all_gems", <<-SQL + SELECT time_bucket('P1D'::interval, downloads_1_day_all_versions.occurred_at) AS occurred_at, + sum(downloads_1_day_all_versions.downloads) AS downloads + FROM downloads_1_day_all_versions + GROUP BY (time_bucket('P1D'::interval, downloads_1_day_all_versions.occurred_at)) + ORDER BY (time_bucket('P1D'::interval, downloads_1_day_all_versions.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_day_all_gems", "2 years", "1 day", "12 hours" + + create_continuous_aggregate "downloads_1_month", <<-SQL + SELECT time_bucket('P1M'::interval, downloads_1_day.occurred_at) AS occurred_at, + downloads_1_day.rubygem_id, + downloads_1_day.version_id, + sum(downloads_1_day.downloads) AS downloads + FROM downloads_1_day + GROUP BY (time_bucket('P1M'::interval, downloads_1_day.occurred_at)), downloads_1_day.rubygem_id, downloads_1_day.version_id; + SQL + + add_continuous_aggregate_policy "downloads_1_month", "20 years", "1 month", "1 day" + + create_continuous_aggregate "downloads_1_month_all_versions", <<-SQL + SELECT time_bucket('P1M'::interval, downloads_1_month.occurred_at) AS occurred_at, + downloads_1_month.rubygem_id, + sum(downloads_1_month.downloads) AS downloads + FROM downloads_1_month + GROUP BY (time_bucket('P1M'::interval, downloads_1_month.occurred_at)), downloads_1_month.rubygem_id + ORDER BY (time_bucket('P1M'::interval, downloads_1_month.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_month_all_versions", "20 years", "1 month", "1 day" + + create_continuous_aggregate "downloads_1_month_all_gems", <<-SQL + SELECT time_bucket('P1M'::interval, downloads_1_month_all_versions.occurred_at) AS occurred_at, + sum(downloads_1_month_all_versions.downloads) AS downloads + FROM downloads_1_month_all_versions + GROUP BY (time_bucket('P1M'::interval, downloads_1_month_all_versions.occurred_at)) + ORDER BY (time_bucket('P1M'::interval, downloads_1_month_all_versions.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_month_all_gems", "20 years", "1 month", "1 day" + + create_continuous_aggregate "downloads_1_year", <<-SQL + SELECT time_bucket('P1Y'::interval, downloads_1_month.occurred_at) AS occurred_at, + downloads_1_month.rubygem_id, + downloads_1_month.version_id, + sum(downloads_1_month.downloads) AS downloads + FROM downloads_1_month + GROUP BY (time_bucket('P1Y'::interval, downloads_1_month.occurred_at)), downloads_1_month.rubygem_id, downloads_1_month.version_id; + SQL + + add_continuous_aggregate_policy "downloads_1_year", "20 years", "1 year", "1 month" + + create_continuous_aggregate "downloads_1_year_all_versions", <<-SQL + SELECT time_bucket('P1Y'::interval, downloads_1_year.occurred_at) AS occurred_at, + downloads_1_year.rubygem_id, + sum(downloads_1_year.downloads) AS downloads + FROM downloads_1_year + GROUP BY (time_bucket('P1Y'::interval, downloads_1_year.occurred_at)), downloads_1_year.rubygem_id + ORDER BY (time_bucket('P1Y'::interval, downloads_1_year.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_year_all_versions", "20 years", "1 year", "1 month" + + create_continuous_aggregate "downloads_1_year_all_gems", <<-SQL + SELECT time_bucket('P1Y'::interval, downloads_1_year_all_versions.occurred_at) AS occurred_at, + sum(downloads_1_year_all_versions.downloads) AS downloads + FROM downloads_1_year_all_versions + GROUP BY (time_bucket('P1Y'::interval, downloads_1_year_all_versions.occurred_at)) + ORDER BY (time_bucket('P1Y'::interval, downloads_1_year_all_versions.occurred_at)); + SQL + + add_continuous_aggregate_policy "downloads_1_year_all_gems", "20 years", "1 year", "1 month" + +end diff --git a/docker-compose.yml b/docker-compose.yml index 065a162e523..67079143f6e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,12 @@ services: - "5432:5432" environment: - POSTGRES_HOST_AUTH_METHOD=trust + timescale: + image: timescale/timescaledb-ha:pg15-ts2.10-latest + ports: + - "5434:5432" + environment: + - POSTGRES_HOST_AUTH_METHOD=trust cache: image: memcached:1.4.24 ports: diff --git a/test/factories.rb b/test/factories.rb index 25762e96d62..13b8ececd77 100644 --- a/test/factories.rb +++ b/test/factories.rb @@ -1,4 +1,12 @@ FactoryBot.define do + factory :download do + rubygem_id { 1 } + version_id { 1 } + downloads { 1 } + log_ticket_id { 1 } + occurred_at { "2023-03-08 13:11:25" } + end + factory :admin_github_user, class: "Admin::GitHubUser" do login { "jackson-keeling" } avatar_url { "MyString" } diff --git a/test/jobs/fastly_log_processor_job_test.rb b/test/jobs/fastly_log_processor_job_test.rb index 934900bd8ca..a7143d82bc5 100644 --- a/test/jobs/fastly_log_processor_job_test.rb +++ b/test/jobs/fastly_log_processor_job_test.rb @@ -7,11 +7,24 @@ class FastlyLogProcessorJobTest < ActiveJob::TestCase @sample_log = Rails.root.join("test", "sample_logs", "fastly-fake.log").read @sample_log_counts = { - "bundler-1.10.6" => 2, - "json-1.8.3-java" => 2, - "json-1.8.3" => 1, - "json-1.8.2" => 4, - "no-such-gem-1.2.3" => 1 + "bundler-1.10.6" => { + Time.utc(2015, 11, 30, 21, 0, 0) => 2 + }, + "json-1.8.3-java" => { + Time.utc(2015, 11, 30, 21, 0, 0) => 2 + }, + "json-1.8.3" => { + Time.utc(2015, 11, 30, 21, 0, 0) => 1 + }, + "json-1.8.2" => { + Time.utc(2015, 11, 29, 21, 0, 0) => 1, + Time.utc(2015, 11, 30, 21, 0, 0) => 1, + Time.utc(2015, 11, 30, 21, 30, 0) => 1, + Time.utc(2015, 12, 30, 21, 0, 0) => 1 + }, + "no-such-gem-1.2.3" => { + Time.utc(2015, 11, 30, 21, 0, 0) => 1 + } } @log_ticket = LogTicket.create!(backend: "s3", directory: "test-bucket", key: "fastly-fake.log", status: "pending") @@ -77,7 +90,7 @@ class FastlyLogProcessorJobTest < ActiveJob::TestCase version = Version.find_by(full_name: name) if version count = GemDownload.find_by(rubygem_id: version.rubygem.id, version_id: version.id).count - assert_equal expected_count, count, "invalid value for #{name}" + assert_equal expected_count.each_value.sum, count, "invalid value for #{name}" end end @@ -87,6 +100,71 @@ class FastlyLogProcessorJobTest < ActiveJob::TestCase assert_equal "processed", @log_ticket.reload.status end + should "update download counts in timescale" do + requires_timescale + + @job.perform_now + + downloads = Download.all.map do |download| + version = Version.find(download.version_id) + { version.full_name => { download.occurred_at.utc => download.downloads } } + end.reduce({}, &:deep_merge) + + assert_equal @sample_log_counts.except("no-such-gem-1.2.3"), downloads + + json = Rubygem.find_by_name("json") + assert_equal 7, es_downloads(json.id) + assert_equal "processed", @log_ticket.reload.status + end + + context "all the timescale subclasses" do + { + Download::P1D => [ + { rubygem: "bundler", version: "1.10.6", downloads: 2, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", version: "1.8.3", downloads: 2, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", version: "1.8.3", downloads: 1, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", version: "1.8.2", downloads: 1, occurred_at: "2015-11-29 00:00:00 UTC" }, + { rubygem: "json", version: "1.8.2", downloads: 2, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", version: "1.8.2", downloads: 1, occurred_at: "2015-12-30 00:00:00 UTC" } + ], + Download::P1DAllVersion => [ + { rubygem: "bundler", downloads: 2, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", downloads: 1, occurred_at: "2015-11-29 00:00:00 UTC" }, + { rubygem: "json", downloads: 5, occurred_at: "2015-11-30 00:00:00 UTC" }, + { rubygem: "json", downloads: 1, occurred_at: "2015-12-30 00:00:00 UTC" } + ], + Download::P1DAllGem => [ + { downloads: 1, occurred_at: "2015-11-29 00:00:00 UTC" }, + { downloads: 7, occurred_at: "2015-11-30 00:00:00 UTC" }, + { downloads: 1, occurred_at: "2015-12-30 00:00:00 UTC" } + ] + # Download::P1M => [], + # Download::P1MAllVersion => [], + # Download::P1MAllGem => [], + # Download::P1Y => [], + # Download::P1YAllVersion => [], + # Download::P1YAllGem => [] + }.each do |klass, expected| + should "return the correct objects for #{klass}" do + requires_timescale + + @job.perform_now + + includes = { # rubocop:disable Performance/CollectionLiteralInLoop + rubygem: :name, + version: :number + }.slice(*klass.columns_hash.keys.map { _1.sub(/_id/, "").to_sym }) + order = %w[rubygem_id version_id occurred_at] & klass.columns_hash.keys # rubocop:disable Performance/CollectionLiteralInLoop + actual = klass.order(order).includes(includes.keys) + actual = actual.map do |download| + includes.to_h { |k, v| [k, download.send(k).send(v)] } + .merge(downloads: download.downloads, occurred_at: download.occurred_at.to_s) + end + assert_equal expected, actual + end + end + end + should "not run if already processed" do json = Rubygem.find_by_name("json") assert_equal 0, json.downloads diff --git a/test/models/download_test.rb b/test/models/download_test.rb new file mode 100644 index 00000000000..7d8b29c4329 --- /dev/null +++ b/test/models/download_test.rb @@ -0,0 +1,7 @@ +require "test_helper" + +class DownloadTest < ActiveSupport::TestCase + # test "the truth" do + # assert true + # end +end diff --git a/test/sample_logs/fastly-fake.log b/test/sample_logs/fastly-fake.log index 6618a5f6210..4dfe8e01203 100644 --- a/test/sample_logs/fastly-fake.log +++ b/test/sample_logs/fastly-fake.log @@ -4,8 +4,8 @@ <134>2015-11-30T21:02:08Z cache-dfw1829 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:02:01 GMT /production.s3.rubygems.org/gems/json-1.8.3-java.gem 200 Ruby, RubyGems/2.1.9 universal-java-1.8 Ruby/1.9.3 (2015-01-30 patchlevel 551) jruby <134>2015-11-30T21:00:19Z cache-atl6227 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:11 GMT /production.s3.rubygems.org/gems/json-1.8.3.gem 200 Ruby, RubyGems/2.2.0 x86_64-linux Ruby/2.1.0 (2013-12-25 patchlevel 0) <134>2015-11-30T21:00:37Z cache-ams4136 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:36 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 200 Ruby, RubyGems/1.8.23 x86_64-linux Ruby/1.9.3 (2012-04-20 patchlevel 194) -<134>2015-11-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 200 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) -<134>2015-11-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 200 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) -<134>2015-11-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 304 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) +<134>2015-11-29T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 200 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) +<134>2015-12-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 200 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) +<134>2015-11-30T21:39:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 304 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) <134>2015-11-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/json-1.8.2.gem 404 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) <134>2015-11-30T21:00:51Z cache-nrt6130 downloads[191209]: 0.0.0.0 Mon, 30 Nov 2015 21:00:50 GMT /production.s3.rubygems.org/gems/no-such-gem-1.2.3.gem 200 Ruby, RubyGems/2.2.2 x86_64-linux Ruby/2.1.7 (2015-08-18 patchlevel 400) diff --git a/test/test_helper.rb b/test/test_helper.rb index 9fda07676bd..f0f1fe33c27 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -73,6 +73,13 @@ def requires_toxiproxy skip("Toxiproxy is not running, but was required for this test.") end + def requires_timescale + Download.hypertable? + rescue StandardError + raise "Timescale not running, but REQUIRE_TIMESCALE was set." if ENV["REQUIRE_TIMESCALE"] + skip("Timescale is not running, but was required for this test.") + end + def assert_changed(object, *attributes) original_attributes = attributes.index_with { |a| object.send(a) } yield if block_given?