diff --git a/.travis.yml b/.travis.yml
index 7c0bae47f..c29d31c26 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,3 +1,4 @@
+sudo: false
 language: go
 go:
   - 1.4
@@ -5,11 +6,13 @@ notifications:
   irc:
     channels:
       - "irc.mozilla.org#heka"
-before_install:
-  - sudo apt-get install protobuf-compiler cmake libgeoip-dev
-
+addons:
+  apt:
+    packages:
+      - protobuf-compiler
+      - cmake
+      - libgeoip-dev
 install:
   - . build.sh
-
 script:
   - make test
diff --git a/CHANGES.txt b/CHANGES.txt
index 01f5e4580..e86bbe984 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -47,6 +47,9 @@ Bug Handling
 * Fixed bug where a SandboxInput configured with a `ticker_interval` would
   get stuck in an infinite loop on shutdown (#1705).
 
+* Changes in StatAccumInput and FileOutput tests to minimize intermittent
+  Travis failures.
+
 0.10.0b1 (2015-08-07)
 =====================
 
diff --git a/docs/source/config/common_sandbox_parameter.rst b/docs/source/config/common_sandbox_parameter.rst
index 8a27c7307..3b655db48 100644
--- a/docs/source/config/common_sandbox_parameter.rst
+++ b/docs/source/config/common_sandbox_parameter.rst
@@ -16,12 +16,12 @@ Sandbox plugins. The are consumed by Heka when it initializes the plugin.
 - preserve_data (bool):
     True if the sandbox global data should be preserved/restored on plugin
     shutdown/startup. When true this works in conjunction with a global Lua
-    _PRESERVATION_VERSION variable which is examined during restoration;
-    if the previous version does not match the current version the restoration
-    will be aborted and the sandbox will start cleanly. _PRESERVATION_VERSION
-    should be incremented any time an incompatible change is made to the global
-    data schema. If no version is set the check will always succeed and a
-    version of zero is assumed.
+    _PRESERVATION_VERSION variable which is examined during restoration; if the
+    previous version does not match the current version the restoration will be
+    aborted and the sandbox will start cleanly. _PRESERVATION_VERSION should be
+    incremented any time an incompatible change is made to the global data
+    schema. If no version is set the check will always succeed and a version of
+    zero is assumed.
 
 - memory_limit (uint):
     The number of bytes the sandbox is allowed to consume before being
@@ -38,8 +38,9 @@ Sandbox plugins. The are consumed by Heka when it initializes the plugin.
     since they exceed the maximum message size.
 
 - module_directory (string):
-    The directory where 'require' will attempt to load the external Lua
-    modules from. Defaults to ${SHARE_DIR}/lua_modules.
+    The directory or directories where 'require' will attempt to load the
+    external Lua modules from. Supports multiple paths separated by
+    semicolons. Defaults to ${SHARE_DIR}/lua_modules.
 
 - config (object):
     A map of configuration variables available to the sandbox via read_config.
diff --git a/docs/source/config/encoders/index.rst b/docs/source/config/encoders/index.rst
index 8d7e756c8..dcac6cda6 100644
--- a/docs/source/config/encoders/index.rst
+++ b/docs/source/config/encoders/index.rst
@@ -21,6 +21,7 @@ Available Encoder Plugins
    protobuf
    rst
    sandbox
+   schema_carbon_line
    schema_influx
    schema_influx_line
    statmetric_influx
diff --git a/docs/source/config/encoders/index_noref.rst b/docs/source/config/encoders/index_noref.rst
index b96d648fc..80414a96e 100644
--- a/docs/source/config/encoders/index_noref.rst
+++ b/docs/source/config/encoders/index_noref.rst
@@ -30,6 +30,9 @@ Encoders
 .. include:: /config/encoders/sandbox.rst
    :start-line: 1
 
+.. include:: /config/encoders/schema_carbon_line.rst
+   :start-line: 1
+
 .. include:: /config/encoders/schema_influx.rst
    :start-line: 1
 
diff --git a/docs/source/config/encoders/schema_carbon_line.rst b/docs/source/config/encoders/schema_carbon_line.rst
new file mode 100644
index 000000000..aa96e8feb
--- /dev/null
+++ b/docs/source/config/encoders/schema_carbon_line.rst
@@ -0,0 +1,13 @@
+.. _config_schema_carbon_line_encoder:
+
+Schema Carbon Line Encoder
+==========================
+
+.. versionadded:: 0.10
+
+| Plugin Name: **SandboxEncoder**
+| File Name: **lua_encoders/schema_carbon_line.lua**
+
+.. include:: /../../sandbox/lua/encoders/schema_carbon_line.lua
+   :start-after: --[=[
+   :end-before: --]=]
diff --git a/docs/source/sandbox/encoder.rst b/docs/source/sandbox/encoder.rst
index 20578a9ed..617019324 100644
--- a/docs/source/sandbox/encoder.rst
+++ b/docs/source/sandbox/encoder.rst
@@ -27,7 +27,7 @@ ESPayloadEncoder
    :end-before: --]]
 
 Schema Carbon Line Encoder
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^
 .. include:: /../../sandbox/lua/encoders/schema_carbon_line.lua
    :start-after: --[=[
    :end-before: --]=]
@@ -39,7 +39,7 @@ Schema InfluxDB Encoder
    :end-before: --]=]
 
 Schema InfluxDB Line Encoder
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 .. include:: /../../sandbox/lua/encoders/schema_influx_line.lua
    :start-after: --[=[
    :end-before: --]=]
diff --git a/pipeline/stat_accum_input_test.go b/pipeline/stat_accum_input_test.go
index 666649ceb..070c8759f 100644
--- a/pipeline/stat_accum_input_test.go
+++ b/pipeline/stat_accum_input_test.go
@@ -15,6 +15,7 @@
 package pipeline
 
 import (
+	"log"
 	"strconv"
 	"strings"
 	"sync"
@@ -94,7 +95,8 @@ func StatAccumInputSpec(c gs.Context) {
 		}
 
 		ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply)
-		ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput").AnyTimes()
+		// Need one of these for every Inject
+		ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
 		injectCall := ith.MockInputRunner.EXPECT().Inject(ith.Pack)
 		var injectCalled sync.WaitGroup
 
@@ -131,9 +133,26 @@ func StatAccumInputSpec(c gs.Context) {
 			}
 		}
 
+		drainStats := func() {
+			ok := true
+			for ok {
+				if len(statAccumInput.statChan) > 0 {
+					time.Sleep(100 * time.Millisecond)
+				} else {
+					ok = false
+				}
+			}
+		}
+
 		validateValueAtKey := func(msg *message.Message, key string, value interface{}) {
 			fieldValue, ok := msg.GetFieldValue(key)
+			if !ok {
+				log.Printf("%s field is missing from the message\n", key)
+			}
 			c.Expect(ok, gs.IsTrue)
+			if fieldValue != value {
+				log.Printf("%s should be %v is %v\n", key, value, fieldValue)
+			}
 			c.Expect(fieldValue, gs.Equals, value)
 		}
 
@@ -203,15 +222,17 @@ func StatAccumInputSpec(c gs.Context) {
 
 		c.Specify("emits proper idle stats", func() {
 			startInput()
+			inputStarted.Wait()
 			sendGauge("sample.gauge", 1, 2)
 			sendCounter("sample.cnt", 1, 2, 3, 4, 5)
 			sendTimer("sample.timer", 10, 10, 20, 20)
-			inputStarted.Wait()
+			drainStats()
 			tickChan <- time.Now()
 			injectCalled.Wait()
 
 			ith.Pack.Recycle(nil)
 			ith.PackSupply <- ith.Pack
+			ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
 			ith.MockInputRunner.EXPECT().Inject(ith.Pack)
 
 			msg, err := finalizeSendingStats()
@@ -225,19 +246,22 @@ func StatAccumInputSpec(c gs.Context) {
 		c.Specify("omits idle stats", func() {
 			config.DeleteIdleStats = true
 			err := statAccumInput.Init(config)
-			c.Expect(err, gs.IsNil)
+			c.Assume(err, gs.IsNil)
 
 			startInput()
+			inputStarted.Wait() // Can't flush until the input has started.
 			sendGauge("sample.gauge", 1, 2)
 			sendCounter("sample.cnt", 1, 2, 3, 4, 5)
 			sendTimer("sample.timer", 10, 10, 20, 20)
-			inputStarted.Wait() // Can't flush until the input has started.
+			drainStats()
 			tickChan <- time.Now()
 			injectCalled.Wait()
 			sendTimer("sample2.timer", 10, 20)
+			drainStats()
 
 			ith.Pack.Recycle(nil)
 			ith.PackSupply <- ith.Pack
+			ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
 			ith.MockInputRunner.EXPECT().Inject(ith.Pack)
 			msg, err := finalizeSendingStats()
 			c.Assume(err, gs.IsNil)
@@ -379,6 +403,7 @@ func StatAccumInputSpec(c gs.Context) {
 
 			// Prep pack and EXPECTS for the close.
 			ith.PackSupply <- ith.Pack
+			ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
 			ith.MockInputRunner.EXPECT().Inject(ith.Pack)
 			close(statAccumInput.statChan)
 
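[Note, not part of the patch] The stat_accum_input_test.go hunks above make two de-flaking changes: every Inject now gets its own Name() expectation instead of a single .AnyTimes() one, and a drainStats helper polls statAccumInput.statChan until the buffer is empty before the test fires the flush tick, so in-flight stats can no longer race the flush. The sketch below is illustrative only and not Heka code; the channel, producer, and consumer are invented stand-ins, and only the len()-polling loop with the 100 ms sleep mirrors the diff.

// Standalone sketch of the polling idea behind drainStats (assumptions noted above).
package main

import (
	"fmt"
	"time"
)

func main() {
	statChan := make(chan string, 100) // stand-in for statAccumInput.statChan

	// Consumer drains the channel, much as StatAccumInput's Run loop would.
	go func() {
		for range statChan {
		}
	}()

	// Producer: the rough equivalent of the test's sendGauge/sendCounter/sendTimer calls.
	for i := 0; i < 50; i++ {
		statChan <- fmt.Sprintf("stat.%d", i)
	}

	// drainStats equivalent: wait until the buffer is empty before "ticking" the flush.
	for len(statChan) > 0 {
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("stat buffer drained; safe to trigger the flush tick")
}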
diff --git a/plugins/file/file_output_test.go b/plugins/file/file_output_test.go
index 4160d9ba0..e275bb45d 100644
--- a/plugins/file/file_output_test.go
+++ b/plugins/file/file_output_test.go
@@ -107,22 +107,27 @@ func FileOutputSpec(c gs.Context) {
 			})
 		})
 
-		c.Specify("tests rotation of files", func() {
+		c.Specify("rotates files correctly", func() {
 			config.Path = "%Y-%m-%d"
+			config.RotationInterval = 24
 
 			rotateChan := make(chan time.Time)
 			closingChan := make(chan struct{})
 
 			err := fileOutput.Init(config)
-			defer fileOutput.file.Close()
-
 			c.Assume(err, gs.IsNil)
+			defer fileOutput.file.Close()
+
 			fileOutput.rotateChan = rotateChan
 			fileOutput.closing = closingChan
 			fileOutput.startRotateNotifier()
 
-			go fileOutput.committer(oth.MockOutputRunner, errChan)
+			committerChan := make(chan struct{})
+			go func() {
+				fileOutput.committer(oth.MockOutputRunner, errChan)
+				close(committerChan)
+			}()
 
 			c.Assume(fileOutput.path, gs.Equals, time.Now().Format("2006-01-02"))
 
@@ -130,11 +135,12 @@ func FileOutputSpec(c gs.Context) {
 			futureNow := time.Now().Add(futureDuration)
 
 			rotateChan <- futureNow
+			close(inChan)
+			close(fileOutput.batchChan)
+			<-committerChan
 
 			c.Assume(fileOutput.path, gs.Equals, futureNow.Format("2006-01-02"))
 
-			close(inChan)
-			close(fileOutput.batchChan)
 		})
 
 		c.Specify("processes incoming messages", func() {
@@ -256,6 +262,7 @@ func FileOutputSpec(c gs.Context) {
 			timerChan := make(chan time.Time)
 
 			msg2 := pipeline_ts.GetTestMessage()
+			msg2.SetPayload("MESSAGE 2")
 			pack2 := NewPipelinePack(pConfig.InputRecycleChan())
 			pack2.Message = msg2
 
@@ -299,24 +306,26 @@ func FileOutputSpec(c gs.Context) {
 			defer cleanUp()
 
 			inChan <- pack
+			after := time.After(100 * time.Millisecond)
 			select {
 			case <-fileOutput.batchChan:
 				c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
-			default:
+			case <-after:
 			}
 
 			timerChan <- time.Now()
+			after = time.After(100 * time.Millisecond)
 			select {
 			case <-fileOutput.batchChan:
 				c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
-			default:
+			case <-after:
 			}
 
+			after = time.After(100 * time.Millisecond)
 			inChan <- pack2
-			runtime.Gosched()
 			select {
 			case <-fileOutput.batchChan:
-			default:
+			case <-after:
 				c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
 			}
 		})
@@ -329,18 +338,19 @@ func FileOutputSpec(c gs.Context) {
 			defer cleanUp()
 
 			inChan <- pack
+			after := time.After(100 * time.Millisecond)
 			select {
 			case <-fileOutput.batchChan:
 				c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
-			default:
+			case <-after:
 			}
 
 			c.Specify("when interval triggers first", func() {
 				timerChan <- time.Now()
-				runtime.Gosched()
+				after = time.After(100 * time.Millisecond)
 				select {
 				case <-fileOutput.batchChan:
-				default:
+				case <-after:
"fileOutput.batchChan SHOULD have fired by now") } }) @@ -349,10 +359,10 @@ func FileOutputSpec(c gs.Context) { out, err := encoder.Encode(pack2) oth.MockOutputRunner.EXPECT().Encode(gomock.Any()).Return(out, err) inChan <- pack2 - runtime.Gosched() + after = time.After(100 * time.Millisecond) select { case <-fileOutput.batchChan: - default: + case <-after: c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now") } })