Skip to content

Commit

Permalink
Merge pull request #58 from bibendi/master
Browse files Browse the repository at this point in the history
fix: find in batches on steroids
  • Loading branch information
bibendi committed Aug 10, 2015
2 parents aba6466 + cd1e7bd commit f6c7f1d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def select(values, index_name, where, limit = nil)

def find_in_batches(index_name, options = {})
primary_key = options.fetch(:primary_key, "sphinx_internal_id").to_s.freeze
batch_size = options.fetch(:batch_size, 3_000)
batch_size = options.fetch(:batch_size, 1_000)
batch_order = "#{primary_key} ASC"
where = options.fetch(:where, {})
where[primary_key.to_sym] = -> { "> 0" }
Expand All @@ -46,7 +46,8 @@ def find_in_batches(index_name, options = {})
from(index_name).
where(where).
order_by(batch_order).
limit(batch_size)
limit(batch_size).
matching(options[:matching])

records = execute(query.to_sql).to_a
while records.any?
Expand Down
9 changes: 7 additions & 2 deletions lib/sphinx/integration/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def initialize(node = nil)

@node = ActiveSupport::StringInquirer.new(node)
@logger = ::Logger.new(STDOUT)
@logger.formatter = ::Logger::Formatter.new
log "Sphinx Helper initialized with node: #{node}"

init_ssh if config.remote?
end
Expand Down Expand Up @@ -342,9 +344,12 @@ def reset_waste_records

def cleanup_waste_records
log "Cleanup waste records"
log "sleep 120 sec"
sleep 120
rt_indexes do |index|
log "- #{index.name}"
Sphinx::Integration::WasteRecords.for(index).cleanup
waste_records = Sphinx::Integration::WasteRecords.for(index)
log "- #{index.name} (#{waste_records.size} records)"
waste_records.cleanup
end
end

Expand Down
6 changes: 5 additions & 1 deletion lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ def update_fields(fields, where)
partitions { |i| ThinkingSphinx.update(index.rt_name_w(i), fields, where) }

# и зареплейсим всё что осталось в core
ThinkingSphinx.find_in_batches(index.core_name, where: where) do |ids|
# TODO: implement sphinx transactions
matching = where.delete(:matching)
batch_options = {where: where, matching: matching}
ThinkingSphinx.find_in_batches(index.core_name, batch_options) do |ids|
klass.where(id: ids).each { |record| transmit(index, record) }
sleep 1 # empirical number
end
else
ThinkingSphinx.update(index.name_w, fields, where)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,27 @@
expect(ThinkingSphinx).to(
receive(:execute).
with("SELECT sphinx_internal_id " +
"FROM product WHERE `company_id` = 1 AND `sphinx_internal_id` > 0 " +
"FROM product " +
"WHERE MATCH('@company_id_idx 1') " +
"AND `company_id` = 1 AND `sphinx_internal_id` > 0 " +
"ORDER BY `sphinx_internal_id` ASC LIMIT 1").
and_return([{"sphinx_internal_id" => "1"}])
)

expect(ThinkingSphinx).to(
receive(:execute).
with("SELECT sphinx_internal_id " +
"FROM product WHERE `company_id` = 1 AND `sphinx_internal_id` > 1 " +
"FROM product " +
"WHERE MATCH('@company_id_idx 1') " +
"AND `company_id` = 1 AND `sphinx_internal_id` > 1 " +
"ORDER BY `sphinx_internal_id` ASC LIMIT 1").
and_return([])
)

result = []
ThinkingSphinx.find_in_batches("product", where: {company_id: 1}, batch_size: 1) { |ids| result += ids }
ThinkingSphinx.find_in_batches(
"product",
where: {company_id: 1}, matching: "@company_id_idx 1", batch_size: 1) { |ids| result += ids }
expect(result).to eq [1]
end
end
Expand Down
11 changes: 7 additions & 4 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,15 @@
before { transmitter.stub(:full_reindex? => true) }

it do
expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt0", {field: 123}, id: 1)
expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt1", {field: 123}, id: 1)
expect(ThinkingSphinx).to receive(:find_in_batches).with("model_with_rt_core", where: {id: 1}).and_yield([1])
expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt0", {field: 123}, id: 1, matching: "@id_idx 1")
expect(ThinkingSphinx).to receive(:update).with("model_with_rt_rt1", {field: 123}, id: 1, matching: "@id_idx 1")
expect(ThinkingSphinx).
to receive(:find_in_batches).
with("model_with_rt_core", where: {id: 1}, matching: "@id_idx 1").
and_yield([1])
end

after { transmitter.update_fields({field: 123}, id: 1) }
after { transmitter.update_fields({field: 123}, id: 1, matching: "@id_idx 1") }
end

context 'when no full reindex' do
Expand Down

0 comments on commit f6c7f1d

Please sign in to comment.