Skip to content

Commit

Permalink
indexation with truncate
Browse files Browse the repository at this point in the history
  • Loading branch information
bibendi committed Apr 12, 2013
1 parent 57ba0d0 commit 91806b2
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 43 deletions.
2 changes: 1 addition & 1 deletion lib/sphinx/integration.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Sphinx
module Integration
autoload :SphinxHelper, 'sphinx/integration/sphinx_helper'
autoload :Helper, 'sphinx/integration/helper'
autoload :Mysql, 'sphinx/integration/mysql'
autoload :Transmitter, 'sphinx/integration/transmitter'
autoload :FastFacet, 'sphinx/integration/fast_facet'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ module TransmitterCallbacks
extend ActiveSupport::Concern

included do
delegate :create, :destroy, :update, :to => :transmitter, :prefix => true
after_commit :transmitter_replace, :on => :save
delegate :replace, :delete, :to => :transmitter, :prefix => true
after_commit :transmitter_replace, :on => :create
after_commit :transmitter_replace, :on => :update
after_commit :transmitter_delete, :on => :destroy
end

Expand Down
25 changes: 18 additions & 7 deletions lib/sphinx/integration/extensions/thinking_sphinx/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Sphinx::Integration::Extensions::ThinkingSphinx::Index
attr_accessor :merged_with_core, :is_core_index
alias_method_chain :to_riddle, :merged
alias_method_chain :to_riddle_for_distributed, :merged
alias_method_chain :all_names, :rt
end

def to_riddle_with_merged(offset)
Expand Down Expand Up @@ -45,14 +46,14 @@ def to_riddle_for_rt(delta = false)
when :multi then :rt_attr_multi
end

begin
#begin
index.send(attr_type) << attr.unique_name
rescue
puts rt_name
puts attr_type.inspect
puts attr.type.inspect
raise
end
#rescue
# puts rt_name
# puts attr_type.inspect
# puts attr.type.inspect
# raise
#end
end
index
end
Expand Down Expand Up @@ -91,6 +92,16 @@ def merged_indexes
model.sphinx_indexes.select { |i| i.merged_with_core? }
end

def all_names_with_rt
if rt?
names = [core_name, rt_name, delta_rt_name]
else
names = [core_name]
end

names
end

def single_query_sql
@single_query_sql ||= sources.
first.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# coding: utf-8
module Sphinx::Integration
class SphinxHelper
class Helper
module ClassMethods
def remote_sphinx?
sphinx_addr = ThinkingSphinx::Configuration.instance.address
Expand Down Expand Up @@ -54,7 +54,6 @@ def running_start
end

def index(online = true)
# TODO: replace all Blocker.full_reindex
Redis::Mutex.with_lock(:full_reindex, :expire => 3.hours) do
if remote_sphinx?
run_command("#{Rails.root}/script/sphinx --reindex-offline") unless online
Expand All @@ -65,18 +64,22 @@ def index(online = true)
end
end

attach_rt if online
prepare_rt if online
end
alias_method :reindex, :index

# Заполнить rt индексы из дисковых индексов
def attach_rt
ThinkingSphinx.context.indexed_models.select(&:rt_indexed_by_sphinx?).each do |model_name|
# Очистить и Заполнить rt индексы
def prepare_rt(only_index = nil)
ThinkingSphinx.context.indexed_models.each do |model_name|
model = model_name.constantize
next unless model.rt_indexed_by_sphinx?

model.sphinx_indexes.each do |index|
# атачим rt индексы
query = "TRUNCATE RTINDEX #{index.rt_name}; ATTACH INDEX #{index.core_name} TO RTINDEX #{index.rt_name};"
ThinkingSphinx.take_connection{ |c| c.execute(query) }
next if only_index && only_index != index.name
# очистим rt индексы
ThinkingSphinx.take_connection do |c|
c.execute("TRUNCATE RTINDEX #{index.rt_name}")
end

# после этого нужно накатить дельту на основной rt индекс
# просто за атачить её нельзя, смёрджить тоже нельзя, поэтому будем апдейтить по одной
Expand All @@ -98,18 +101,16 @@ def configure
end

def rebuild
Redis::Mutex.with_lock(:full_reindex, :expire => 3.hours) do
configure

if remote_sphinx?
run_command("#{Rails.root}/script/sphinx --copy-config #{config_file}")
end
configure

stop if sphinx_running?
index(false)
start
attach_rt
if remote_sphinx?
run_command("#{Rails.root}/script/sphinx --copy-config #{config_file}")
end

stop if sphinx_running?
index(false)
start
prepare_rt
end

def full_reindex?
Expand Down
9 changes: 6 additions & 3 deletions lib/sphinx/integration/mysql/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ def self.take
begin
yield connection
rescue Mysql2::Error => error
original = ThinkingSphinx::SphinxError.new_from_mysql error
raise original if original.is_a?(ThinkingSphinx::QueryError)
raise Innertube::Pool::BadResource
original = error
if error.message =~ /query error/
raise error
else
raise Innertube::Pool::BadResource
end
end
end
rescue Innertube::Pool::BadResource
Expand Down
2 changes: 1 addition & 1 deletion lib/sphinx/integration/tasks.rake
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ namespace :thinking_sphinx do
end

