From b197e2ac7c8493fb858b6717d96123544f2d9de0 Mon Sep 17 00:00:00 2001 From: Oleg Perminov Date: Thu, 13 Dec 2018 18:56:29 +0500 Subject: [PATCH] feature: add flexibility to persistent types https://jira.railsc.ru/browse/CRM-4532 --- Gemfile | 1 + lib/redis_counters/dumpers/destination.rb | 20 +- lib/redis_counters/dumpers/dsl/destination.rb | 1 + redis_counters-dumpers.gemspec | 1 - spec/internal/app/models/realtime_stat.rb | 2 + spec/internal/db/schema.rb | 8 + .../lib/redis_counters/dumpers/engine_spec.rb | 231 +++++++++++++----- 7 files changed, 196 insertions(+), 68 deletions(-) create mode 100644 spec/internal/app/models/realtime_stat.rb diff --git a/Gemfile b/Gemfile index 09d4e4d..5d9e39c 100644 --- a/Gemfile +++ b/Gemfile @@ -2,5 +2,6 @@ source 'https://rubygems.org' gem 'activerecord-postgres-hstore', require: false gem 'simple_hstore_accessor', '~> 0.2', require: false +gem 'pg', '~> 0.11' gemspec diff --git a/lib/redis_counters/dumpers/destination.rb b/lib/redis_counters/dumpers/destination.rb index 34d2a60..c479cb3 100644 --- a/lib/redis_counters/dumpers/destination.rb +++ b/lib/redis_counters/dumpers/destination.rb @@ -18,6 +18,8 @@ class Destination extend Forwardable include ::RedisCounters::Dumpers::Dsl::Destination + VALUE_DELIMITER = ','.freeze + # Ссылка на родительский движек - дампер. attr_accessor :engine @@ -76,6 +78,9 @@ class Destination # Returns String attr_accessor :matching_expr + # Разделитель значений, String. + attr_accessor :value_delimiter + def initialize(engine) @engine = engine @fields_map = HashWithIndifferentAccess.new @@ -174,7 +179,16 @@ def full_fields_map end def updating_expression - increment_fields.map { |field| "#{field} = COALESCE(target.#{field}, 0) + source.#{field}" }.join(', ') + increment_fields.map do |field| + case model.columns_hash[field.to_s].type + when :datetime, :date + "#{field} = source.#{field}" + when :string + "#{field} = array_to_string(ARRAY[source.#{field}, target.#{field}], '#{delimiter}')" + else + "#{field} = COALESCE(target.#{field}, 0) + source.#{field}" + end + end.join(', ') end def matching_expression @@ -197,6 +211,10 @@ def source_conditions_expression "WHERE #{source_conditions.map { |source_condition| "(#{source_condition})" }.join(' AND ')}" end + + def delimiter + value_delimiter || VALUE_DELIMITER + end end end end diff --git a/lib/redis_counters/dumpers/dsl/destination.rb b/lib/redis_counters/dumpers/dsl/destination.rb index e6c6232..2304be2 100644 --- a/lib/redis_counters/dumpers/dsl/destination.rb +++ b/lib/redis_counters/dumpers/dsl/destination.rb @@ -13,6 +13,7 @@ class Configuration < ::RedisCounters::Dumpers::Dsl::Base setter :model setter :matching_expr + setter :value_delimiter varags_setter :fields varags_setter :key_fields diff --git a/redis_counters-dumpers.gemspec b/redis_counters-dumpers.gemspec index 02031b3..b0b6fab 100644 --- a/redis_counters-dumpers.gemspec +++ b/redis_counters-dumpers.gemspec @@ -18,7 +18,6 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'activesupport', '>= 3.0', '< 5' spec.add_runtime_dependency 'activerecord', '>= 3.0' - spec.add_runtime_dependency 'pg' spec.add_runtime_dependency 'redis', '>= 3.0' spec.add_runtime_dependency 'redis-namespace', '>= 1.3' spec.add_runtime_dependency 'callbacks_rb', '>= 0.0.1' diff --git a/spec/internal/app/models/realtime_stat.rb b/spec/internal/app/models/realtime_stat.rb new file mode 100644 index 0000000..1a5d2aa --- /dev/null +++ b/spec/internal/app/models/realtime_stat.rb @@ -0,0 +1,2 @@ +class RealtimeStat < ActiveRecord::Base +end diff --git a/spec/internal/db/schema.rb b/spec/internal/db/schema.rb index d6d06ec..3070a96 100644 --- a/spec/internal/db/schema.rb +++ b/spec/internal/db/schema.rb @@ -52,4 +52,12 @@ t.integer :value, null: false, default: 0 t.string :payload end + + create_table :realtime_stats do |t| + t.integer :record_id, null: false + t.integer :column_id, null: false + t.integer :hits, null: false, default: 0 + t.timestamp :date + t.string :params + end end diff --git a/spec/lib/redis_counters/dumpers/engine_spec.rb b/spec/lib/redis_counters/dumpers/engine_spec.rb index 308b0e8..7daca1b 100644 --- a/spec/lib/redis_counters/dumpers/engine_spec.rb +++ b/spec/lib/redis_counters/dumpers/engine_spec.rb @@ -65,100 +65,199 @@ describe '#process!' do context 'when increment_fields specified' do - before do - counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '') + context 'without source conditions' do + before do + counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '') - params = {a: 1}.stringify_keys.to_s[1..-2] - counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params) + params = {a: 1}.stringify_keys.to_s[1..-2] + counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params) - dumper.process!(counter, date: prev_date) + dumper.process!(counter, date: prev_date) - counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '') - counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') - counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '') + counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '') - dumper.process!(counter, date: date) - end + dumper.process!(counter, date: date) + end - it { expect(StatsByDay.count).to eq 7 } - it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 } - it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") } - it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 } - it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + it { expect(StatsByDay.count).to eq 7 } + it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 } + it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") } + it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 } + it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } - it { expect(StatsTotal.count).to eq 4 } - it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } - it { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 } - it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + it { expect(StatsTotal.count).to eq 4 } + it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } + it { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 } + it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } - it { expect(StatsAggTotal.count).to eq 3 } - it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 } - it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + it { expect(StatsAggTotal.count).to eq 3 } + it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 } + it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + end context 'with source conditions' do + context 'when incremented field class is integer' do + let(:dumper) do + RedisCounters::Dumpers::Engine.build do + name :stats_totals + fields record_id: :integer, + column_id: :integer, + value: :integer, + date: :date + + destination do + model StatsByDay + take :record_id, :column_id, :hits, :date + key_fields :record_id, :column_id, :date + increment_fields :hits + map :hits, to: :value + condition 'target.date = :date' + source_condition 'column_id = 100' + end + + destination do + model StatsTotal + take :record_id, :column_id, :hits + key_fields :record_id, :column_id + increment_fields :hits + map :hits, to: :value + source_condition 'column_id = 100' + end + + destination do + model StatsAggTotal + take :record_id, :hits + key_fields :record_id + increment_fields :hits + map :hits, to: 'sum(value)' + group_by :record_id + source_condition 'column_id = 100' + end + + on_before_merge do |dumper, _connection| + dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')} + end + end + end + + before do + counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '') + + params = {a: 1}.stringify_keys.to_s[1..-2] + counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params) + + dumper.process!(counter, date: prev_date) + + counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '') + counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '') + + dumper.process!(counter, date: date) + end + + it { expect(StatsByDay.count).to eq 4 } + it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } + it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + + it { expect(StatsTotal.count).to eq 2 } + it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } + it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + + it { expect(StatsAggTotal.count).to eq 2 } + it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 } + it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + end + end + + context 'when incremented field class is string' do let(:dumper) do RedisCounters::Dumpers::Engine.build do - name :stats_totals + name :realtime_stats fields record_id: :integer, column_id: :integer, value: :integer, - date: :date + params: :string, + date: :timestamp destination do - model StatsByDay - take :record_id, :column_id, :hits, :date - key_fields :record_id, :column_id, :date - increment_fields :hits + model RealtimeStat + take :record_id, :column_id, :date, :hits, :params + key_fields :record_id, :column_id + increment_fields :hits, :params + value_delimiter '; ' map :hits, to: :value - condition 'target.date = :date' - source_condition 'column_id = 100' + condition 'target.date::date = :date::date' end + end + end + + before do + counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: 'abc') + dumper.common_params = {date: date, params: 'abc'} + dumper.process!(counter, date: date) + + counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: 'xyz') + dumper.common_params = {date: date, params: 'xyz'} + dumper.process!(counter, date: date) + end + + it do + expect(RealtimeStat.count).to eq 1 + expect(RealtimeStat.first.params).to eq 'xyz; abc' + end + end + + context 'when incremented field class is date or time' do + let(:current_time) { Date.today.to_time } + let(:dumper) do + RedisCounters::Dumpers::Engine.build do + name :realtime_stats + fields record_id: :integer, + column_id: :integer, + value: :integer, + params: :string, + date: :timestamp destination do - model StatsTotal - take :record_id, :column_id, :hits + model RealtimeStat + take :record_id, :column_id, :hits, :params, :date key_fields :record_id, :column_id - increment_fields :hits + increment_fields :hits, :date map :hits, to: :value - source_condition 'column_id = 100' - end - - destination do - model StatsAggTotal - take :record_id, :hits - key_fields :record_id - increment_fields :hits - map :hits, to: 'sum(value)' - group_by :record_id - source_condition 'column_id = 100' - end - - on_before_merge do |dumper, _connection| - dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')} + condition 'target.date::date = :date::date' end end end - it { expect(StatsByDay.count).to eq 4 } - it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 } - it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 } + before do + counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: '') + dumper.common_params = {date: current_time} + dumper.process!(counter, date: date) - it { expect(StatsTotal.count).to eq 2 } - it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 } - it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 } + counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: '') + dumper.common_params = {date: current_time} + dumper.process!(counter, date: date) + end - it { expect(StatsAggTotal.count).to eq 2 } - it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 } - it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 } + it 'update incremented date' do + expect(RealtimeStat.count).to eq 1 + expect(RealtimeStat.first.date).to eq current_time + end end end