Skip to content

Commit

Permalink
[WIP] Store downloads in timescale
Browse files Browse the repository at this point in the history
  • Loading branch information
segiddins committed Apr 3, 2023
1 parent d61c1d6 commit 6862f5c
Show file tree
Hide file tree
Showing 18 changed files with 582 additions and 37 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ gem "rotp", "~> 6.2"
gem "unpwn", "~> 1.0"
gem "webauthn", "~> 3.0"

gem "timescaledb-rails", git: "https://github.com/segiddins/timescaledb-rails", branch: "segiddins/support-for-timescale-in-other-database"

# Admin dashboard
gem "avo", "~> 2.28"
gem "pundit", "~> 2.3"
Expand Down
9 changes: 9 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
GIT
remote: https://github.com/segiddins/timescaledb-rails
revision: 758fdee08d2a2d5353efd8393e859c409e452a6a
branch: segiddins/support-for-timescale-in-other-database
specs:
timescaledb-rails (0.1.5)
rails (>= 6.0)

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -685,6 +693,7 @@ DEPENDENCIES
sprockets-rails (~> 3.4)
statsd-instrument (~> 3.5)
terser (~> 1.1)
timescaledb-rails!
toxiproxy (~> 2.0)
unpwn (~> 1.0)
validates_formatting_of (~> 0.9)
Expand Down
17 changes: 17 additions & 0 deletions app/avo/resources/download_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Avo admin resource that exposes rows of the Download model.
#
# The listed fields follow the columns created by the CreateDownloads
# migration (rubygem_id, version_id, downloads, log_ticket_id, occurred_at).
class DownloadResource < Avo::BaseResource
  # NOTE(review): the downloads table is created with `id: false`, so `:id`
  # may be nil for every record — confirm how Avo should identify rows.
  self.title = :id
  self.includes = []
  # self.search_query = -> do
  #   scope.ransack(id_eq: params[:q], m: "or").result(distinct: false)
  # end

  field :id, as: :id
  field :rubygem_id, as: :number
  field :version_id, as: :number
  field :downloads, as: :number
  # The table has no `key`/`bucket` columns; the originating log ticket is
  # referenced through log_ticket_id instead.
  field :log_ticket_id, as: :number
  field :occurred_at, as: :date_time
  # add fields here
end
4 changes: 4 additions & 0 deletions app/controllers/avo/downloads_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This controller has been generated to enable Rails' resource routes.
# More information on https://docs.avohq.io/2.0/controllers.html
#
# It defines no methods of its own; everything is inherited from
# Avo::ResourcesController.
class Avo::DownloadsController < Avo::ResourcesController
end
57 changes: 53 additions & 4 deletions app/jobs/fastly_log_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ def perform
log_ticket = LogTicket.pop(key: key, directory: bucket)
if log_ticket.nil?
StatsD.increment("fastly_log_processor.extra")
Rails.logger.debug { "No log ticket for key=#{key} directory=#{bucket}, ignoring" }
return
end

counts = download_counts(log_ticket)
counts_by_bucket = download_counts(log_ticket)
counts = counts_by_bucket.transform_values { |h| h.sum { |_, v| v } }
StatsD.gauge("fastly_log_processor.processed_versions_count", counts.count)
Rails.logger.info "Processed Fastly log counts: #{counts.inspect}"

# insert into timescale first since it is idempotent and GemDownload is not
record_downloads(counts_by_bucket, log_ticket.id)

processed_count = counts.sum { |_, v| v }
GemDownload.for_all_gems.with_lock do
GemDownload.bulk_update(counts)
Expand Down Expand Up @@ -54,16 +59,60 @@ def download_counts(log_ticket)
ok_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:ok]
not_modified_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:not_modified]

file.each_line.with_object(Hash.new(0)) do |log_line, accum|
path, response_code = log_line.split[10, 2]
file.each_line.with_object(Hash.new { |h, k| h[k] = Hash.new(0) }) do |log_line, accum|
parts = log_line.split
path, response_code = parts[10, 2]
case response_code.to_i
# Only count successful downloads
# NB: we consider a 304 response a download attempt
when ok_status, not_modified_status
if (match = PATH_PATTERN.match(path))
accum[match[:path]] += 1
timestamp = parts[0].sub!(/\A<\d+>/, "")
accum[match[:path]][truncated_timestamp(timestamp)] += 1
end
end
end
end

