Skip to content

Commit

Permalink
Merge pull request #12 from OlegPerminov/CRM-4532
Browse files Browse the repository at this point in the history
feature: add flexibility to persistent types
  • Loading branch information
c1n1c authored Dec 18, 2018
2 parents edce26d + b197e2a commit 9894198
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 68 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ source 'https://rubygems.org'

gem 'activerecord-postgres-hstore', require: false
gem 'simple_hstore_accessor', '~> 0.2', require: false
gem 'pg', '~> 0.11'

gemspec
20 changes: 19 additions & 1 deletion lib/redis_counters/dumpers/destination.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class Destination
extend Forwardable
include ::RedisCounters::Dumpers::Dsl::Destination

VALUE_DELIMITER = ','.freeze

# Ссылка на родительский движек - дампер.
attr_accessor :engine

Expand Down Expand Up @@ -76,6 +78,9 @@ class Destination
# Returns String
attr_accessor :matching_expr

# Разделитель значений, String.
attr_accessor :value_delimiter

def initialize(engine)
@engine = engine
@fields_map = HashWithIndifferentAccess.new
Expand Down Expand Up @@ -174,7 +179,16 @@ def full_fields_map
end

def updating_expression
increment_fields.map { |field| "#{field} = COALESCE(target.#{field}, 0) + source.#{field}" }.join(', ')
increment_fields.map do |field|
case model.columns_hash[field.to_s].type
when :datetime, :date
"#{field} = source.#{field}"
when :string
"#{field} = array_to_string(ARRAY[source.#{field}, target.#{field}], '#{delimiter}')"
else
"#{field} = COALESCE(target.#{field}, 0) + source.#{field}"
end
end.join(', ')
end

def matching_expression
Expand All @@ -197,6 +211,10 @@ def source_conditions_expression

"WHERE #{source_conditions.map { |source_condition| "(#{source_condition})" }.join(' AND ')}"
end

def delimiter
value_delimiter || VALUE_DELIMITER
end
end
end
end
1 change: 1 addition & 0 deletions lib/redis_counters/dumpers/dsl/destination.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Configuration < ::RedisCounters::Dumpers::Dsl::Base

setter :model
setter :matching_expr
setter :value_delimiter

varags_setter :fields
varags_setter :key_fields
Expand Down
1 change: 0 additions & 1 deletion redis_counters-dumpers.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ Gem::Specification.new do |spec|

spec.add_runtime_dependency 'activesupport', '>= 3.0', '< 5'
spec.add_runtime_dependency 'activerecord', '>= 3.0'
spec.add_runtime_dependency 'pg'
spec.add_runtime_dependency 'redis', '>= 3.0'
spec.add_runtime_dependency 'redis-namespace', '>= 1.3'
spec.add_runtime_dependency 'callbacks_rb', '>= 0.0.1'
Expand Down
2 changes: 2 additions & 0 deletions spec/internal/app/models/realtime_stat.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class RealtimeStat < ActiveRecord::Base
end
8 changes: 8 additions & 0 deletions spec/internal/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,12 @@
t.integer :value, null: false, default: 0
t.string :payload
end

create_table :realtime_stats do |t|
t.integer :record_id, null: false
t.integer :column_id, null: false
t.integer :hits, null: false, default: 0
t.timestamp :date
t.string :params
end
end
231 changes: 165 additions & 66 deletions spec/lib/redis_counters/dumpers/engine_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,100 +65,199 @@

describe '#process!' do
context 'when increment_fields specified' do
before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')
context 'without source conditions' do
before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')

params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)
params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)

dumper.process!(counter, date: prev_date)
dumper.process!(counter, date: prev_date)

counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')
counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')

dumper.process!(counter, date: date)
end
dumper.process!(counter, date: date)
end

it { expect(StatsByDay.count).to eq 7 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }
it { expect(StatsByDay.count).to eq 7 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: prev_date).first.hits).to eq 2 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 3, column_id: 300, date: prev_date).first.params).to eq("a" => "1") }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 200, date: date).first.hits).to eq 2 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

it { expect(StatsTotal.count).to eq 4 }
it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
it { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 }
it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }
it { expect(StatsTotal.count).to eq 4 }
it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
it { expect(StatsTotal.where(record_id: 1, column_id: 200).first.hits).to eq 4 }
it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

it { expect(StatsAggTotal.count).to eq 3 }
it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 }
it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
it { expect(StatsAggTotal.count).to eq 3 }
it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 6 }
it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
end

context 'with source conditions' do
context 'when incremented field class is integer' do
let(:dumper) do
RedisCounters::Dumpers::Engine.build do
name :stats_totals
fields record_id: :integer,
column_id: :integer,
value: :integer,
date: :date

