Skip to content

Commit

Permalink
feature: use retransmit instead of update
Browse files Browse the repository at this point in the history
  • Loading branch information
TamarinEA committed Aug 13, 2020
1 parent aa96b01 commit 6711e33
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 25 deletions.
8 changes: 3 additions & 5 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ def update_fields(data, matching: nil, **where)
return if write_disabled?

rt_indexes.each do |index|
index.distributed.update(data, matching: matching, where: where)
index.plain.update(data, matching: matching, where: where) if index.indexing?
retransmit(index, matching: matching, where: where)
end
end

Expand All @@ -121,18 +120,17 @@ def transmit(index, records)
end
alias transmit_all transmit

# Перекладывает строчки из core в rt.
# Запись объектов в rt index по условию
#
# index - ThinkingSphinx::Index
# :matching - String
# where - Hash
#
# Returns nothing
def retransmit(index, matching: nil, where: {})
index.plain.find_while_exists(PRIMARY_KEY, matching: matching, where: where) do |rows|
index.distributed.find_in_batches(primary_key: PRIMARY_KEY, matching: matching, where: where) do |rows|
ids = rows.map { |row| row[PRIMARY_KEY].to_i }
transmit(index, ids)
sleep 0.1 # empirical throttle number
end
end

Expand Down
31 changes: 11 additions & 20 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
let(:transmitter) { described_class.new(record.class) }
let(:client) { ::ThinkingSphinx::Configuration.instance.mysql_client }
let(:plain_index) { record.class.sphinx_indexes.find(&:rt?).plain }
let(:model_with_rt_index) { ModelWithRt.sphinx_indexes.first }

before(:all) { ThinkingSphinx.context.define_indexes }

Expand Down Expand Up @@ -52,7 +53,7 @@
expect(client).
to receive(:write).with("DELETE FROM model_with_rt_rt1 WHERE id = #{record.sphinx_document_id}")

ModelWithRt.sphinx_indexes.first.indexing do
model_with_rt_index.indexing do
transmitter.replace(record)
end
end
Expand Down Expand Up @@ -122,7 +123,7 @@
" AND `sphinx_deleted` = 0")
expect(plain_index).to receive(:soft_delete).with([record.sphinx_document_id]).ordered

ModelWithRt.sphinx_indexes.first.indexing do
model_with_rt_index.indexing do
transmitter.delete(record)
end
end
Expand All @@ -137,26 +138,16 @@
end

describe '#update_fields' do
context 'when indexing' do
it do
expect(client).to receive(:write).
with("UPDATE model_with_rt SET field = 2 WHERE MATCH('@id_idx 1') AND `id` = 1 AND `sphinx_deleted` = 0")
expect(plain_index).
to receive(:update).with({field: 2}, matching: "@id_idx 1", where: {id: 1, sphinx_deleted: 0})

ModelWithRt.sphinx_indexes.first.indexing do
transmitter.update_fields({field: 2}, matching: "@id_idx 1", id: 1)
end
end
end
it do
expect(client).to receive(:read).with(
"SELECT sphinx_internal_id FROM model_with_rt WHERE MATCH('@id_idx 1') AND `id` = 1" \
" AND `sphinx_internal_id` > 0 AND `sphinx_deleted` = 0" \
" ORDER BY `sphinx_internal_id` ASC LIMIT 1000 OPTION max_matches=5000"
).once.ordered.and_return([{'sphinx_internal_id' => 11}, {'sphinx_internal_id' => 12}])

context 'when not indexing' do
it do
expect(client).to receive(:write).
with("UPDATE model_with_rt SET field = 2 WHERE MATCH('@id_idx 1') AND `id` = 1 AND `sphinx_deleted` = 0")
expect(transmitter).to receive(:transmit).with(model_with_rt_index, [11, 12])

transmitter.update_fields({field: 2}, matching: "@id_idx 1", id: 1)
end
transmitter.update_fields({field: 2}, matching: "@id_idx 1", id: 1)
end
end

Expand Down

0 comments on commit 6711e33

Please sign in to comment.