From 69a3a950117979a2ecd7b8637badfe309df12203 Mon Sep 17 00:00:00 2001 From: "Andrew N. Shalaev" Date: Mon, 24 Sep 2018 14:34:34 +0500 Subject: [PATCH] feature: allow to replace by batches --- lib/sphinx/integration/transmitter.rb | 75 +++++++++++++-------- spec/sphinx/integration/transmitter_spec.rb | 62 ++++++++++++++--- 2 files changed, 100 insertions(+), 37 deletions(-) diff --git a/lib/sphinx/integration/transmitter.rb b/lib/sphinx/integration/transmitter.rb index e9cd894..b48380a 100644 --- a/lib/sphinx/integration/transmitter.rb +++ b/lib/sphinx/integration/transmitter.rb @@ -11,16 +11,18 @@ def initialize(klass) @klass = klass end - # Обновляет запись в сфинксе + # Обновляет записи в сфинксе # - # record - ActiveRecord::Base + # records - Array # # Returns boolean - def replace(record) + def replace(records) return false if write_disabled? + records = Array.wrap(records) + rt_indexes.each do |index| - transmit(index, record) + transmit(index, records) end true @@ -84,22 +86,23 @@ def update_fields(data, matching: nil, **where) private - # Запись объекта в rt index + # Запись объектов в rt index # # index - ThinkingSphinx::Index - # record - ActiveRecord::Base + # records - Array # # Returns nothing - def transmit(index, record) - data = transmitted_data(index, record) + def transmit(index, records) + data = transmitted_data(index, records) return unless data index.rt.replace(data) - index.plain.soft_delete(record.sphinx_document_id) + index.plain.soft_delete(records.map(&:sphinx_document_id)) end def transmit_all(index, ids) - klass.where(id: ids).each { |record| transmit(index, record) } + records = klass.where(id: ids) + transmit(index, records) end # Перекладывает строчки из core в rt. @@ -120,31 +123,39 @@ def retransmit(index, matching: nil, where: {}) # Данные, необходимые для записи в индекс сфинкса # # index - ThinkingSphinx::Index - # record - ActiveRecord::Base + # records - Array # # Returns Hash - def transmitted_data(index, record) - sql = index.single_query_sql.gsub(TEMPLATE_ID, record.id.to_s) + def transmitted_data(index, records) + rows = connection.select_all(prepared_sql(index, records)) + + rows.map! do |row| + record = records.find { |r| r.id == row['sphinx_internal_id'].to_i } + row.merge!(mva_attributes(index, record)) + + row.each do |key, value| + row[key] = case index.attributes_types_map[key] + when :integer then value.to_i + when :float then value.to_f + when :multi then type_cast_to_multi(value) + when :boolean then ActiveRecord::ConnectionAdapters::Column.value_to_boolean(value) + else value + end + end + + row + end + end + + def prepared_sql(index, records) + sql = index.single_query_sql.gsub(TEMPLATE_ID, "ANY(ARRAY[#{records.map(&:id).join(',')}])").sub('LIMIT 1', '') + query_options = index.local_options[:with_sql] if query_options && (update_proc = query_options[:update]).respond_to?(:call) sql = update_proc.call(sql) end - row = record.class.connection.execute(sql).first - return unless row - - row.merge!(mva_attributes(index, record)) - - row.each do |key, value| - row[key] = case index.attributes_types_map[key] - when :integer then value.to_i - when :float then value.to_f - when :multi then type_cast_to_multi(value) - when :boolean then ActiveRecord::ConnectionAdapters::Column.value_to_boolean(value) - else value - end - end - row + sql end # MVA data @@ -183,5 +194,13 @@ def type_cast_to_multi(value) value end end + + ## + # коннекция к бд + # + + def connection + @connection ||= klass.connection + end end end diff --git a/spec/sphinx/integration/transmitter_spec.rb b/spec/sphinx/integration/transmitter_spec.rb index 6f8da47..62bca52 100644 --- a/spec/sphinx/integration/transmitter_spec.rb +++ b/spec/sphinx/integration/transmitter_spec.rb @@ -11,28 +11,72 @@ allow(transmitter).to receive(:write_disabled?).and_return(false) allow(record).to receive_messages( - sphinx_document_id: 1, + id: 1, + sphinx_document_id: 10, exists_in_sphinx?: true, model_with_rt_rubrics: [] ) end describe '#replace' do - it "send valid quries to sphinx" do - expect(record.class.connection).to receive(:execute).with(/^SELECT/).and_return([{"region_id" => "123"}]) - expect(client).to receive(:write).with('REPLACE INTO model_with_rt_rt0 (`region_id`, `rubrics`) VALUES (123, ())') - expect(client).to receive(:write). - with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 1 AND `sphinx_deleted` = 0') + context 'when single result from db' do + it "send valid quries to sphinx", focus: true do + expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([ + {"sphinx_internal_id" => 1, "region_id" => "123"} + ]) + expect(client).to receive(:write).with( + 'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `rubrics`) VALUES (1, 123, ())' + ) + expect(client).to receive(:write). + with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` IN (10) AND `sphinx_deleted` = 0') + + transmitter.replace(record) + end + end + + context 'when multi result from db' do + let(:record1) { mock_model ModelWithRt } + let(:record2) { mock_model ModelWithRt } + + before do + allow(record1).to receive_messages( + id: 1, + sphinx_document_id: 10, + exists_in_sphinx?: true, + model_with_rt_rubrics: [] + ) - transmitter.replace(record) + allow(record2).to receive_messages( + id: 2, + sphinx_document_id: 20, + exists_in_sphinx?: true, + model_with_rt_rubrics: [] + ) + end + + it "send valid quries to sphinx" do + expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([ + {"sphinx_internal_id" => 1, "region_id" => "123"}, + {"sphinx_internal_id" => 2, "region_id" => "123"} + ]) + + expect(client).to receive(:write).with( + 'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `rubrics`) VALUES (1, 123, ()), (2, 123, ())' + ) + + expect(client).to receive(:write). + with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` IN (10, 20) AND `sphinx_deleted` = 0') + + transmitter.replace([record1, record2]) + end end end describe '#delete' do it "send valid quries to sphinx" do - expect(client).to receive(:write).with('DELETE FROM model_with_rt_rt0 WHERE id = 1') + expect(client).to receive(:write).with('DELETE FROM model_with_rt_rt0 WHERE id = 10') expect(client).to receive(:write). - with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 1 AND `sphinx_deleted` = 0') + with('UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE `id` = 10 AND `sphinx_deleted` = 0') transmitter.delete(record) end