Skip to content

Commit

Permalink
fix: Transmitter.update_fields when full indexing
Browse files Browse the repository at this point in the history
By default Sphinx selects only 20 rows.
Therefor Transmitter updates not all rows when full reindex running.
This commit will fix this.
And adds new method ThinkingSphinx.find_in_batches.

https://jira.railsc.ru/browse/PC4-15139
  • Loading branch information
bibendi committed Jul 15, 2015
1 parent 4c826c2 commit 5ffd81c
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 76 deletions.
12 changes: 10 additions & 2 deletions lib/sphinx/integration/extensions/thinking_sphinx.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ module Sphinx::Integration::Extensions::ThinkingSphinx
autoload :Source, 'sphinx/integration/extensions/thinking_sphinx/source'
autoload :Configuration, 'sphinx/integration/extensions/thinking_sphinx/configuration'
autoload :LastIndexingTime, 'sphinx/integration/extensions/thinking_sphinx/last_indexing_time'
autoload :Statements, 'sphinx/integration/extensions/thinking_sphinx/statements'

extend ActiveSupport::Concern

included do
DEFAULT_MATCH = :extended2
include Sphinx::Integration::FastFacet
include LastIndexingTime
extend Sphinx::Integration::Extensions::ThinkingSphinx::Statements
end

module ClassMethods

def max_matches
@ts_max_matches ||= ThinkingSphinx::Configuration.instance.configuration.searchd.max_matches || 5000
end
Expand All @@ -33,6 +34,14 @@ def reset_indexed_models
end
end

def replication?
ThinkingSphinx::Configuration.instance.replication?
end

def log(message)
::ActiveSupport::Notifications.instrument("message.thinking_sphinx", message: message)
end

# Посылает sql запрос в Sphinx
#
# query - String
Expand All @@ -59,6 +68,5 @@ def take_connection(options = {})
end
end
end

end
end
58 changes: 58 additions & 0 deletions lib/sphinx/integration/extensions/thinking_sphinx/statements.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module Sphinx
module Integration
module Extensions
module ThinkingSphinx
module Statements
def replace(index_name, data)
query = ::Riddle::Query::Insert.new(index_name, data.keys, data.values).replace!.to_sql
execute(query, on_slaves: replication?)
end

def update(index_name, data, where)
query = ::Sphinx::Integration::Extensions::Riddle::Query::Update.new(index_name, data, where).to_sql
execute(query)
end

def delete(index_name, document_id)
query = ::Riddle::Query::Delete.new(index_name, document_id).to_sql
execute(query)
end

def soft_delete(index_name, document_id)
update(index_name, {sphinx_deleted: 1}, {id: document_id})
end

def select(values, index_name, where)
query = ::Riddle::Query::Select.
new.
reset_values.
values(values).
from(index_name).
where(where).
to_sql
execute(query).to_a
end

def find_in_batches(index_name, where, options = {})
bound = select("min(sphinx_internal_id) as min_id, max(sphinx_internal_id) as max_id",
index_name,
where).first
return unless bound

min = bound["min_id"].to_i
max = bound["max_id"].to_i
return if max.zero?
batch_size = options.fetch(:batch_size, 1_000)

while min <= max
where[:sphinx_internal_id] = Range.new(min, min + batch_size - 1)
ids = select("sphinx_internal_id", index_name, where).map { |row| row["sphinx_internal_id"].to_i }
yield ids if ids.any?
min += batch_size
end
end
end
end
end
end
end
99 changes: 33 additions & 66 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ def replace(record)

rt_indexes do |index|
if (data = transmitted_data(index, record))
partitions { |partition| sphinx_replace(index.rt_name(partition), data) }
sphinx_soft_delete(index.core_name_w, record.sphinx_document_id) if record.exists_in_sphinx?(index.core_name)
partitions { |partition| ::ThinkingSphinx.replace(index.rt_name(partition), data) }

