Skip to content

Commit

Permalink
feature: allow to replace by batches
Browse files Browse the repository at this point in the history
  • Loading branch information
isqad committed Sep 24, 2018
1 parent 9a5e23a commit 69a3a95
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 37 deletions.
75 changes: 47 additions & 28 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ def initialize(klass)
@klass = klass
end

# Обновляет запись в сфинксе
# Обновляет записи в сфинксе
#
# record - ActiveRecord::Base
# records - Array
#
# Returns boolean
def replace(record)
def replace(records)
return false if write_disabled?

records = Array.wrap(records)

rt_indexes.each do |index|
transmit(index, record)
transmit(index, records)
end

true
Expand Down Expand Up @@ -84,22 +86,23 @@ def update_fields(data, matching: nil, **where)

private

# Запись объекта в rt index
# Запись объектов в rt index
#
# index - ThinkingSphinx::Index
# record - ActiveRecord::Base
# records - Array
#
# Returns nothing
def transmit(index, record)
data = transmitted_data(index, record)
def transmit(index, records)
data = transmitted_data(index, records)
return unless data

index.rt.replace(data)
index.plain.soft_delete(record.sphinx_document_id)
index.plain.soft_delete(records.map(&:sphinx_document_id))
end

def transmit_all(index, ids)
klass.where(id: ids).each { |record| transmit(index, record) }
records = klass.where(id: ids)
transmit(index, records)
end

# Перекладывает строчки из core в rt.
Expand All @@ -120,31 +123,39 @@ def retransmit(index, matching: nil, where: {})
# Данные, необходимые для записи в индекс сфинкса
#
# index - ThinkingSphinx::Index
# record - ActiveRecord::Base
# records - Array
#
# Returns Hash
def transmitted_data(index, record)
sql = index.single_query_sql.gsub(TEMPLATE_ID, record.id.to_s)
def transmitted_data(index, records)
rows = connection.select_all(prepared_sql(index, records))

rows.map! do |row|
record = records.find { |r| r.id == row['sphinx_internal_id'].to_i }
row.merge!(mva_attributes(index, record))

row.each do |key, value|
row[key] = case index.attributes_types_map[key]
when :integer then value.to_i
when :float then value.to_f
when :multi then type_cast_to_multi(value)
when :boolean then ActiveRecord::ConnectionAdapters::Column.value_to_boolean(value)
else value
end
end

row
end
end

def prepared_sql(index, records)
sql = index.single_query_sql.gsub(TEMPLATE_ID, "ANY(ARRAY[#{records.map(&:id).join(',')}])").sub('LIMIT 1', '')

query_options = index.local_options[:with_sql]
if query_options && (update_proc = query_options[:update]).respond_to?(:call)
sql = update_proc.call(sql)
end
row = record.class.connection.execute(sql).first
return unless row

row.merge!(mva_attributes(index, record))

row.each do |key, value|
row[key] = case index.attributes_types_map[key]
when :integer then value.to_i
when :float then value.to_f
when :multi then type_cast_to_multi(value)
when :boolean then ActiveRecord::ConnectionAdapters::Column.value_to_boolean(value)
else value
end
end

row
sql
end

# MVA data
Expand Down Expand Up @@ -183,5 +194,13 @@ def type_cast_to_multi(value)
value
end
end

##
# коннекция к бд
#

def connection
@connection ||= klass.connection
end
end
end
62 changes: 53 additions & 9 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,72 @@
allow(transmitter).to receive(:write_disabled?).and_return(false)

allow(record).to receive_messages(
sphinx_document_id: 1,
id: 1,
sphinx_document_id: 10,
exists_in_sphinx?: true,
model_with_rt_rubrics: []
)
end

describe '#replace' do
it "send valid quries to sphinx" do
expect(record.class.connection).to receive(:execute).with(/^SELECT/).and_return([{"region_id" => "123"}])
expect(client).to receive(:write).with('REPLACE INTO model_with_rt_rt0 (`region_id`, `rubrics`) VALUES (123, ())')
expect(client).to receive(:write).
with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 1 AND `sphinx_deleted` = 0')
context 'when single result from db' do
it "send valid quries to sphinx", focus: true do
expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([
{"sphinx_internal_id" => 1, "region_id" => "123"}
])
expect(client).to receive(:write).with(
'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `rubrics`) VALUES (1, 123, ())'
)
expect(client).to receive(:write).
with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` IN (10) AND `sphinx_deleted` = 0')

transmitter.replace(record)
end
end

context 'when multi result from db' do
let(:record1) { mock_model ModelWithRt }
let(:record2) { mock_model ModelWithRt }

before do
allow(record1).to receive_messages(
id: 1,
sphinx_document_id: 10,
exists_in_sphinx?: true,
model_with_rt_rubrics: []
)

transmitter.replace(record)
allow(record2).to receive_messages(
id: 2,
sphinx_document_id: 20,
exists_in_sphinx?: true,
model_with_rt_rubrics: []
)
end

it "send valid quries to sphinx" do
expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([
{"sphinx_internal_id" => 1, "region_id" => "123"},
{"sphinx_internal_id" => 2, "region_id" => "123"}
])

expect(client).to receive(:write).with(
'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `rubrics`) VALUES (1, 123, ()), (2, 123, ())'
)

expect(client).to receive(:write).
with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` IN (10, 20) AND `sphinx_deleted` = 0')

transmitter.replace([record1, record2])
end
end
end

describe '#delete' do
it "send valid quries to sphinx" do
expect(client).to receive(:write).with('DELETE FROM model_with_rt_rt0 WHERE id = 1')
expect(client).to receive(:write).with('DELETE FROM model_with_rt_rt0 WHERE id = 10')
expect(client).to receive(:write).
with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 1 AND `sphinx_deleted` = 0')
with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 10 AND `sphinx_deleted` = 0')

transmitter.delete(record)
end
Expand Down

0 comments on commit 69a3a95

Please sign in to comment.