Skip to content

Commit

Permalink
feature(transmitter): replase command direct on slaves
Browse files Browse the repository at this point in the history
  • Loading branch information
bibendi committed Sep 30, 2013
1 parent 584e5d3 commit 19c6abe
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 19 deletions.
94 changes: 81 additions & 13 deletions lib/sphinx/integration/mysql/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,55 @@
require 'innertube'

module Sphinx::Integration::Mysql::ConnectionPool
def self.new_connection
configuration = ThinkingSphinx::Configuration.instance.configuration

def self.address_prepared(address)
address = address || '127.0.0.1'
# If you use localhost, MySQL insists on a socket connection, but Sphinx
# requires a TCP connection. Using 127.0.0.1 fixes that.
address = configuration.searchd.address || '127.0.0.1'
address = '127.0.0.1' if address == 'localhost'
address
end

def self.port_prepared(port)
port.is_a?(TrueClass) ? 9306 : port
end

port = configuration.searchd.mysql41
port = 9306 if port.is_a?(TrueClass)
def self.master_connection
configuration = ThinkingSphinx::Configuration.instance.configuration

options = {
:host => address,
:port => port
:host => address_prepared(configuration.searchd.address),
:port => port_prepared(configuration.searchd.mysql41),
:reconnect => true
}

Sphinx::Integration::Mysql::Connection.new address, options[:port], options
Sphinx::Integration::Mysql::Connection.new options[:host], options[:port], options
end

def self.slave_connection(agent_name)
ts_config = ThinkingSphinx::Configuration.instance
agent = ts_config.agents[agent_name]

options = {
:host => address_prepared(agent[:address]),
:port => port_prepared(agent[:mysql41]),
:reconnect => true
}

Sphinx::Integration::Mysql::Connection.new options[:host], options[:port], options
end

def self.master_pool
@master_pool ||= Innertube::Pool.new(
Proc.new { Sphinx::Integration::Mysql::ConnectionPool.master_connection },
Proc.new { |connection| connection.close }
)
end

def self.pool
@pool ||= Innertube::Pool.new(
Proc.new { Sphinx::Integration::Mysql::ConnectionPool.new_connection },
def self.agents_pool(agent_name)
@agents_pool ||= {}
@agents_pool[agent_name] ||= Innertube::Pool.new(
Proc.new { Sphinx::Integration::Mysql::ConnectionPool.slave_connection(agent_name) },
Proc.new { |connection| connection.close }
)
end
Expand All @@ -31,12 +59,12 @@ def self.take
retries = 0
original = nil
begin
pool.take do |connection|
master_pool.take do |connection|
begin
yield connection
rescue Mysql2::Error => error
original = error
if error.message =~ /query error/
if error.message =~ /(parse|syntax|query) error/
raise error
else
raise Innertube::Pool::BadResource
Expand All @@ -49,4 +77,44 @@ def self.take
raise original
end
end

def self.take_slaves(silent = true)
threads = []

ThinkingSphinx::Configuration.instance.agents.each do |agent_name, _|
threads << Thread.new do

retries = 0
original = nil

begin
agents_pool(agent_name).take do |connection|
begin
yield connection
rescue Mysql2::Error => error
original = error
if error.message =~ /(parse|syntax|query) error/
raise error
else
raise Innertube::Pool::BadResource
end
end
end
rescue Mysql2::Error => e
if silent
Rails.logger.warn e.message
else
raise e
end
rescue Innertube::Pool::BadResource
retries += 1
retry if retries < 2
raise original
end

end
end

threads.each { |t| t.join }
end
end
23 changes: 17 additions & 6 deletions lib/sphinx/integration/transmitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def initialize(klass)
def replace(record)
rt_indexes do |index|
if (data = transmitted_data(index, record))
exec_replace(index.rt_name_w, data)
exec_replace(index.rt_name, data)
exec_soft_delete(index.core_name_w, record.sphinx_document_id) if record.exists_in_sphinx?(index.core_name)
exec_replace(index.delta_rt_name_w, data) if write_delta?
exec_replace(index.delta_rt_name, data) if write_delta?
end
end
end
Expand Down Expand Up @@ -165,14 +165,25 @@ def write_delta?
# query - String
#
# Returns Mysql2::Result
def execute(query)
log(query)
ThinkingSphinx.take_connection{ |c| c.execute(query) }
def execute(query, options = {})
result = nil
::ThinkingSphinx::Search.log(query) do
if options[:on_slaves]
::Sphinx::Integration::Mysql::ConnectionPool.take_slaves do |connection|
connection.execute(query)
end
else
::Sphinx::Integration::Mysql::ConnectionPool.take do |connection|
result = connection.execute(query)
end
end
end
result
end

def exec_replace(index_name, data)
query = Riddle::Query::Insert.new(index_name, data.keys, data.values).replace!.to_sql
execute(query)
execute(query, :on_slaves => true)
end

def exec_update(index_name, data, where)
Expand Down

0 comments on commit 19c6abe

Please sign in to comment.