if record.exists_in_sphinx?(index.core_name)
::ThinkingSphinx.soft_delete(index.core_name_w, record.sphinx_document_id)
end
end
end

Expand All @@ -40,47 +43,58 @@ def delete(record)
return false if write_disabled?

rt_indexes do |index|
partitions { |partition| sphinx_delete(index.rt_name_w(partition), record.sphinx_document_id) }
sphinx_soft_delete(index.core_name_w, record.sphinx_document_id) if record.exists_in_sphinx?(index.core_name)
partitions { |partition| ::ThinkingSphinx.delete(index.rt_name_w(partition), record.sphinx_document_id) }

if record.exists_in_sphinx?(index.core_name)
::ThinkingSphinx.soft_delete(index.core_name_w, record.sphinx_document_id)
end
end

true
end

# Обновление отдельных атрибутов записи
#
# record - ActiveRecord::Base
# data - Hash (:field => :value)
# record - ActiveRecord::Base
# data - Hash (:field => :value)
# options - Hash
# index_name: String (optional, default: first index)
#
# Returns nothing
def update(record, data)
def update(record, data, options = {})
return if write_disabled?

update_fields(data, {:id => record.sphinx_document_id})
update_fields(data, {:id => record.sphinx_document_id}, options)
end

# Обновление отдельных атрибутов индекса по условию
#
# fields - Hash (:field => value)
# where - Hash
# fields - Hash (:field => value)
# where - Hash
# options - Hash
# index_name: String (optional, default: first index)
#
# Returns nothing
def update_fields(fields, where)
def update_fields(fields, where, options = {})
return if write_disabled?

rt_indexes do |index|
if full_reindex?
ids = sphinx_select('sphinx_internal_id', index.name, where).map{ |row| row['sphinx_internal_id'] }
ids.each_slice(500) do |slice_ids|
klass.where(id: slice_ids).each { |record| replace(record) }
end if ids.any?
if full_reindex?
options[:index_name] ||= klass.sphinx_indexes.first.name
::ThinkingSphinx.find_in_batches(options[:index_name], where) do |ids|
klass.where(id: ids).each { |record| replace(record) }
end
else
if options[:index_name]
::ThinkingSphinx.update(options[:index_name], fields, where)
else
sphinx_update(index.name_w, fields, where)
rt_indexes do |index|
::ThinkingSphinx.update(index.name_w, fields, where)
end
end
end
end

protected
private

# Данные, необходимые для записи в индекс сфинкса
#
Expand Down Expand Up @@ -125,8 +139,6 @@ def mva_attributes(index, record)
attrs
end

private

# Итератор по всем rt индексам
#
# Yields ThinkingSphinx::Index
Expand Down Expand Up @@ -164,50 +176,5 @@ def type_cast_to_multi(value)
value
end
end

# Залогировать
#
# message - String
def log(message)
::ActiveSupport::Notifications.instrument('message.thinking_sphinx', :message => message)
end

def config
ThinkingSphinx::Configuration.instance
end

def replication?
config.replication?
end

def sphinx_replace(index_name, data)
query = Riddle::Query::Insert.new(index_name, data.keys, data.values).replace!.to_sql
ThinkingSphinx.execute(query, :on_slaves => replication?)
end

def sphinx_update(index_name, data, where)
query = ::Sphinx::Integration::Extensions::Riddle::Query::Update.new(index_name, data, where).to_sql
ThinkingSphinx.execute(query)
end

def sphinx_delete(index_name, document_id)
query = Riddle::Query::Delete.new(index_name, document_id).to_sql
ThinkingSphinx.execute(query)
end

def sphinx_soft_delete(index_name, document_id)
sphinx_update(index_name, {:sphinx_deleted => 1}, {:id => document_id})
end

def sphinx_select(values, index_name, where)
query = Riddle::Query::Select.
new.
reset_values.
values(values).
from(index_name).
where(where).
to_sql
ThinkingSphinx.execute(query).to_a
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require "spec_helper"

