Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Store downloads in timescale #3560

Closed
wants to merge 12 commits into from
Closed
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Prepare environment
run: |
cp config/database.yml.sample config/database.yml
bundle exec rake db:setup
bundle exec rake db:create db:migrate
- name: Tests
run: bin/rails test:all
- name: Upload coverage to Codecov
Expand Down
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ RUN \
zstd-libs \
linux-headers \
zlib-dev \
tzdata
git \
tzdata

WORKDIR /app

Expand Down Expand Up @@ -79,7 +80,7 @@ RUN bundle config set --local without 'development test assets' && \


# Final stage for app image
FROM base
FROM base as app

RUN --mount=type=cache,id=dev-apk-cache,sharing=locked,target=/var/cache/apk \
--mount=type=cache,id=dev-apk-lib,sharing=locked,target=/var/lib/apk \
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ gem "rotp", "~> 6.2"
gem "unpwn", "~> 1.0"
gem "webauthn", "~> 3.0"

gem "timescaledb-rails", git: "https://github.com/segiddins/timescaledb-rails", branch: "segiddins/support-for-timescale-in-other-database"

# Admin dashboard
gem "avo", "~> 2.28"
gem "pundit", "~> 2.3"
Expand Down
11 changes: 10 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
GIT
remote: https://github.com/segiddins/timescaledb-rails
revision: fffb8e6d42b953537bf652995a5c27e881938762
branch: segiddins/support-for-timescale-in-other-database
specs:
timescaledb-rails (0.1.5)
rails (>= 6.0)

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -332,7 +340,7 @@ GEM
timeout
net-smtp (0.3.3)
net-protocol
nio4r (2.5.8)
nio4r (2.5.9)
nokogiri (1.14.2)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
Expand Down Expand Up @@ -685,6 +693,7 @@ DEPENDENCIES
sprockets-rails (~> 3.4)
statsd-instrument (~> 3.5)
terser (~> 1.1)
timescaledb-rails!
toxiproxy (~> 2.0)
unpwn (~> 1.0)
validates_formatting_of (~> 0.9)
Expand Down
30 changes: 30 additions & 0 deletions app/avo/resources/download_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class DownloadResource < Avo::BaseResource
self.title = :query
self.includes = []
# self.search_query = -> do
# scope.ransack(id_eq: params[:q], m: "or").result(distinct: false)
# end

self.find_record_method = lambda { |model_class:, id:, params:| # rubocop:disable Lint/UnusedBlockArgument
# In case of perfoming action `id` becomes an array of `ids`
split = lambda { |s|
parts = s.split("_")
raise ArgumentError unless parts.size == 4
{ rubygem_id: parts[0], version_id: parts[1], log_ticket_id: parts[2].presence, occurred_at: parts[3] }
}

if id.is_a?(Array)
id.reduce(model_class) { |a, e| a.or.where(split[e]) }
else
model_class.find_by!(split[id])
end
}

# Fields generated from the model
field :occurred_at, as: :date_time
field :rubygem, as: :belongs_to
field :version, as: :belongs_to
field :log_ticket, as: :belongs_to
field :downloads, as: :number
# add fields here
end
4 changes: 4 additions & 0 deletions app/controllers/avo/downloads_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This controller has been generated to enable Rails' resource routes.
# More information on https://docs.avohq.io/2.0/controllers.html
class Avo::DownloadsController < Avo::ResourcesController
end
57 changes: 53 additions & 4 deletions app/jobs/fastly_log_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ def perform
log_ticket = LogTicket.pop(key: key, directory: bucket)
if log_ticket.nil?
StatsD.increment("fastly_log_processor.extra")
Rails.logger.debug { "No log ticket for key=#{key} directory=#{bucket}, ignoring" }
return
end

counts = download_counts(log_ticket)
counts_by_bucket = download_counts(log_ticket)
counts = counts_by_bucket.transform_values { |h| h.sum { |_, v| v } }
StatsD.gauge("fastly_log_processor.processed_versions_count", counts.count)
Rails.logger.info "Processed Fastly log counts: #{counts.inspect}"

# insert into timescale first since it is idempotent and GemDownload is not
record_downloads(counts_by_bucket, log_ticket.id)