# Rounds an ISO 8601 timestamp down to the start of its 15-minute bucket.
#
# @param timestamp [String, nil] ISO 8601 timestamp taken from a log line
# @return [Time, nil] the time with its minutes floored to a multiple of 15,
#   or nil when +timestamp+ is nil or is not valid ISO 8601
def truncated_timestamp(timestamp)
  return unless timestamp

  time = Time.iso8601(timestamp)
  # ActiveSupport's Time#change resets seconds and sub-seconds when :min is given.
  time.change(min: time.min / 15 * 15)
rescue ArgumentError
  # Time.iso8601 signals malformed input with ArgumentError (Date::Error is a
  # subclass of it), so rescuing only Date::Error let bad lines crash the job.
  nil
end

# Maximum number of full names looked up per Version query in record_downloads.
VERSION_PLUCK_ID_LIMIT = 1000
# Maximum number of rows passed to a single Download.upsert_all call in record_downloads.
DOWNLOAD_UPSERT_LIMIT = 1000

# Upserts per-version, per-time-bucket download counts into the downloads table.
#
# @param counts_by_bucket [Hash{String => Hash{Time, nil => Integer}}] download
#   counts keyed by version full name, then by truncated timestamp
# @param log_ticket_id [Integer] id of the log ticket the counts came from;
#   part of the unique key, so re-processing a ticket overwrites its rows
# @return [void]
def record_downloads(counts_by_bucket, log_ticket_id)
  # Resolve full names to [version_id, rubygem_id] in bounded batches.
  ids_by_full_name = counts_by_bucket.each_key.each_slice(VERSION_PLUCK_ID_LIMIT).with_object({}) do |full_names, hash|
    Version.where(full_name: full_names).pluck(:full_name, :id, :rubygem_id).each { |full_name, *ids| hash[full_name] = ids }
  end

  counts_by_bucket
    .lazy
    .flat_map do |path, buckets|
      versions = ids_by_full_name[path]
      if versions.nil?
        Rails.logger.debug { "No version found for path=#{path}" }
        next []
      end

      version_id, rubygem_id = versions

      buckets.filter_map do |occurred_at, downloads|
        # A nil bucket means the log line's timestamp could not be parsed.
        # occurred_at is NOT NULL, so one such row would fail the whole batch.
        if occurred_at.nil?
          Rails.logger.debug { "Skipping downloads without a timestamp for path=#{path}" }
          next
        end

        { version_id:, rubygem_id:, occurred_at:, downloads:, log_ticket_id: }
      end
    end # rubocop:disable Style/MultilineBlockChain
    .each_slice(DOWNLOAD_UPSERT_LIMIT) do |inserts|
      # next unless Download.hypertable?
      Download.upsert_all(
        inserts,
        update_only: %i[downloads],
        unique_by: %i[rubygem_id version_id occurred_at log_ticket_id]
      )
    end
end
end
36 changes: 36 additions & 0 deletions app/models/download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# A row of download counts for one version of one rubygem at a point in time.
#
# Besides the base model, this class generates one read-only subclass per
# combination of time period (15 minutes, 1 day, 1 month, 1 year) and suffix
# (none, "all_versions", "all_gems"); each subclass points at its own table.
class Download < DownloadRecord
belongs_to :rubygem, validate: false, optional: true
belongs_to :version, validate: false, optional: true

validates :occurred_at, presence: true
validates :downloads, presence: true

# The base model has no suffix or time period; the generated subclasses
# below override both.
def self.suffix = nil
def self.time_period = nil

# Builds the constant name of a generated subclass from the ISO 8601 form of
# the time period plus the optional suffix.
# Raises ArgumentError when a suffix is given without a time period.
# NOTE(review): calling this with both arguments nil reaches
# `time_period.iso8601` on nil — confirm no caller does that.
def self.class_name_for(suffix:, time_period:)
raise ArgumentError if suffix && !time_period
"#{time_period.iso8601}#{"_#{suffix}" if suffix}".classify
end