destination do
model StatsByDay
take :record_id, :column_id, :hits, :date
key_fields :record_id, :column_id, :date
increment_fields :hits
map :hits, to: :value
condition 'target.date = :date'
source_condition 'column_id = 100'
end

destination do
model StatsTotal
take :record_id, :column_id, :hits
key_fields :record_id, :column_id
increment_fields :hits
map :hits, to: :value
source_condition 'column_id = 100'
end

destination do
model StatsAggTotal
take :record_id, :hits
key_fields :record_id
increment_fields :hits
map :hits, to: 'sum(value)'
group_by :record_id
source_condition 'column_id = 100'
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
end
end
end

before do
counter.increment(date: prev_date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: prev_date_s, record_id: 2, column_id: 100, subject: nil, params: '')

params = {a: 1}.stringify_keys.to_s[1..-2]
counter.increment(date: prev_date_s, record_id: 3, column_id: 300, subject: nil, params: params)

dumper.process!(counter, date: prev_date)

counter.increment(date: date_s, record_id: 1, column_id: 100, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 1, column_id: 200, subject: '', params: '')
counter.increment(date: date_s, record_id: 2, column_id: 100, subject: nil, params: '')

dumper.process!(counter, date: date)
end

it { expect(StatsByDay.count).to eq 4 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }

it { expect(StatsTotal.count).to eq 2 }
it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }

it { expect(StatsAggTotal.count).to eq 2 }
it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 }
it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
end
end

context 'when incremented field class is string' do
let(:dumper) do
RedisCounters::Dumpers::Engine.build do
name :stats_totals
name :realtime_stats
fields record_id: :integer,
column_id: :integer,
value: :integer,
date: :date
params: :string,
date: :timestamp

destination do
model StatsByDay
take :record_id, :column_id, :hits, :date
key_fields :record_id, :column_id, :date
increment_fields :hits
model RealtimeStat
take :record_id, :column_id, :date, :hits, :params
key_fields :record_id, :column_id
increment_fields :hits, :params
value_delimiter '; '
map :hits, to: :value
condition 'target.date = :date'
source_condition 'column_id = 100'
condition 'target.date::date = :date::date'
end
end
end

before do
counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: 'abc')
dumper.common_params = {date: date, params: 'abc'}
dumper.process!(counter, date: date)

counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: 'xyz')
dumper.common_params = {date: date, params: 'xyz'}
dumper.process!(counter, date: date)
end

it do
expect(RealtimeStat.count).to eq 1
expect(RealtimeStat.first.params).to eq 'xyz; abc'
end
end

context 'when incremented field class is date or time' do
let(:current_time) { Date.today.to_time }
let(:dumper) do
RedisCounters::Dumpers::Engine.build do
name :realtime_stats
fields record_id: :integer,
column_id: :integer,
value: :integer,
params: :string,
date: :timestamp

destination do
model StatsTotal
take :record_id, :column_id, :hits
model RealtimeStat
take :record_id, :column_id, :hits, :params, :date
key_fields :record_id, :column_id
increment_fields :hits
increment_fields :hits, :date
map :hits, to: :value
source_condition 'column_id = 100'
end

destination do
model StatsAggTotal
take :record_id, :hits
key_fields :record_id
increment_fields :hits
map :hits, to: 'sum(value)'
group_by :record_id
source_condition 'column_id = 100'
end

on_before_merge do |dumper, _connection|
dumper.common_params = {date: dumper.args[:date].strftime('%Y-%m-%d')}
condition 'target.date::date = :date::date'
end
end
end

it { expect(StatsByDay.count).to eq 4 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: prev_date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 1, column_id: 100, date: date).first.hits).to eq 1 }
it { expect(StatsByDay.where(record_id: 2, column_id: 100, date: date).first.hits).to eq 1 }
before do
counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: '')
dumper.common_params = {date: current_time}
dumper.process!(counter, date: date)

it { expect(StatsTotal.count).to eq 2 }
it { expect(StatsTotal.where(record_id: 1, column_id: 100).first.hits).to eq 2 }
it { expect(StatsTotal.where(record_id: 2, column_id: 100).first.hits).to eq 2 }
counter.increment(date: date, record_id: 1, column_id: 100, subject: '', params: '')
dumper.common_params = {date: current_time}
dumper.process!(counter, date: date)
end

it { expect(StatsAggTotal.count).to eq 2 }
it { expect(StatsAggTotal.where(record_id: 1).first.hits).to eq 2 }
it { expect(StatsAggTotal.where(record_id: 2).first.hits).to eq 2 }
it 'update incremented date' do
expect(RealtimeStat.count).to eq 1
expect(RealtimeStat.first.date).to eq current_time
end
end
end

Expand Down

0 comments on commit 9894198

Please sign in to comment.