processed_count = counts.sum { |_, v| v }
GemDownload.for_all_gems.with_lock do
GemDownload.bulk_update(counts)
Expand Down Expand Up @@ -54,16 +59,60 @@ def download_counts(log_ticket)
ok_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:ok]
not_modified_status = Rack::Utils::SYMBOL_TO_STATUS_CODE[:not_modified]

file.each_line.with_object(Hash.new(0)) do |log_line, accum|
path, response_code = log_line.split[10, 2]
file.each_line.with_object(Hash.new { |h, k| h[k] = Hash.new(0) }) do |log_line, accum|
parts = log_line.split
path, response_code = parts[10, 2]
case response_code.to_i
# Only count successful downloads
# NB: we consider a 304 response a download attempt
when ok_status, not_modified_status
if (match = PATH_PATTERN.match(path))
accum[match[:path]] += 1
timestamp = parts[0].sub!(/\A<\d+>/, "")
accum[match[:path]][truncated_timestamp(timestamp)] += 1
end
end
end
end

def truncated_timestamp(timestamp)
return unless timestamp
time = Time.iso8601(timestamp)
time.change(min: time.min / 15 * 15)
rescue Date::Error
nil
end

VERSION_PLUCK_ID_LIMIT = 1000
DOWNLOAD_UPSERT_LIMIT = 1000

def record_downloads(counts_by_bucket, log_ticket_id)
ids_by_full_name = counts_by_bucket.each_key.each_slice(VERSION_PLUCK_ID_LIMIT).with_object({}) do |full_names, hash|
Version.where(full_name: full_names).pluck(:full_name, :id, :rubygem_id).each { |full_name, *ids| hash[full_name] = ids }
end

counts_by_bucket
.lazy
.flat_map do |path, buckets|
versions = ids_by_full_name[path]
if versions.nil?
Rails.logger.debug { "No version found for path=#{path}" }
next
end

version_id, rubygem_id = versions

buckets.map do |occurred_at, downloads|
{ version_id:, rubygem_id:, occurred_at:, downloads:, log_ticket_id: }
end
end # rubocop:disable Style/MultilineBlockChain
.compact
.each_slice(DOWNLOAD_UPSERT_LIMIT) do |inserts|
# next unless Download.hypertable?
Download.upsert_all(
inserts,
update_only: %i[downloads],
unique_by: %i[rubygem_id version_id occurred_at log_ticket_id]
)
end
end
end
40 changes: 40 additions & 0 deletions app/models/download.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
class Download < DownloadRecord
def query = { rubygem_id:, version_id:, log_ticket_id:, occurred_at: occurred_at.iso8601 }
def id = query.values.join("_")

belongs_to :rubygem, validate: false, optional: true
belongs_to :version, validate: false, optional: true
belongs_to :log_ticket, validate: false, optional: true

validates :occurred_at, presence: true
validates :downloads, presence: true

def self.suffix = nil
def self.time_period = nil

def self.class_name_for(suffix:, time_period:)
raise ArgumentError if suffix && !time_period
[time_period.iso8601, suffix].compact.join("_").classify
end

[15.minutes, 1.day, 1.month, 1.year].each do |time_period|
(%w[all_versions all_gems] << nil).each do |suffix|
table_name = "#{Download.table_name}_#{time_period.inspect.parameterize(separator: '_')}#{"_#{suffix}" if suffix}"
::Download.class_eval(<<~RUBY, __FILE__, __LINE__ + 1) # rubocop:disable Style/DocumentDynamicEvalDefinition
class #{class_name_for(suffix:, time_period:)} < Download
attribute :downloads, :integer

def readonly?
true
end

def self.table_name = #{table_name.dump}

def self.suffix = #{suffix&.dump || :nil}

def self.time_period = #{time_period.inspect.parameterize(separator: '_').dump}
end
RUBY
end
end
end
7 changes: 7 additions & 0 deletions app/models/download_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class DownloadRecord < ApplicationRecord
include Timescaledb::Rails::Model

self.abstract_class = true

connects_to database: { writing: :downloads, reading: :downloads }
end
15 changes: 15 additions & 0 deletions app/policies/download_policy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class DownloadPolicy < ApplicationPolicy
class Scope < Scope
def resolve
scope.all
end
end

def avo_index?
true
end

