From 75ac8eb23e1f705030d96a09ae22520f6e54c897 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Thu, 20 Aug 2020 17:14:24 -0400 Subject: [PATCH] Add test for ordered pipeline flushing fix --- .../spec/logstash/java_pipeline_spec.rb | 64 +++++++++++-------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index ac7f0f2099e..9181dac67e8 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -776,8 +776,9 @@ def flush(options) end context "Periodic Flush" do - let(:config) do - <<-EOS + shared_examples 'it flushes correctly' do + let(:config) do + <<-EOS input { dummy_input {} } @@ -787,37 +788,48 @@ def flush(options) output { dummy_output {} } - EOS - end - let(:output) { ::LogStash::Outputs::DummyOutput.new } - - before do - allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output) - allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput) - allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic) - allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput) - allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain) - end + EOS + end + let(:output) { ::LogStash::Outputs::DummyOutput.new } - it "flush periodically" do - Thread.abort_on_exception = true - pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - Timeout.timeout(timeout) do - pipeline.start + before do + allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(output) + allow(LogStash::Plugin).to receive(:lookup).with("input", "dummy_input").and_return(LogStash::Inputs::DummyBlockingInput) + allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummy_flushing_filter").and_return(DummyFlushingFilterPeriodic) + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummy_output").and_return(::LogStash::Outputs::DummyOutput) + allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain) end - Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do - wait(10).for do - # give us a bit of time to flush the events - output.events.empty? - end.to be_falsey + + it "flush periodically" do + Thread.abort_on_exception = true + pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) + Timeout.timeout(timeout) do + pipeline.start + end + Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do + wait(10).for do + # give us a bit of time to flush the events + output.events.empty? + end.to be_falsey + end + + expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) + + pipeline.shutdown end - expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) + end - pipeline.shutdown + it_behaves_like 'it flushes correctly' + + context 'with pipeline ordered' do + before do + pipeline_settings_obj.set("pipeline.workers", 1) + pipeline_settings_obj.set("pipeline.ordered", true) + end + it_behaves_like 'it flushes correctly' end end - context "Periodic Flush that intermittently returns nil" do let(:config) do <<-EOS