Skip to content

Commit

Permalink
feature: delete prev rt rows when indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
TamarinEA committed Mar 2, 2020
1 parent 63ebe98 commit e469419
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 67 deletions.
11 changes: 9 additions & 2 deletions lib/sphinx/integration/extensions/thinking_sphinx/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ def rt?

def switch_rt
recent_rt.switch
end

def truncate_prev_rt
rt.within_partition(recent_rt.prev, &:truncate)
end

Expand Down Expand Up @@ -168,8 +171,12 @@ def indexing?
mutex(:indexing).locked?
end

def indexing
mutex(:indexing).with_lock { yield }
def indexing(need_lock: true)
if need_lock
mutex(:indexing).with_lock { yield }
else
yield
end
end

def recent_rt
Expand Down
15 changes: 10 additions & 5 deletions lib/sphinx/integration/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,20 @@ def index
log "Index sphinx"

@indexes.each do |index|
::Sphinx::Integration::Mysql::Replayer.new(index.core_name).reset
rotate_index = rotate? && index.rt?

::Sphinx::Integration::Mysql::Replayer.new(index.core_name).reset if rotate_index

index.indexing(need_lock: rotate_index) do
index.switch_rt if rotate_index

index.indexing do
@sphinx.index(index.core_name)
index.last_indexing_time.write
end

index.switch_rt if rotate? && index.rt?

::Sphinx::Integration::ReplayerJob.enqueue(index.core_name) if index.rt?
if rotate_index
index.truncate_prev_rt
::Sphinx::Integration::ReplayerJob.enqueue(index.core_name)
end
end
rescue StandardError => error
Expand Down
6 changes: 1 addition & 5 deletions lib/sphinx/integration/statements/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ module Statements
class Core < Distributed
private

def index_names
yield first_index_name
end

def first_index_name
def index_name
@index.core_name
end
end
Expand Down
32 changes: 12 additions & 20 deletions lib/sphinx/integration/statements/distributed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,23 @@ def initialize(index)
end

def update(data, matching: nil, where: {})
index_names do |index_name|
sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update.
new(index_name, data, where.merge!(sphinx_deleted: 0), prepare_matching(matching)).
to_sql
sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update.
new(index_name, data, where.merge!(sphinx_deleted: 0), prepare_matching(matching)).
to_sql

write(sql)
write(sql)

yield(index_name, sql) if block_given?
end
yield(sql) if block_given?
end

def soft_delete(document_id)
index_names do |index_name|
sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update.
new(index_name, {sphinx_deleted: 1}, {id: document_id, sphinx_deleted: 0}, nil).
to_sql
sql = ::Sphinx::Integration::Extensions::Riddle::Query::Update.
new(index_name, {sphinx_deleted: 1}, {id: document_id, sphinx_deleted: 0}, nil).
to_sql

write(sql)
write(sql)

yield(index_name, sql) if block_given?
end
yield(sql) if block_given?
end

