From 0bf4850e6499849f42478b232ec1e25bc99af838 Mon Sep 17 00:00:00 2001 From: vadshalamov Date: Mon, 5 Sep 2016 16:10:37 +0500 Subject: [PATCH] =?UTF-8?q?feature:=20=D0=BF=D0=BE=D0=B7=D0=B2=D0=BE=D0=BB?= =?UTF-8?q?=D0=B8=D1=82=20=D0=BD=D0=B5=20=D0=B7=D0=B0=D0=B4=D0=B0=D0=B2?= =?UTF-8?q?=D0=B0=D1=82=D1=8C=20increment=5Ffields?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/redis_counters/dumpers/destination.rb | 54 ++++- spec/internal/app/models/stat.rb | 3 + spec/internal/db/schema.rb | 7 + .../lib/redis_counters/dumpers/engine_spec.rb | 204 +++++++++++------- 4 files changed, 183 insertions(+), 85 deletions(-) create mode 100644 spec/internal/app/models/stat.rb diff --git a/lib/redis_counters/dumpers/destination.rb b/lib/redis_counters/dumpers/destination.rb index 44b2c97..9ae94cf 100644 --- a/lib/redis_counters/dumpers/destination.rb +++ b/lib/redis_counters/dumpers/destination.rb @@ -73,16 +73,51 @@ def initialize(engine) end def merge + sql = generate_query + sql = model.send(:sanitize_sql, [sql, engine.common_params]) + connection.execute sql + end + + def_delegator :model, :connection + def_delegator :model, :quoted_table_name, :target_table + def_delegator :engine, :temp_table_name, :source_table + + protected + + def generate_query target_fields = fields.join(', ') temp_source = "_source_#{source_table}" - sql = <<-SQL + query = create_temp_table_query(temp_source) + + if increment_fields.present? + query.concat(insert_with_update_query(temp_source, target_fields)) + else + query.concat(insert_without_update_query(temp_source, target_fields)) + end + + query.concat(drop_temp_table_query(temp_source)) + query + end + + def create_temp_table_query(temp_source) + <<-SQL CREATE TEMP TABLE #{temp_source} ON COMMIT DROP AS SELECT #{selected_fields_expression} FROM #{source_table} #{source_conditions_expression} #{group_by_expression}; + SQL + end + + def drop_temp_table_query(temp_source) + <<-SQL + DROP TABLE #{temp_source}; + SQL + end + def insert_with_update_query(temp_source, target_fields) + <<-SQL WITH updated AS ( @@ -103,19 +138,16 @@ def merge WHERE #{matching_expression} #{extra_conditions} ); - - DROP TABLE #{temp_source}; SQL - - sql = model.send(:sanitize_sql, [sql, engine.common_params]) - connection.execute sql end - def_delegator :model, :connection - def_delegator :model, :quoted_table_name, :target_table - def_delegator :engine, :temp_table_name, :source_table - - protected + def insert_without_update_query(temp_source, target_fields) + <<-SQL + INSERT INTO #{target_table} (#{target_fields}) + SELECT #{target_fields} + FROM #{temp_source} as source; + SQL + end def selected_fields_expression full_fields_map.map { |target_field, source_field| "#{source_field} as #{target_field}" }.join(', ') diff --git a/spec/internal/app/models/stat.rb b/spec/internal/app/models/stat.rb new file mode 100644 index 0000000..fad05c1 --- /dev/null +++ b/spec/internal/app/models/stat.rb @@ -0,0 +1,3 @@ +class Stat < ActiveRecord::Base + store_accessor :params +end diff --git a/spec/internal/db/schema.rb b/spec/internal/db/schema.rb index 6549291..023991d 100644 --- a/spec/internal/db/schema.rb +++ b/spec/internal/db/schema.rb @@ -11,6 +11,13 @@ enable_extension :hstore end + create_table :stats do |t| + t.integer :record_id, null: false + t.string :entity_type, null: false + t.datetime :date, null: false + t.hstore :params + end + create_table :stats_by_days do |t| t.integer :record_id, null: false t.integer :column_id, null: false diff --git a/spec/lib/redis_counters/dumpers/engine_spec.rb b/spec/lib/redis_counters/dumpers/engine_spec.rb index d3c791d..e31ae20 100644 --- a/spec/lib/redis_counters/dumpers/engine_spec.rb +++ b/spec/lib/redis_counters/dumpers/engine_spec.rb @@ -63,100 +63,156 @@ end describe '#process!' do - before do - counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '') + context 'when increment_fields specified' do + before do + counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '') - params = {a: 1}.stringify_keys.to_s[1..-2] - counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params) + params = {a: 1}.stringify_keys.to_s[1..-2] + counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params) - dumper.process!(counter, date: prev_date) + dumper.process!(counter, date: prev_date) - counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '') - counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '') + counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '') - dumper.process!(counter, date: date) + dumper.process!(counter, date: date) + end + + Then { expect(StatsByDay.count).to eq 7 } + And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 } + And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") } + And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 } + And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + + And { expect(StatsTotal.count).to eq 4 } + And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } + And { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 } + And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + + And { expect(StatsAggTotal.count).to eq 3 } + And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 } + And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + + context 'with source conditions' do + let(:dumper) do + RedisCounters::Dumpers::Engine.build do + name :stats_totals + fields record_id: :integer, + column_id: :integer, + value: :integer, + date: :date + + destination do + model StatsByDay + take :record_id, :column_id, :hits, :date + key_fields :record_id, :column_id, :date + increment_fields :hits + map :hits, to: :value + condition 'target.date = :date' + source_condition 'column_id = 100' + end + + destination do + model StatsTotal + take :record_id, :column_id, :hits + key_fields :record_id, :column_id + increment_fields :hits + map :hits, to: :value + source_condition 'column_id = 100' + end + + destination do + model StatsAggTotal + take :record_id, :hits + key_fields :record_id + increment_fields :hits + map :hits, to: 'sum(value)' + group_by :record_id + source_condition 'column_id = 100' + end + + on_before_merge do |dumper, _connection| + dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')} + end + end + end + + Then { expect(StatsByDay.count).to eq 4 } + And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } + And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + + And { expect(StatsTotal.count).to eq 2 } + And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } + And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + + And { expect(StatsAggTotal.count).to eq 2 } + And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 } + And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + end end - Then { expect(StatsByDay.count).to eq 7 } - And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 } - And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") } - And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 } - And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } - - And { expect(StatsTotal.count).to eq 4 } - And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } - And { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 } - And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } - - And { expect(StatsAggTotal.count).to eq 3 } - And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 } - And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } - - context 'with source conditions' do + context 'when increment_fields not specified' do let(:dumper) do RedisCounters::Dumpers::Engine.build do - name :stats_totals + name :stats fields record_id: :integer, - column_id: :integer, - value: :integer, - date: :date + entity_type: :string, + date: :timestamp, + params: :hstore destination do - model StatsByDay - take :record_id, :column_id, :hits, :date - key_fields :record_id, :column_id, :date - increment_fields :hits - map :hits, to: :value - condition 'target.date = :date' - source_condition 'column_id = 100' - end - - destination do - model StatsTotal - take :record_id, :column_id, :hits - key_fields :record_id, :column_id - increment_fields :hits - map :hits, to: :value - source_condition 'column_id = 100' - end - - destination do - model StatsAggTotal - take :record_id, :hits - key_fields :record_id - increment_fields :hits - map :hits, to: 'sum(value)' - group_by :record_id - source_condition 'column_id = 100' + model Stat + take :record_id, :entity_type, :date, :params + key_fields :record_id, :entity_type, :date, :params end on_before_merge do |dumper, _connection| - dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')} + dumper.common_params = {entity_type: dumper.args[:entity_type]} end end end - Then { expect(StatsByDay.count).to eq 4 } - And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } - And { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + let(:counter) do + RedisCounters.create_counter( + Redis.current, + counter_class: RedisCounters::HashCounter, + counter_name: :all_stats, + value_delimiter: ';', + group_keys: [:date, :record_id, :params], + partition_keys: [:entity_type] + ) + end - And { expect(StatsTotal.count).to eq 2 } - And { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } - And { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + let(:date) { Time.now.utc } - And { expect(StatsAggTotal.count).to eq 2 } - And { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 } - And { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + before do + counter.increment(entity_type: 'Type1', date: date, record_id: 1, params: '') + counter.increment(entity_type: 'Type2', date: date, record_id: 1, params: '') + counter.increment(entity_type: 'Type1', date: date - 1.minute, record_id: 1, params: '') + counter.increment(entity_type: 'Type1', date: date - 10.minutes, record_id: 1, params: '') + counter.increment(entity_type: 'Type1', date: date, record_id: 2, params: '') + + params = {a: 1}.stringify_keys.to_s[1..-2] + counter.increment(entity_type: 'Type1', date: date, record_id: 3, params: params) + + dumper.process!(counter, entity_type: 'Type1') + dumper.process!(counter, entity_type: 'Type2') + end + + Then { expect(Stat.count).to eq 6 } + And { expect(Stat.where(entity_type: 'Type1').count).to eq 5 } + And { expect(Stat.where(entity_type: 'Type2').count).to eq 1 } + And { expect(Stat.where(record_id: 3, entity_type: 'Type1').first.params).to eq("a" => "1") } end end end