Skip to content

Commit

Permalink
feature: gracefull rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
isqad committed Jan 13, 2021
1 parent ea216dd commit 7a32189
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 43 deletions.
12 changes: 7 additions & 5 deletions Appraisals
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
appraise 'rails4.0' do
gem 'rails', '~> 4.0.0'
end
if RUBY_VERSION < '2.5'
appraise 'rails4.0' do
gem 'rails', '~> 4.0.0'
end

appraise 'rails4.1' do
gem 'rails', '~> 4.1.0'
appraise 'rails4.1' do
gem 'rails', '~> 4.1.0'
end
end

appraise 'rails4.2' do
Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ if RUBY_VERSION < '2.5'
end

gem 'pry', '< 0.13.0', require: false
gem 'rspec-rails', '< 4.0.0', require: false
gem 'rspec-rails', '~> 3.9.1', require: false
2 changes: 1 addition & 1 deletion dip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ interaction:

clean:
service: app
command: rm -f Gemfile.lock gemfiles/*.gemfile.*
command: rm -rf Gemfile.lock gemfiles/

provision:
- docker volume create --name bundler_data
Expand Down
6 changes: 1 addition & 5 deletions lib/sphinx/integration/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ class Helper
end
end

DEFAULT_ROTATION_TIME = 10.second

def initialize(options = {})
::ThinkingSphinx.context.define_indexes

Expand Down Expand Up @@ -64,9 +62,7 @@ def index
index.indexing(need_lock: rotate_index) do
index.switch_rt if rotate_index

@sphinx.index(index.core_name)

sleep(index.local_options[:rotation_time] || DEFAULT_ROTATION_TIME) if rotate_index
@sphinx.index(index)

index.last_indexing_time.write
end
Expand Down
8 changes: 5 additions & 3 deletions lib/sphinx/integration/helper_adapters/local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,24 @@ def restart
def clean
remove_files("#{config.searchd_file_path}/*")
return unless config.configuration.searchd.binlog_path.present?

remove_files("#{config.configuration.searchd.binlog_path}/*")
end

def copy_config
return if config.config_file == config.generated_config_file

FileUtils.mkdir_p(File.dirname(config.config_file))
FileUtils.cp(config.generated_config_file, config.config_file)
end

def index(index_name)
def index(idx)
FileUtils.mkdir_p(config.searchd_file_path)

if rotate?
indexer("--rotate", index_name)
indexer('--rotate', idx.core_name)
else
indexer(index_name)
indexer(idx.core_name)
end
end

Expand Down
67 changes: 55 additions & 12 deletions lib/sphinx/integration/helper_adapters/remote.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def logger
end

class Remote < Base
AVG_ROTATION_TIME = 10
AVG_CLOSE_CONNECTIONS_TIME = 10
private_constant :AVG_ROTATION_TIME, :AVG_CLOSE_CONNECTIONS_TIME

def initialize(*)
super

Expand All @@ -98,11 +102,11 @@ def start
end

def suspend
set_servers_availability(false)
hosts.each { |host| disable_host(host) }
end

def resume
set_servers_availability(true)
hosts.each { |host| enable_host(host) }
end

def restart
Expand All @@ -117,6 +121,7 @@ def restart
def clean
remove_files("#{config.searchd_file_path}/*")
return unless config.configuration.searchd.binlog_path.present?

remove_files("#{config.configuration.searchd.binlog_path}/*")
end

Expand All @@ -126,18 +131,46 @@ def copy_config
@ssh.file_upload(sql_file.to_s, config.configuration.searchd.sphinxql_state) if sql_file.exist?
end

def index(index_name)
exec_indexer(index_name)
def index(idx)
exec_indexer(idx.core_name)
copy_indexes if hosts.many?
reload if rotate?

reload(idx) if rotate?
end

def reload
@ssh.execute("kill", "-SIGHUP `cat #{config.configuration.searchd.pid_file}`")
# Public: gentle rotation indexes
#
# Returns nil
def reload(idx)
logger.info "Rotation #{idx.core_name}"
local_rotatition_time = idx.local_options[:rotation_time]

# behaviour by default
return sighup if local_rotatition_time.blank? || hosts.size == 1

hosts.each do |host|
begin
disable_host(host)

@ssh.within(host) { sighup }

sleep(local_rotatition_time)
ensure
enable_host(host)
end
end
end

private

def sighup
logger.info 'Sending SIGHUP to process. Waiting rotation...'

@ssh.execute('kill', "-SIGHUP `cat #{config.configuration.searchd.pid_file}`")

sleep(AVG_ROTATION_TIME)
end

def indexer_args
args = ["--config #{config.config_file}"]
args.concat(%w(--rotate --nohup)) if rotate?
Expand Down Expand Up @@ -166,6 +199,7 @@ def copy_indexes

def hosts
return @hosts if @hosts

@hosts = Array.wrap(config.address)
@hosts = @hosts.select { |host| @options[:host] == host } if @options[:host].presence
@hosts
Expand All @@ -179,11 +213,20 @@ def reindex_host
@reindex_host ||= hosts.first
end

def set_servers_availability(value)
hosts.each do |host|
config.client.class.server_pool.find_server(host).server_status.available = value
config.mysql_client.server_pool.find_server(host).server_status.available = value
end
def disable_host(host)
logger.info "Disable host #{host}"

config.client.class.server_pool.find_server(host).server_status.available = false
config.mysql_client.server_pool.find_server(host).server_status.available = false

sleep(AVG_CLOSE_CONNECTIONS_TIME)
end

def enable_host(host)
logger.info "Enable host #{host}"

config.client.class.server_pool.find_server(host).server_status.available = true
config.mysql_client.server_pool.find_server(host).server_status.available = true
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sphinx/integration/spec/support/thinking_sphinx.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def reindex(opts = {})
end

def remote?
::ThinkingSphinx::Configuration.instance.remote?
::ThinkingSphinx::Configuration.instance.remote?
end
end
end
Expand Down Expand Up @@ -61,4 +61,4 @@ def with_sphinx(tables = nil)
context.use_transactional_fixtures = true
end
end
end
end
4 changes: 2 additions & 2 deletions spec/sphinx/integration/helper_adapters/local_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@

it do
expect(rye).to receive(:shell).with(:indexer, /--config/, "--rotate", 'index_name')
adapter.index('index_name')
adapter.index(double('Index', core_name: 'index_name'))
end
end

context "when is offline" do
it do
expect(rye).to receive(:shell).with(:indexer, /--config/, 'index_name')
adapter.index('index_name')
adapter.index(double('Index', core_name: 'index_name'))
end
end
end
Expand Down
29 changes: 21 additions & 8 deletions spec/sphinx/integration/helper_adapters/remote_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
let(:adapter) { described_class.new }

before do
allow_any_instance_of(Sphinx::Integration::HelperAdapters::Remote).to receive(:sleep)
class_double("Sphinx::Integration::HelperAdapters::SshProxy", new: ssh).as_stubbed_const
end

Expand Down Expand Up @@ -73,17 +74,29 @@
end

it do
expect(ssh).to receive(:within).with("s1.dev").and_yield
expect(ssh).to receive(:within) do |args|
expect(args).to match(/s\d\.dev/)
end.at_least(:twice).and_yield

expect(ssh).to receive(:execute).
with("indexer", "--config /path/sphinx.conf", "--rotate", "--nohup", 'index_name', exit_status: [0, 2])
expect(ssh).to receive(:execute).

expect(ssh).to receive(:execute).
with('for NAME in /path/data/*_core.tmp.*; do mv -f "${NAME}" "${NAME/\.tmp\./.new.}"; done')
server = double("server", opts: {port: 22}, user: "sphinx", host: "s1.dev")
expect(ssh).to receive(:without).with("s1.dev").and_yield(server)
expect(ssh).to receive(:execute).with("rsync", "-ptzv", "--bwlimit=70M", "--compress-level=1", any_args)
expect(ssh).to receive(:execute).with("kill", /SIGHUP/)

adapter.index('index_name')
expect(ssh).to receive(:execute).with("kill", /SIGHUP/).twice

expect(adapter).to receive(:disable_host) do |args|
expect(args).to match(/s\d\.dev/)
end.at_least(:twice)
expect(adapter).to receive(:enable_host) do |args|
expect(args).to match(/s\d\.dev/)
end.at_least(:twice)

adapter.index(double('Index', core_name: 'index_name', local_options: {rotation_time: 5 * 60}))
end
end

Expand All @@ -95,7 +108,7 @@
end

it do
expect(ssh).to receive(:within).with("s1.dev").and_yield
expect(ssh).to receive(:within).with('s1.dev').and_yield
expect(ssh).to receive(:execute).
with("indexer", "--config /path/sphinx.conf", "--rotate", "--nohup", 'index_name', exit_status: [0, 2])
expect(ssh).to receive(:execute).
Expand All @@ -104,7 +117,7 @@
expect(ssh).to_not receive(:execute).with("rsync", any_args)
expect(ssh).to receive(:execute).with("kill", /SIGHUP/)

adapter.index('index_name')
adapter.index(double('Index', core_name: 'index_name', local_options: {rotation_time: 5 * 60}))
end
end
end
Expand All @@ -127,7 +140,7 @@
expect(ssh).to receive(:execute).with("rsync", "-ptzv", "--bwlimit=70M", "--compress-level=1", any_args)
expect(ssh).to_not receive(:execute).with("kill", /SIGHUP/)

adapter.index('index_name')
adapter.index(double('Index', core_name: 'index_name', local_options: {rotation_time: 5 * 60}))
end
end

Expand All @@ -147,7 +160,7 @@
expect(ssh).to_not receive(:execute).with("rsync", any_args)
expect(ssh).to_not receive(:execute).with("kill", /SIGHUP/)

adapter.index('index_name')
adapter.index(double('Index', core_name: 'index_name', local_options: {rotation_time: 5 * 60}))
end
end
end
Expand Down
13 changes: 9 additions & 4 deletions spec/sphinx/integration/helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
helper = described_class.new(default_options.merge(rotate: true, indexes: 'model_with_rt'))
expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to receive(:reset)
expect_any_instance_of(RedisMutex).to receive(:with_lock).and_yield
expect(adapter).to receive(:index).with('model_with_rt_core')
expect(helper).to receive(:sleep).with(60)
expect(adapter).to receive(:index) do |args|
expect(args.core_name).to eq('model_with_rt_core')
end
expect(::ThinkingSphinx::Configuration.instance.mysql_client).
to receive(:write).with('TRUNCATE RTINDEX model_with_rt_rt0')
expect(::Sphinx::Integration::ReplayerJob).to receive(:enqueue).with('model_with_rt_core')
Expand All @@ -32,7 +33,9 @@
helper = described_class.new(default_options.merge(indexes: 'model_with_rt'))
expect_any_instance_of(Sphinx::Integration::Mysql::Replayer).to_not receive(:reset)
expect_any_instance_of(RedisMutex).to_not receive(:with_lock)
expect(adapter).to receive(:index).with('model_with_rt_core')
expect(adapter).to receive(:index) do |args|
expect(args.core_name).to eq('model_with_rt_core')
end
expect(::ThinkingSphinx::Configuration.instance.mysql_client).to_not receive(:write)
expect(::Sphinx::Integration::ReplayerJob).to_not receive(:enqueue)
helper.index
Expand All @@ -43,7 +46,9 @@
context "when only core indexing" do
it do
helper = described_class.new(default_options.merge(indexes: 'model_with_second_disk'))
expect(adapter).to receive(:index).with('model_with_second_disk_core')
expect(adapter).to receive(:index) do |args|
expect(args.core_name).to eq('model_with_second_disk_core')
end
expect(::ThinkingSphinx::Configuration.instance.mysql_client).to_not receive(:write)
expect(::Sphinx::Integration::ReplayerJob).to_not receive(:enqueue).with('model_with_second_disk_core')
helper.index
Expand Down

0 comments on commit 7a32189

Please sign in to comment.