Skip to content

Commit

Permalink
feature: позволит не задавать increment_fields
Browse files Browse the repository at this point in the history
  • Loading branch information
vadshalamov committed Sep 5, 2016
1 parent 158cbb4 commit 0bf4850
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 85 deletions.
54 changes: 43 additions & 11 deletions lib/redis_counters/dumpers/destination.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,51 @@ def initialize(engine)
end

def merge
sql = generate_query
sql = model.send(:sanitize_sql, [sql, engine.common_params])
connection.execute sql
end

def_delegator :model, :connection
def_delegator :model, :quoted_table_name, :target_table
def_delegator :engine, :temp_table_name, :source_table

protected

def generate_query
target_fields = fields.join(', ')
temp_source = "_source_#{source_table}"

sql = <<-SQL
query = create_temp_table_query(temp_source)

if increment_fields.present?
query.concat(insert_with_update_query(temp_source, target_fields))
else
query.concat(insert_without_update_query(temp_source, target_fields))
end

query.concat(drop_temp_table_query(temp_source))
query
end

def create_temp_table_query(temp_source)
<<-SQL
CREATE TEMP TABLE #{temp_source} ON COMMIT DROP AS
SELECT #{selected_fields_expression}
FROM #{source_table}
#{source_conditions_expression}
#{group_by_expression};
SQL
end

def drop_temp_table_query(temp_source)
<<-SQL
DROP TABLE #{temp_source};
SQL
end

def insert_with_update_query(temp_source, target_fields)
<<-SQL
WITH
updated AS
(
Expand All @@ -103,19 +138,16 @@ def merge
WHERE #{matching_expression}
#{extra_conditions}
);
DROP TABLE #{temp_source};
SQL

sql = model.send(:sanitize_sql, [sql, engine.common_params])
connection.execute sql
end

def_delegator :model, :connection
def_delegator :model, :quoted_table_name, :target_table
def_delegator :engine, :temp_table_name, :source_table

protected
def insert_without_update_query(temp_source, target_fields)
<<-SQL
INSERT INTO #{target_table} (#{target_fields})
SELECT #{target_fields}
FROM #{temp_source} as source;
SQL
end

def selected_fields_expression
full_fields_map.map { |target_field, source_field| "#{source_field} as #{target_field}" }.join(', ')
Expand Down
3 changes: 3 additions & 0 deletions spec/internal/app/models/stat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class Stat < ActiveRecord::Base
store_accessor :params
end
7 changes: 7 additions & 0 deletions spec/internal/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
enable_extension :hstore
end

create_table :stats do |t|
t.integer :record_id, null: false
t.string :entity_type, null: false
t.datetime :date, null: false
t.hstore :params
end

create_table :stats_by_days do |t|
t.integer :record_id, null: false
t.integer :column_id, null: false
Expand Down
204 changes: 130 additions & 74 deletions spec/lib/redis_counters/dumpers/engine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,100 +63,156 @@
end

describe '#process!' do
before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')
context 'when increment_fields specified' do
before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')

params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)
params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)

dumper.process!(counter, date: prev_date)
dumper.process!(counter, date: prev_date)

counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')
counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')

dumper.process!(counter, date: date)
dumper.process!(counter, date: date)
end

Then { expect(StatsByDay.count).to eq 7 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

And { expect(StatsTotal.count).to eq 4 }
And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
And { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 }
And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

And { expect(StatsAggTotal.count).to eq 3 }
And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 }
And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }

context 'with source conditions' do
let(:dumper) do
RedisCounters::Dumpers::Engine.build do
name :stats_totals
fields record_id: :integer,
column_id: :integer,
value: :integer,
date: :date

destination do
model StatsByDay
take :record_id, :column_id, :hits, :date
key_fields :record_id, :column_id, :date
increment_fields :hits
map :hits, to: :value
condition 'target.date = :date'
source_condition 'column_id = 100'
end

destination do
model StatsTotal
take :record_id, :column_id, :hits
key_fields :record_id, :column_id
increment_fields :hits
map :hits, to: :value
source_condition 'column_id = 100'
end

destination do
model StatsAggTotal
take :record_id, :hits
key_fields :record_id
increment_fields :hits
map :hits, to: 'sum(value)'
group_by :record_id
source_condition 'column_id = 100'
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
end
end
end

Then { expect(StatsByDay.count).to eq 4 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

And { expect(StatsTotal.count).to eq 2 }
And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

And { expect(StatsAggTotal.count).to eq 2 }
And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 }
And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
end
end

Then { expect(StatsByDay.count).to eq 7 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

And { expect(StatsTotal.count).to eq 4 }
And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
And { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 }
And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

And { expect(StatsAggTotal.count).to eq 3 }
And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 }
And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }

context 'with source conditions' do
context 'when increment_fields not specified' do
let(:dumper) do
RedisCounters::Dumpers::Engine.build do
name :stats_totals
name :stats
fields record_id: :integer,
column_id: :integer,
value: :integer,
date: :date
entity_type: :string,
date: :timestamp,
params: :hstore

destination do
model StatsByDay
take :record_id, :column_id, :hits, :date
key_fields :record_id, :column_id, :date
increment_fields :hits
map :hits, to: :value
condition 'target.date = :date'
source_condition 'column_id = 100'
end

destination do
model StatsTotal
take :record_id, :column_id, :hits
key_fields :record_id, :column_id
increment_fields :hits
map :hits, to: :value
source_condition 'column_id = 100'
end

destination do
model StatsAggTotal
take :record_id, :hits
key_fields :record_id
increment_fields :hits
map :hits, to: 'sum(value)'
group_by :record_id
source_condition 'column_id = 100'
model Stat
take :record_id, :entity_type, :date, :params
key_fields :record_id, :entity_type, :date, :params
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
dumper.common_params = {entity_type: dumper.args[:entity_type]}
end
end
end

Then { expect(StatsByDay.count).to eq 4 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }
let(:counter) do
RedisCounters.create_counter(
Redis.current,
counter_class: RedisCounters::HashCounter,
counter_name: :all_stats,
value_delimiter: ';',
group_keys: [:date, :record_id, :params],
partition_keys: [:entity_type]
)
end

And { expect(StatsTotal.count).to eq 2 }
And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }
let(:date) { Time.now.utc }

And { expect(StatsAggTotal.count).to eq 2 }
And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 }
And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
before do
counter.increment(entity_type: 'Type1', date: date, record_id: 1, params: '')
counter.increment(entity_type: 'Type2', date: date, record_id: 1, params: '')
counter.increment(entity_type: 'Type1', date: date - 1.minute, record_id: 1, params: '')
counter.increment(entity_type: 'Type1', date: date - 10.minutes, record_id: 1, params: '')
counter.increment(entity_type: 'Type1', date: date, record_id: 2, params: '')

params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(entity_type: 'Type1', date: date, record_id: 3, params: params)

dumper.process!(counter, entity_type: 'Type1')
dumper.process!(counter, entity_type: 'Type2')
end

Then { expect(Stat.count).to eq 6 }
And { expect(Stat.where(entity_type: 'Type1').count).to eq 5 }
And { expect(Stat.where(entity_type: 'Type2').count).to eq 1 }
And { expect(Stat.where(record_id: 3, entity_type: 'Type1').first.params).to eq("a" => "1") }
end
end
end

0 comments on commit 0bf4850

Please sign in to comment.