Skip to content

Commit

Permalink
Add test for ordered pipeline flushing fix
Browse files Browse the repository at this point in the history
  • Loading branch information
robbavey authored and colinsurprenant committed Aug 26, 2020
1 parent dfc7e51 commit 75ac8eb
Showing 1 changed file with 38 additions and 26 deletions.
64 changes: 38 additions & 26 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,9 @@ def flush(options)
end

context "Periodic Flush" do
let(:config) do
<<-EOS
shared_examples 'it flushes correctly' do
let(:config) do
<<-EOS
input {
dummy_input {}
}
Expand All @@ -787,37 +788,48 @@ def flush(options)
output {
dummy_output {}
}
EOS
end
let(:output) { ::LogStash::Outputs::DummyOutput.new }

before do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
end
EOS
end
let(:output) { ::LogStash::Outputs::DummyOutput.new }

it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
Timeout.timeout(timeout) do
pipeline.start
before do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output)
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput)
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
# give us a bit of time to flush the events
output.events.empty?
end.to be_falsey

it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
Timeout.timeout(timeout) do
pipeline.start
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
# give us a bit of time to flush the events
output.events.empty?
end.to be_falsey
end

expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)

pipeline.shutdown
end

expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
end

pipeline.shutdown
it_behaves_like 'it flushes correctly'

context 'with pipeline ordered' do
before do
pipeline_settings_obj.set("pipeline.workers", 1)
pipeline_settings_obj.set("pipeline.ordered", true)
end
it_behaves_like 'it flushes correctly'
end
end

context "Periodic Flush that intermittently returns nil" do
let(:config) do
<<-EOS
Expand Down

0 comments on commit 75ac8eb

Please sign in to comment.