# Define the subclasses by evaluating generated source. The table name is
# "<downloads table>_<parameterized period>[_<suffix>]", the same scheme the
# CreateDownloadsContinuousAggregates migration uses for its aggregate names.
[15.minutes, 1.day, 1.month, 1.year].each do |time_period|
(%w[all_versions all_gems] << nil).each do |suffix|
table_name = "#{Download.table_name}_#{time_period.inspect.parameterize(separator: '_')}#{"_#{suffix}" if suffix}"
::Download.class_eval(<<~RUBY, __FILE__, __LINE__ + 1) # rubocop:disable Style/DocumentDynamicEvalDefinition
class #{class_name_for(suffix:, time_period:)} < Download
attribute :downloads, :integer
def readonly?
true
end
def self.table_name = #{table_name.dump}
def self.suffix = #{suffix&.dump || :nil}
def self.time_period = #{time_period.inspect.parameterize(separator: '_').dump}
end
RUBY
end
end
end
7 changes: 7 additions & 0 deletions app/models/download_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Abstract base class for models stored in the separate `downloads` database.
# Includes the Timescaledb::Rails model extensions.
class DownloadRecord < ApplicationRecord
include Timescaledb::Rails::Model

self.abstract_class = true

# Both the writing and the reading role use the `downloads` database entry.
connects_to database: { writing: :downloads, reading: :downloads }
end
84 changes: 60 additions & 24 deletions config/database.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,70 @@ default: &default
encoding: utf8
username: postgres

development:
<<: *default
database: rubygems_development
host: localhost
password: devpassword
pool: 5
timeout: 5000
timescale: &timescale
adapter: postgresql
encoding: utf8
username: postgres
migrations_paths: db/downloads_migrate

development:
primary:
<<: *default
database: rubygems_development
host: localhost
password: devpassword
pool: 5
timeout: 5000
downloads:
<<: *timescale
database: rubygems_tsdb_development
host: localhost
port: 5434
pool: 5
timeout: 5000

test:
<<: *default
database: rubygems_test
host: localhost
min_messages: warning
password: testpassword
pool: 5
timeout: 5000
primary:
<<: *default
database: rubygems_test
host: localhost
min_messages: warning
password: testpassword
pool: 5
timeout: 5000
downloads:
<<: *timescale
database: rubygems_tsdb_test
host: localhost
min_messages: warning
port: 5434
pool: 5
timeout: 5000

staging:
<<: *default
database: rubygems_staging
min_messages: error
pool: 30
reconnect: true
primary:
<<: *default
database: rubygems_staging
min_messages: error
pool: 30
reconnect: true
downloads:
<<: *timescale
database: rubygems_tsdb_staging
min_messages: error
pool: 30
reconnect: true

production:
<<: *default
database: rubygems_production
min_messages: error
pool: 30
reconnect: true
primary:
<<: *default
database: rubygems_production
min_messages: error
pool: 30
reconnect: true
downloads:
<<: *timescale
database: rubygems_tsdb_production
min_messages: error
pool: 30
reconnect: true
3 changes: 3 additions & 0 deletions config/initializers/timescale.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Sets the timescaledb-rails `record_base` option to DownloadRecord, the
# abstract class that connects to the `downloads` database.
# NOTE(review): presumably this makes the gem run its queries on that
# connection rather than on the primary database — confirm against the gem.
Timescaledb::Rails::Railtie.configure do
config.record_base = "::DownloadRecord"
end
14 changes: 14 additions & 0 deletions db/downloads_migrate/20230308211125_create_downloads.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Creates the raw `downloads` table, adds its unique index, and converts it
# into a hypertable keyed on `occurred_at`.
class CreateDownloads < ActiveRecord::Migration[7.0]
  def change
    # No surrogate primary key; rows are identified by the unique index below.
    create_table(:downloads, id: false) do |table|
      %i[rubygem_id version_id downloads].each do |column|
        table.integer(column, null: false)
      end
      table.integer(:log_ticket_id, null: true)
      table.timestamptz(:occurred_at, null: false)
    end

    add_index(
      :downloads,
      %i[rubygem_id version_id occurred_at log_ticket_id],
      unique: true,
      name: "idx_downloads_by_version_log_ticket"
    )

    create_hypertable(
      :downloads,
      :occurred_at,
      chunk_time_interval: "7 days",
      partitioning_column: "rubygem_id"
    )
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Builds a chain of continuous aggregates over the downloads table for
# 15-minute, 1-day, 1-month and 1-year buckets. Each level after the first is
# computed from the per-version aggregate of the previous level.
class CreateDownloadsContinuousAggregates < ActiveRecord::Migration[7.0]
# NOTE(review): presumably required because continuous aggregates cannot be
# created inside a transaction — confirm.
disable_ddl_transaction!