describe ThinkingSphinx do
describe ".replace" do
it do
expect(ThinkingSphinx).
to receive(:execute).with("REPLACE INTO product (`company_id`) VALUES (1)", on_slaves: false)
ThinkingSphinx.replace("product", company_id: 1)
end
end

describe ".update" do
it do
expect(ThinkingSphinx).
to receive(:execute).with("UPDATE product SET company_id = 1 WHERE `id` = 1")

ThinkingSphinx.update("product", {company_id: 1}, id: 1)
end
end

describe ".delete" do
it do
expect(ThinkingSphinx).
to receive(:execute).with("DELETE FROM product WHERE id = 1")

ThinkingSphinx.delete("product", 1)
end
end

describe ".soft_delete" do
it do
expect(ThinkingSphinx).
to receive(:execute).with("UPDATE product SET sphinx_deleted = 1 WHERE `id` = 1")

ThinkingSphinx.soft_delete("product", 1)
end
end

describe ".select" do
it do
expect(ThinkingSphinx).
to receive(:execute).with("SELECT company_id FROM product WHERE `id` = 1")

ThinkingSphinx.select("company_id", "product", id: 1)
end
end

describe ".find_in_batches" do
it do
expect(ThinkingSphinx).to(
receive(:execute).
with("SELECT min(sphinx_internal_id) as min_id, max(sphinx_internal_id) as max_id " +
"FROM product WHERE `company_id` = 1").
and_return([{"min_id" => "1", "max_id" => "2"}])
)

expect(ThinkingSphinx).to(
receive(:execute).
with("SELECT sphinx_internal_id " +
"FROM product WHERE `company_id` = 1 AND `sphinx_internal_id` BETWEEN 1 AND 1000").
and_return([{"sphinx_internal_id" => "1"}, {"sphinx_internal_id" => "2"}])
)

result = []
ThinkingSphinx.find_in_batches("product", company_id: 1) { |ids| result += ids }
expect(result).to eq [1, 2]
end
end
end
17 changes: 9 additions & 8 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
describe '#replace' do
it do
expect(transmitter).to receive(:transmitted_data).and_return(field: 123)
expect(transmitter).to receive(:sphinx_replace).with('model_with_rt_rt0', field: 123)
expect(transmitter).to receive(:sphinx_soft_delete)
expect(ThinkingSphinx).to receive(:replace).with('model_with_rt_rt0', field: 123)
expect(ThinkingSphinx).to receive(:soft_delete)
end
after { transmitter.replace(record) }
end

describe '#delete' do
it do
expect(transmitter).to receive(:sphinx_delete).with('model_with_rt_rt0', 1)
expect(transmitter).to receive(:sphinx_soft_delete)
expect(ThinkingSphinx).to receive(:delete).with('model_with_rt_rt0', 1)
expect(ThinkingSphinx).to receive(:soft_delete)
end
after { transmitter.delete(record) }
end
Expand All @@ -41,18 +41,19 @@
describe '#update_fields' do
context 'when full reindex' do
before { transmitter.stub(:full_reindex? => true) }

it do
expect(transmitter).to receive(:sphinx_select).and_return([{'sphinx_internal_id' => 123}])
expect(ModelWithRt).to receive(:where).with(:id => [123]).and_return([record])
expect(transmitter).to receive(:replace).with(record)
expect(ThinkingSphinx).to receive(:find_in_batches).with("model_with_rt", id: 1).and_yield([1])
end

after { transmitter.update_fields({:field => 123}, {:id => 1}) }
end

context 'when no full reindex' do
it do
expect(transmitter).to receive(:sphinx_update)
expect(ThinkingSphinx).to receive(:update)
end

after { transmitter.update_fields({:field => 123}, {:id => 1}) }
end
end
Expand Down

0 comments on commit 5ffd81c

Please sign in to comment.