Skip to content

Commit

Permalink
Merge pull request #6 from vadshalamov/master
Browse files Browse the repository at this point in the history
feature: набор аргументов для дампа и hstore
  • Loading branch information
Napolskih authored Sep 1, 2016
2 parents 9fec297 + b5c9bd9 commit 158cbb4
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 39 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ source 'https://rubygems.org'

group :development, :test do
gem 'combustion', github: 'pat/combustion', ref: '7d0d24c3f36ce0eb336177fc493be0721bc26665'
gem 'activerecord-postgres-hstore', require: false
gem 'simple_hstore_accessor', '~> 0.2', require: false
end

gem 'rack', '< 2' if RUBY_VERSION < '2.2.0'
Expand Down
29 changes: 11 additions & 18 deletions lib/redis_counters/dumpers/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ module Dumpers
# end
# end
#
# dumper.process!(counter, Date.yesterday)
# dumper.process!(counter, date: Date.yesterday)
#
# В результате все данные счетчика за вчера, будут
# смерджены в целевые таблицы, по ключевым полям: company_id и date,
Expand Down Expand Up @@ -100,7 +100,7 @@ class Engine
attr_accessor :common_params

attr_reader :counter
attr_reader :date
attr_reader :args

# callbacks

Expand All @@ -127,12 +127,13 @@ class Engine
# Public: Производит перенос данных счетчика.
#
# counter - экземпляр счетчика.
# date - Date - дата, за которую производится перенос данных.
# args - Hash - набор аргументов(кластер и/или партиции) для переноса данных.
#
# Returns Fixnum - кол-во обработанных строк.
#
def process!(counter, date)
@counter, @date = counter, date
def process!(counter, args = {})
@counter = counter
@args = args

db_transaction do
merge_data
Expand Down Expand Up @@ -185,7 +186,7 @@ def merge_data
end

def fill_temp_table
@rows_processed = counter.data(:date => formatted_date) do |batch|
@rows_processed = counter.data(args) do |batch|
@current_batch = batch
prepare_batch
insert_batch
Expand Down Expand Up @@ -225,8 +226,8 @@ def batch_data

def delete_from_redis
redis_session.pipelined do |redis|
counter.partitions(:date => formatted_date).each do |partition|
counter.delete_partition_direct!(partition, redis)
counter.partitions(args).each do |partition|
counter.delete_partition_direct!(args.merge(partition), redis)
end
end

Expand Down Expand Up @@ -265,12 +266,8 @@ def columns_definition
'character varying(4000)'
when :integer, :serial, :number
'integer'
when :date
'date'
when :timestamp
'timestamp'
when :boolean
'boolean'
when :date, :timestamp, :boolean, :hstore
type.to_s
else
if type.is_a?(Array) && type.first == :enum
type.last.fetch(:name)
Expand All @@ -283,10 +280,6 @@ def columns_definition
end.join(',')
end

def formatted_date
date.strftime(DATE_FORMAT)
end

def db_connection
destinations.first.connection
end
Expand Down
1 change: 1 addition & 0 deletions spec/internal/app/models/stats_by_day.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
class StatsByDay < ActiveRecord::Base
store_accessor :params
end
1 change: 1 addition & 0 deletions spec/internal/app/models/stats_total.rb
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
class StatsTotal < ActiveRecord::Base
store_accessor :params
end
10 changes: 10 additions & 0 deletions spec/internal/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
CREATE TYPE subject_types AS ENUM ('');
SQL

if ::ActiveRecord::VERSION::MAJOR < 4
execute <<-SQL
CREATE EXTENSION IF NOT EXISTS hstore;
SQL
else
enable_extension :hstore
end

create_table :stats_by_days do |t|
t.integer :record_id, null: false
t.integer :column_id, null: false
t.date :date, null: false
t.integer :hits, null: false, default: 0
t.column :subject, :subject_types
t.hstore :params
end

add_index :stats_by_days, [:record_id, :column_id, :date], unique: true
Expand All @@ -18,6 +27,7 @@
t.integer :column_id, null: false
t.integer :hits, null: false, default: 0
t.column :subject, :subject_types
t.hstore :params
end

add_index :stats_totals, [:record_id, :column_id], unique: true
Expand Down
47 changes: 26 additions & 21 deletions spec/lib/redis_counters/dumpers/engine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@
column_id: :integer,
value: :integer,
date: :date,
subject: [:enum, {name: :subject_types}]
subject: [:enum, {name: :subject_types}],
params: :hstore

destination do
model StatsByDay
take :record_id, :column_id, :hits, :date
key_fields :record_id, :column_id, :date
take :record_id, :column_id, :hits, :date, :params
key_fields :record_id, :column_id, :date, :params
increment_fields :hits
map :hits, to: :value
condition 'target.date = :date'
end

destination do
model StatsTotal
take :record_id, :column_id, :hits
key_fields :record_id, :column_id
take :record_id, :column_id, :hits, :params
key_fields :record_id, :column_id, :params
increment_fields :hits
map :hits, to: :value
end
Expand All @@ -37,7 +38,7 @@
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.date.strftime('%Y-%m-%d')}
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
end
end
end
Expand All @@ -52,7 +53,7 @@
RedisCounters.create_counter(Redis.current,
counter_class: RedisCounters::HashCounter,
counter_name: :record_hits_by_day,
group_keys: [:record_id, :column_id, :subject],
group_keys: [:record_id, :column_id, :subject, :params],
partition_keys: [:date]
)
end
Expand All @@ -63,35 +64,39 @@

describe '#process!' do
before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil)
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')

dumper.process!(counter, prev_date)
params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)

counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil)
dumper.process!(counter, date: prev_date)

dumper.process!(counter, date)
counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')

dumper.process!(counter, date: date)
end

Then { expect(StatsByDay.count).to eq 6 }
Then { expect(StatsByDay.count).to eq 7 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

And { expect(StatsTotal.count).to eq 3 }
And { expect(StatsTotal.count).to eq 4 }
And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
And { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 }
And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

And { expect(StatsAggTotal.count).to eq 2 }
And { expect(StatsAggTotal.count).to eq 3 }
And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 }
And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }

Expand Down Expand Up @@ -134,7 +139,7 @@
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.date.strftime('%Y-%m-%d')}
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
end
end
end
Expand Down
5 changes: 5 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
require 'bundler/setup'
require 'redis_counters/dumpers'

if ::ActiveRecord::VERSION::MAJOR < 4
require 'activerecord-postgres-hstore'
require 'simple_hstore_accessor'
end

require 'combustion'
Combustion.initialize! :active_record

Expand Down

0 comments on commit 158cbb4

Please sign in to comment.