# Delegates identifier quoting to the migration's connection.
def quote_table_name(...)
connection.quote_table_name(...)
end

# Creates three continuous aggregates for one bucket duration:
#   * "downloads_<duration>"              grouped by rubygem_id and version_id, read from `from`
#   * "downloads_<duration>_all_versions" grouped by rubygem_id, read from the aggregate above
#   * "downloads_<duration>_all_gems"     not grouped by any id, read from the all_versions aggregate
# Each one gets a continuous aggregate policy, plus a retention policy when
# `retention` is not nil.
#
# duration:        bucket width (a duration; its ISO 8601 form is passed to time_bucket)
# from:            name of the relation the per-version aggregate selects from
# start_offset:    start offset for the aggregate policy, or nil
# end_offset:      end offset for the aggregate policy, or nil
# schedule_window: passed through unchanged as the last policy argument
# retention:       retention period, defaulting to start_offset; nil skips the retention policy
#
# Returns the name of the per-version aggregate so the next level can read from it.
def continuous_aggregate(
duration:,
from: Download.table_name,
start_offset:,
end_offset:,
schedule_window:,
retention: start_offset
)
name = "downloads_" + duration.inspect.parameterize(separator: '_')

# Per-version aggregate, selected from `from`.
create_continuous_aggregate(
name,
Download.
time_bucket(duration.iso8601, quote_table_name("#{from}.occurred_at"), select_alias: 'occurred_at').
group(:rubygem_id, :version_id).
select(:rubygem_id, :version_id).
sum(quote_table_name("#{from}.downloads"), :downloads).
reorder(nil).
from(from).
to_sql,
force: true
)
add_continuous_aggregate_policy(name, start_offset&.iso8601, end_offset&.iso8601, schedule_window)
add_hypertable_retention_policy(name, retention.iso8601) if retention

# Per-rubygem aggregate, selected from the per-version aggregate.
all_versions_name = name + "_all_versions"
create_continuous_aggregate(
all_versions_name,
Download.
time_bucket(duration.iso8601, quote_table_name("#{name}.occurred_at"), select_alias: 'occurred_at').
group(:rubygem_id).
select(:rubygem_id).
sum(quote_table_name("#{name}.downloads"), :downloads).
from(name).
to_sql,
force: true
)
add_continuous_aggregate_policy(all_versions_name, start_offset&.iso8601, end_offset&.iso8601, schedule_window)
add_hypertable_retention_policy(all_versions_name, retention.iso8601) if retention

# Aggregate across all rubygems, selected from the per-rubygem aggregate.
all_gems_name = name + "_all_gems"
create_continuous_aggregate(
all_gems_name,
Download.
time_bucket(duration.iso8601, quote_table_name("#{all_versions_name}.occurred_at"), select_alias: 'occurred_at').
sum(quote_table_name("#{all_versions_name}.downloads"), :downloads).
from(all_versions_name).
to_sql,
force: true
)
add_continuous_aggregate_policy(all_gems_name, start_offset&.iso8601, end_offset&.iso8601, schedule_window)
add_hypertable_retention_policy(all_gems_name, retention.iso8601) if retention

name
end

# Inserts a placeholder row, then builds the four aggregate levels in order,
# feeding each level's per-version aggregate name into the next as `from`.
def change
# https://github.com/timescale/timescaledb/issues/5474
Download.create!(version_id: 0, rubygem_id: 0, occurred_at: Time.at(0), downloads: 0)

from = continuous_aggregate(
duration: 15.minutes,
start_offset: 1.week,
end_offset: 1.hour,
schedule_window: 1.hour
)

from = continuous_aggregate(
duration: 1.day,
start_offset: 2.years,
end_offset: 1.day,
schedule_window: 12.hours,
from:
)

# The monthly and yearly levels pass no start offset and no retention.
from = continuous_aggregate(
duration: 1.month,
start_offset: nil,
end_offset: 1.month,
schedule_window: 1.day,
retention: nil,
from:
)

from = continuous_aggregate(
duration: 1.year,
start_offset: nil,
end_offset: 1.year,
schedule_window: 1.month,
retention: nil,
from:
)
end
end
Loading

0 comments on commit 6862f5c

Please sign in to comment.