def avo_show?
true
end
end
28 changes: 26 additions & 2 deletions config/brakeman.ignore
Original file line number Diff line number Diff line change
@@ -1,13 +1,37 @@
{
"ignored_warnings": [
{
"warning_type": "Dangerous Eval",
"warning_code": 13,
"fingerprint": "8c3ecf6e4ea1f5fdd60b0cf1ea5250b776da54c6083b88fa6b06ace80b667bac",
"check_name": "Evaluation",
"message": "User input in eval",
"file": "app/models/download.rb",
"line": 31,
"link": "https://brakemanscanner.org/docs/warning_types/dangerous_eval/",
"code": "::Download.class_eval(\"class #{class_name_for(:suffix => suffix, :time_period => time_period)} < Download\\n attribute :downloads, :integer\\n\\n def readonly?\\n true\\n end\\n\\n def self.table_name = #{\"#{Download.table_name}_#{time_period.inspect.parameterize(:separator => \"_\")}#{\"_#{suffix}\" if suffix}\".dump}\\n\\n def self.suffix = #{(suffix.dump or :nil)}\\n\\n def self.time_period = #{time_period.inspect.parameterize(:separator => \"_\").dump}\\nend\\n\", \"app/models/download.rb\", 24)",
"render_path": null,
"location": {
"type": "method",
"class": "Download",
"method": null
},
"user_input": "Download.table_name",
"confidence": "High",
"cwe_id": [
913,
95
],
"note": "Dyanmically creating a matrix of subclasses"
},
{
"warning_type": "Mass Assignment",
"warning_code": 105,
"fingerprint": "952a7d74123607aba495fea6b6bdb2009eebc024151ef3297547e9f2a690d0b8",
"check_name": "PermitAttributes",
"message": "Potentially dangerous key allowed for mass assignment",
"file": "app/controllers/api/v1/hook_relay_controller.rb",
"line": 23,
"line": 19,
"link": "https://brakemanscanner.org/docs/warning_types/mass_assignment/",
"code": "params.permit(:attempts, :account_id, :hook_id, :id, :max_attempts, :status, :stream, :failure_reason, :completed_at, :created_at, :request => ([:target_url]))",
"render_path": null,
Expand All @@ -24,6 +48,6 @@
"note": "account_id is used to validate that the request indeed comes from hook relay"
}
],
"updated": "2023-03-01 02:46:07 -0800",
"updated": "2023-04-02 21:48:50 -0700",
"brakeman_version": "5.4.1"
}
83 changes: 59 additions & 24 deletions config/database.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,69 @@ default: &default
encoding: utf8
username: postgres

development:
<<: *default
database: rubygems_development
host: localhost
password: devpassword
pool: 5
timeout: 5000
timescale: &timescale
adapter: postgresql
encoding: utf8
username: postgres
migrations_paths: db/downloads_migrate
port: 5434

development:
primary:
<<: *default
database: rubygems_development
host: localhost
password: devpassword
pool: 5
timeout: 5000
downloads:
<<: *timescale
database: rubygems_tsdb_development
host: localhost
pool: 5
timeout: 5000

test:
<<: *default
database: rubygems_test
host: localhost
min_messages: warning
password: testpassword
pool: 5
timeout: 5000
primary:
<<: *default
database: rubygems_test
host: localhost
min_messages: warning
password: testpassword
pool: 5
timeout: 5000
downloads:
<<: *timescale
database: rubygems_tsdb_test
host: localhost
min_messages: warning
pool: 5
timeout: 5000

staging:
<<: *default
database: rubygems_staging
min_messages: error
pool: 30
reconnect: true
primary:
<<: *default
database: rubygems_staging
min_messages: error
pool: 30
reconnect: true
downloads:
<<: *timescale
database: rubygems_tsdb_staging
min_messages: error
pool: 30
reconnect: true

production:
<<: *default
database: rubygems_production
min_messages: error
pool: 30
reconnect: true
primary:
<<: *default
database: rubygems_production
min_messages: error
pool: 30
reconnect: true
downloads:
<<: *timescale
database: rubygems_tsdb_production
min_messages: error
pool: 30
reconnect: true
5 changes: 5 additions & 0 deletions config/initializers/timescale.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Rails.application.configure do
config.after_initialize do
Timescaledb::Rails::ApplicationRecord.connects_to database: { writing: :downloads, reading: :downloads }
end
end
Loading