def select(values, matching: nil, where: {}, where_not: {}, order_by: nil, limit: nil)
Expand All @@ -38,7 +34,7 @@ def select(values, matching: nil, where: {}, where_not: {}, order_by: nil, limit

query = ::Riddle::Query::Select.new.reset_values.
values(values).
from(first_index_name).
from(index_name).
matching(prepare_matching(matching)).
where(where).
where_not(where_not).
Expand Down Expand Up @@ -81,11 +77,7 @@ def find_in_batches(primary_key: 'sphinx_internal_id', batch_size: 1_000, matchi

private

def index_names
yield first_index_name
end

def first_index_name
def index_name
@index.name
end

Expand Down
4 changes: 2 additions & 2 deletions lib/sphinx/integration/statements/plain.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ class Plain < Core
delegate :update_log, :soft_delete_log, to: '::ThinkingSphinx::Configuration.instance'

def update(*)
super do |index_name, sql|
super do |sql|
update_log.add(index_name, query: sql)
end
end

def soft_delete(document_id)
super do |index_name, _sql|
super do |_sql|
soft_delete_log.add(index_name, document_id: document_id)
end
end
Expand Down
43 changes: 15 additions & 28 deletions lib/sphinx/integration/statements/rt.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,52 +22,39 @@ def replace(data)
raise ArgumentError.new('invalid schema of data') unless query_data.all? { |item| item.keys == query_keys }
query_values = query_data.map!(&:values)

index_names do |index_name|
sql = ::Riddle::Query::Insert.
new(index_name, query_keys, query_values).
replace!.
to_sql
sql = ::Riddle::Query::Insert.
new(index_name, query_keys, query_values).
replace!.
to_sql

write(sql)
write(sql)

yield(index_name, sql) if block_given?
end
yield(sql) if block_given?
end

def delete(document_ids)
index_names do |index_name|
sql = ::Riddle::Query::Delete.
new(index_name, Array.wrap(document_ids)).
to_sql
sql = ::Riddle::Query::Delete.
new(index_name, Array.wrap(document_ids)).
to_sql

write(sql)
write(sql)

yield(index_name, sql) if block_given?
end
yield(index_name, sql) if block_given?
end

def truncate
index_names do |index_name|
write("TRUNCATE RTINDEX #{index_name}")
end
write("TRUNCATE RTINDEX #{index_name}")
end

private

def index_names
def index_name
if @partition
yield @index.rt_name(@partition)
elsif @index.indexing?
yield @index.rt_name(0)
yield @index.rt_name(1)
@index.rt_name(@partition)
else
yield first_index_name
@index.rt_name
end
end

def first_index_name
@index.rt_name
end
end
end
end
Expand Down
16 changes: 14 additions & 2 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ def delete(records)
rt_indexes.each do |index|
index.rt.delete(ids)
index.core.soft_delete(ids)
index.plain.soft_delete(ids) if index.indexing?

if index.indexing?
index.plain.soft_delete(ids)
prev_rt_delete(index, ids)
end
end

true
Expand Down Expand Up @@ -108,8 +112,12 @@ def transmit(index, records)
data = transmitted_data(index, records)
return if data.blank?

ids = sphinx_document_ids(records)

index.rt.replace(data)
index.core.soft_delete(sphinx_document_ids(records))
index.core.soft_delete(ids)

prev_rt_delete(index, ids) if index.indexing?
end
alias transmit_all transmit

Expand Down Expand Up @@ -261,5 +269,9 @@ def need_instance_records?

@_need_instance_records = rt_indexes.any? { |index| index.mva_sources.present? }
end

def prev_rt_delete(index, ids)
index.rt.within_partition(index.recent_rt.prev) { |prev_index| prev_index.delete(ids) }
end
end
end
12 changes: 10 additions & 2 deletions spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,20 @@
describe '#switch_rt' do
it 'flips current partition' do
expect(index.rt_name).to eq 'model_with_disk_rt0'
expect(::ThinkingSphinx::Configuration.instance.mysql_client).
to receive(:write).with('TRUNCATE RTINDEX model_with_disk_rt0')

index.switch_rt

expect(index.rt_name).to eq 'model_with_disk_rt1'
end
end

describe '#truncate_prev_rt' do
it do
expect(index.rt_name).to eq 'model_with_disk_rt0'
expect(::ThinkingSphinx::Configuration.instance.mysql_client).
to receive(:write).with('TRUNCATE RTINDEX model_with_disk_rt1')

index.truncate_prev_rt
end
end
end
6 changes: 5 additions & 1 deletion spec/sphinx/integration/helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
context "when online indexing" do
it do
helper = described_class.new(default_options.merge(rotate: true, indexes: 'model_with_rt'))
expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to receive(:reset)
expect_any_instance_of(RedisMutex).to receive(:with_lock).and_yield
expect(adapter).to receive(:index).with('model_with_rt_core')
expect(::ThinkingSphinx::Configuration.instance.mysql_client).
to receive(:write).with('TRUNCATE RTINDEX model_with_rt_rt0')
Expand All @@ -27,9 +29,11 @@
context "when offline indexing" do
it do
helper = described_class.new(default_options.merge(indexes: 'model_with_rt'))
expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to_not receive(:reset)
expect_any_instance_of(RedisMutex).to_not receive(:with_lock)
expect(adapter).to receive(:index).with('model_with_rt_core')
expect(::ThinkingSphinx::Configuration.instance.mysql_client).to_not receive(:write)
expect(::Sphinx::Integration::ReplayerJob).to receive(:enqueue).with('model_with_rt_core')
expect(::Sphinx::Integration::ReplayerJob).to_not receive(:enqueue)
helper.index
expect(ModelWithRt.sphinx_indexes.first.recent_rt.current).to eq 0
end
Expand Down
21 changes: 21 additions & 0 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,27 @@
transmitter.replace(record)
end

context 'when indexing' do
it do
expect(record.class.connection).to receive(:select_all).with(/^SELECT/).and_return([
{'sphinx_internal_id' => 1, 'region_id' => '123', 'has_region' => 't'}
])
expect(client).to receive(:write).with(
'REPLACE INTO model_with_rt_rt0 (`sphinx_internal_id`, `region_id`, `has_region`, `rubrics`)' \
' VALUES (1, 123, 1, ())'
)
expect(client).to receive(:write).
with("UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE " \
"`id` IN (#{record.sphinx_document_id}) AND `sphinx_deleted` = 0")
expect(client).
to receive(:write).with("DELETE FROM model_with_rt_rt1 WHERE id = #{record.sphinx_document_id}")

ModelWithRt.sphinx_indexes.first.indexing do
transmitter.replace(record)
end
end
end

it 'rasises error if need instances' do
expect { transmitter.replace(record.id) }.to raise_error(/instance of ModelWithRt needed/)
end
Expand Down

0 comments on commit e469419

Please sign in to comment.