wrap_task :rebuild do |orig_block|
Sphinx::Integration::Helper.reindex
Sphinx::Integration::Helper.rebuild
end
end
22 changes: 14 additions & 8 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,31 @@ def initialize(record)
def replace
rt_indexes do |index|
data = transmitted_data(index)
query = Riddle::Query::Insert.new(index.rt_name, data.keys, data.values)
ThinkingSphinx.take_connection{ |c| c.execute(query.replace!.to_sql) }
query = Riddle::Query::Insert.new(index.rt_name, data.keys, data.values).replace!.to_sql
ThinkingSphinx.take_connection{ |c| c.execute(query) }

query = "UPDATE #{index.core_name} SET sphinx_deleted = 1 WHERE id = #{record.sphinx_document_id}"
ThinkingSphinx.take_connection{ |c| c.execute(query) }

if Redis::Mutex.new(:full_reindex).locked?
query = Riddle::Query::Insert.new(index.delta_rt_name, data.keys, data.values)
ThinkingSphinx.take_connection{ |c| c.execute(query.replace!.to_sql) }
query = Riddle::Query::Insert.new(index.delta_rt_name, data.keys, data.values).replace!.to_sql
ThinkingSphinx.take_connection{ |c| c.execute(query) }
end
end
end

# Удаляет запись из сфинкса
def delete
rt_indexes do |index|
query = Riddle::Query::Delete.new(index.rt_name, record.sphinx_document_id)
ThinkingSphinx.take_connection{ |c| c.execute(query.to_sql) }
query = Riddle::Query::Delete.new(index.rt_name, record.sphinx_document_id).to_sql
ThinkingSphinx.take_connection{ |c| c.execute(query) }

query = "UPDATE #{index.core_name} SET sphinx_deleted = 1 WHERE id = #{record.sphinx_document_id}"
ThinkingSphinx.take_connection{ |c| c.execute(query) }

if Redis::Mutex.new(:full_reindex).locked?
query = Riddle::Query::Delete.new(index.delta_rt_name, record.sphinx_document_id)
ThinkingSphinx.take_connection{ |c| c.execute(query.to_sql) }
query = Riddle::Query::Delete.new(index.delta_rt_name, record.sphinx_document_id).to_sql
ThinkingSphinx.take_connection{ |c| c.execute(query) }
end
end
end
Expand Down
15 changes: 15 additions & 0 deletions spec/sphinx/integration/extensions/thinking_sphinx/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,19 @@
its(:rt_attr_uint){ should eql [:sphinx_internal_id, :sphinx_deleted, :class_crc, :region_id] }
end

describe '#all_names' do
context 'when rt' do
it 'returns rt index names' do
index.all_names.should == [index.core_name, index.rt_name, index.delta_rt_name]
end
end

context 'when disk' do
it 'returns core index names' do
index.stub(:rt?).and_return(false)
index.all_names.should == [index.core_name]
end
end
end

end
10 changes: 8 additions & 2 deletions spec/sphinx/integration/transmitter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,23 @@

subject { record.transmitter }

let(:delete_core_sql){ 'UPDATE model_with_rt_core SET sphinx_deleted = 1 WHERE id = %s' }

context 'when destroy' do
let(:delete_sql){ 'DELETE FROM model_with_rt_rt WHERE id = %s' }
let(:delete_delta_sql){ 'DELETE FROM model_with_rt_delta_rt WHERE id = %s' }

it 'delete in rt' do
connection.should_receive(:execute).with(delete_sql % record.sphinx_document_id).once
connection.should_receive(:execute).with(delete_sql % record.sphinx_document_id).ordered
connection.should_receive(:execute).with(delete_core_sql % record.sphinx_document_id).ordered
subject.delete
end

context 'when full reindex' do
it 'delete in delta_rt' do
Redis::Mutex.with_lock(:full_reindex, :expire => 3.hours) do
connection.should_receive(:execute).with(delete_sql % record.sphinx_document_id).ordered
connection.should_receive(:execute).with(delete_core_sql % record.sphinx_document_id).ordered
connection.should_receive(:execute).with(delete_delta_sql % record.sphinx_document_id).ordered
subject.delete
end
Expand All @@ -35,14 +39,16 @@

context 'when save' do
it 'replace in rt' do
connection.should_receive(:execute).with(RSpec::Mocks::ArgumentMatchers::RegexpMatcher.new(/^REPLACE INTO model_with_rt_rt/)).once
connection.should_receive(:execute).with(RSpec::Mocks::ArgumentMatchers::RegexpMatcher.new(/^REPLACE INTO model_with_rt_rt/)).ordered
connection.should_receive(:execute).with(delete_core_sql % record.sphinx_document_id).ordered
subject.replace
end

context 'when full reindex' do
it 'replace in delta_rt' do
Redis::Mutex.with_lock(:full_reindex, :expire => 3.hours) do
connection.should_receive(:execute).with(RSpec::Mocks::ArgumentMatchers::RegexpMatcher.new(/^REPLACE INTO model_with_rt_rt/)).ordered
connection.should_receive(:execute).with(delete_core_sql % record.sphinx_document_id).ordered
connection.should_receive(:execute).with(RSpec::Mocks::ArgumentMatchers::RegexpMatcher.new(/^REPLACE INTO model_with_rt_delta_rt/)).ordered
subject.replace
end
Expand Down

0 comments on commit 91806b2

Please sign in to comment.