From e469419858d1f109ccdd6f71c6dfa1abf1cf7a24 Mon Sep 17 00:00:00 2001 From: TamarinEA Date: Thu, 27 Feb 2020 18:59:31 +0500 Subject: [PATCH] feature: delete prev rt rows when indexing https://jira.railsc.ru/browse/GOODS-2250 --- .../extensions/thinking_sphinx/index.rb | 11 ++++- lib/sphinx/integration/helper.rb | 15 ++++--- lib/sphinx/integration/statements/core.rb | 6 +-- .../integration/statements/distributed.rb | 32 ++++++-------- lib/sphinx/integration/statements/plain.rb | 4 +- lib/sphinx/integration/statements/rt.rb | 43 +++++++------------ lib/sphinx/integration/transmitter.rb | 16 ++++++- .../extensions/thinking_sphinx/index_spec.rb | 12 +++++- spec/sphinx/integration/helper_spec.rb | 6 ++- spec/sphinx/integration/transmitter_spec.rb | 21 +++++++++ 10 files changed, 99 insertions(+), 67 deletions(-) diff --git a/lib/sphinx/integration/extensions/thinking_sphinx/index.rb b/lib/sphinx/integration/extensions/thinking_sphinx/index.rb index 8539bd3..66fb5c4 100644 --- a/lib/sphinx/integration/extensions/thinking_sphinx/index.rb +++ b/lib/sphinx/integration/extensions/thinking_sphinx/index.rb @@ -118,6 +118,9 @@ def rt? def switch_rt recent_rt.switch + end + + def truncate_prev_rt rt.within_partition(recent_rt.prev, &:truncate) end @@ -168,8 +171,12 @@ def indexing? mutex(:indexing).locked? end - def indexing - mutex(:indexing).with_lock { yield } + def indexing(need_lock: true) + if need_lock + mutex(:indexing).with_lock { yield } + else + yield + end end def recent_rt diff --git a/lib/sphinx/integration/helper.rb b/lib/sphinx/integration/helper.rb index ab26622..ad97f79 100644 --- a/lib/sphinx/integration/helper.rb +++ b/lib/sphinx/integration/helper.rb @@ -55,15 +55,20 @@ def index log "Index sphinx" @indexes.each do |index| - ::Sphinx::Integration::Mysql::Replayer.new(index.core_name).reset + rotate_index = rotate? && index.rt? + + ::Sphinx::Integration::Mysql::Replayer.new(index.core_name).reset if rotate_index + + index.indexing(need_lock: rotate_index) do + index.switch_rt if rotate_index - index.indexing do @sphinx.index(index.core_name) index.last_indexing_time.write + end - index.switch_rt if rotate? && index.rt? - - ::Sphinx::Integration::ReplayerJob.enqueue(index.core_name) if index.rt? + if rotate_index + index.truncate_prev_rt + ::Sphinx::Integration::ReplayerJob.enqueue(index.core_name) end end rescue StandardError => error diff --git a/lib/sphinx/integration/statements/core.rb b/lib/sphinx/integration/statements/core.rb index 82d22e7..eea2760 100644 --- a/lib/sphinx/integration/statements/core.rb +++ b/lib/sphinx/integration/statements/core.rb @@ -4,11 +4,7 @@ module Statements class Core < Distributed private - def index_names - yield first_index_name - end - - def first_index_name + def index_name @index.core_name end end diff --git a/lib/sphinx/integration/statements/distributed.rb b/lib/sphinx/integration/statements/distributed.rb index 4a6f1a9..a3d9739 100644 --- a/lib/sphinx/integration/statements/distributed.rb +++ b/lib/sphinx/integration/statements/distributed.rb @@ -9,27 +9,23 @@ def initialize(index) end def update(data, matching: nil, where: {}) - index_names do |index_name| - sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update. - new(index_name, data, where.merge!(sphinx_deleted: 0), prepare_matching(matching)). - to_sql + sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update. + new(index_name, data, where.merge!(sphinx_deleted: 0), prepare_matching(matching)). + to_sql - write(sql) + write(sql) - yield(index_name, sql) if block_given? - end + yield(sql) if block_given? end def soft_delete(document_id) - index_names do |index_name| - sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update. - new(index_name, {sphinx_deleted: 1}, {id: document_id, sphinx_deleted: 0}, nil). - to_sql + sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update. + new(index_name, {sphinx_deleted: 1}, {id: document_id, sphinx_deleted: 0}, nil). + to_sql - write(sql) + write(sql) - yield(index_name, sql) if block_given? - end + yield(sql) if block_given? end def select(values, matching: nil, where: {}, where_not: {}, order_by: nil, limit: nil) @@ -38,7 +34,7 @@ def select(values, matching: nil, where: {}, where_not: {}, order_by: nil, limit query = ::Riddle::Query::Select.new.reset_values. values(values). - from(first_index_name). + from(index_name). matching(prepare_matching(matching)). where(where). where_not(where_not). @@ -81,11 +77,7 @@ def find_in_batches(primary_key: 'sphinx_internal_id', batch_size: 1_000, matchi private - def index_names - yield first_index_name - end - - def first_index_name + def index_name @index.name end diff --git a/lib/sphinx/integration/statements/plain.rb b/lib/sphinx/integration/statements/plain.rb index 8826543..3a074f5 100644 --- a/lib/sphinx/integration/statements/plain.rb +++ b/lib/sphinx/integration/statements/plain.rb @@ -5,13 +5,13 @@ class Plain < Core delegate :update_log, :soft_delete_log, to: '::ThinkingSphinx::Configuration.instance' def update(*) - super do |index_name, sql| + super do |sql| update_log.add(index_name, query: sql) end end def soft_delete(document_id) - super do |index_name, _sql| + super do |_sql| soft_delete_log.add(index_name, document_id: document_id) end end diff --git a/lib/sphinx/integration/statements/rt.rb b/lib/sphinx/integration/statements/rt.rb index 8338848..180aa56 100644 --- a/lib/sphinx/integration/statements/rt.rb +++ b/lib/sphinx/integration/statements/rt.rb @@ -22,52 +22,39 @@ def replace(data) raise ArgumentError.new('invalid schema of data') unless query_data.all? { |item| item.keys == query_keys } query_values = query_data.map!(&:values) - index_names do |index_name| - sql = ::Riddle::Query::Insert. - new(index_name, query_keys, query_values). - replace!. - to_sql + sql = ::Riddle::Query::Insert. + new(index_name, query_keys, query_values). + replace!. + to_sql - write(sql) + write(sql) - yield(index_name, sql) if block_given? - end + yield(sql) if block_given? end def delete(document_ids) - index_names do |index_name| - sql = ::Riddle::Query::Delete. - new(index_name, Array.wrap(document_ids)). - to_sql + sql = ::Riddle::Query::Delete. + new(index_name, Array.wrap(document_ids)). + to_sql - write(sql) + write(sql) - yield(index_name, sql) if block_given? - end + yield(index_name, sql) if block_given? end def truncate - index_names do |index_name| - write("TRUNCATE RTINDEX #{index_name}") - end + write("TRUNCATE RTINDEX #{index_name}") end private - def index_names + def index_name if @partition - yield @index.rt_name(@partition) - elsif @index.indexing? - yield @index.rt_name(0) - yield @index.rt_name(1) + @index.rt_name(@partition) else - yield first_index_name + @index.rt_name end end - - def first_index_name - @index.rt_name - end end end end diff --git a/lib/sphinx/integration/transmitter.rb b/lib/sphinx/integration/transmitter.rb index 7301e3d..65f1114 100644 --- a/lib/sphinx/integration/transmitter.rb +++ b/lib/sphinx/integration/transmitter.rb @@ -56,7 +56,11 @@ def delete(records) rt_indexes.each do |index| index.rt.delete(ids) index.core.soft_delete(ids) - index.plain.soft_delete(ids) if index.indexing? + + if index.indexing? + index.plain.soft_delete(ids) + prev_rt_delete(index, ids) + end end true @@ -108,8 +112,12 @@ def transmit(index, records) data = transmitted_data(index, records) return if data.blank? + ids = sphinx_document_ids(records) + index.rt.replace(data) - index.core.soft_delete(sphinx_document_ids(records)) + index.core.soft_delete(ids) + + prev_rt_delete(index, ids) if index.indexing? end alias transmit_all transmit @@ -261,5 +269,9 @@ def need_instance_records? @_need_instance_records = rt_indexes.any? { |index| index.mva_sources.present? } end + + def prev_rt_delete(index, ids) + index.rt.within_partition(index.recent_rt.prev) { |prev_index| prev_index.delete(ids) } + end end end diff --git a/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb b/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb index fd56bcb..462f9aa 100644 --- a/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb +++ b/spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb @@ -75,12 +75,20 @@ describe '#switch_rt' do it 'flips current partition' do expect(index.rt_name).to eq 'model_with_disk_rt0' - expect(::ThinkingSphinx::Configuration.instance.mysql_client). - to receive(:write).with('TRUNCATE RTINDEX model_with_disk_rt0') index.switch_rt expect(index.rt_name).to eq 'model_with_disk_rt1' end end + + describe '#truncate_prev_rt' do + it do + expect(index.rt_name).to eq 'model_with_disk_rt0' + expect(::ThinkingSphinx::Configuration.instance.mysql_client). + to receive(:write).with('TRUNCATE RTINDEX model_with_disk_rt1') + + index.truncate_prev_rt + end + end end diff --git a/spec/sphinx/integration/helper_spec.rb b/spec/sphinx/integration/helper_spec.rb index 5606721..d40684e 100644 --- a/spec/sphinx/integration/helper_spec.rb +++ b/spec/sphinx/integration/helper_spec.rb @@ -15,6 +15,8 @@ context "when online indexing" do it do helper = described_class.new(default_options.merge(rotate: true, indexes: 'model_with_rt')) + expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to receive(:reset) + expect_any_instance_of(RedisMutex).to receive(:with_lock).and_yield expect(adapter).to receive(:index).with('model_with_rt_core') expect(::ThinkingSphinx::Configuration.instance.mysql_client). to receive(:write).with('TRUNCATE RTINDEX model_with_rt_rt0') @@ -27,9 +29,11 @@ context "when offline indexing" do it do helper = described_class.new(default_options.merge(indexes: 'model_with_rt')) + expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to_not receive(:reset) + expect_any_instance_of(RedisMutex).to_not receive(:with_lock) expect(adapter).to receive(:index).with('model_with_rt_core') expect(::ThinkingSphinx::Configuration.instance.mysql_client).to_not receive(:write) - expect(::Sphinx::Integration::ReplayerJob).to receive(:enqueue).with('model_with_rt_core') + expect(::Sphinx::Integration::ReplayerJob).to_not receive(:enqueue) helper.index expect(ModelWithRt.sphinx_indexes.first.recent_rt.current).to eq 0 end diff --git a/spec/sphinx/integration/transmitter_spec.rb b/spec/sphinx/integration/transmitter_spec.rb index 92a535d..bc0a311 100644 --- a/spec/sphinx/integration/transmitter_spec.rb +++ b/spec/sphinx/integration/transmitter_spec.rb @@ -36,6 +36,27 @@ transmitter.replace(record) end + context 'when indexing' do + it do + expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([ + {'sphinx_internal_id' => 1, 'region_id' => '123', 'has_region' => 't'} + ]) + expect(client).to receive(:write).with( + 'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `has_region`, `rubrics`)' \ + ' VALUES (1, 123, 1, ())' + ) + expect(client).to receive(:write). + with("UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE " \ + "`id` IN (#{record.sphinx_document_id}) AND `sphinx_deleted` = 0") + expect(client). + to receive(:write).with("DELETE FROM model_with_rt_rt1 WHERE id = #{record.sphinx_document_id}") + + ModelWithRt.sphinx_indexes.first.indexing do + transmitter.replace(record) + end + end + end + it 'rasises error if need instances' do expect { transmitter.replace(record.id) }.to raise_error(/instance of ModelWithRt needed/) end