Skip to content

Commit

Permalink
Merge branch 'versions/0.10' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed Sep 15, 2015
2 parents e158ec8 + d501394 commit eb2042c
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 33 deletions.
11 changes: 7 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
sudo: false
language: go
go:
- 1.4
notifications:
irc:
channels:
- "irc.mozilla.org#heka"
before_install:
- sudo apt-get install protobuf-compiler cmake libgeoip-dev

addons:
apt:
packages:
- protobuf-compiler
- cmake
- libgeoip-dev
install:
- . build.sh

script:
- make test
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ Bug Handling
* Fixed bug where a SandboxInput configured with a `ticker_interval` would
get stuck in an infinite loop on shutdown (#1705).

* Changes in StatAccumInput and FileOutput tests to minimize intermittent
Travis failures.

0.10.0b1 (2015-08-07)
=====================

Expand Down
17 changes: 9 additions & 8 deletions docs/source/config/common_sandbox_parameter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ Sandbox plugins. The are consumed by Heka when it initializes the plugin.
- preserve_data (bool):
True if the sandbox global data should be preserved/restored on plugin
shutdown/startup. When true this works in conjunction with a global Lua
_PRESERVATION_VERSION variable which is examined during restoration;
if the previous version does not match the current version the restoration
will be aborted and the sandbox will start cleanly. _PRESERVATION_VERSION
should be incremented any time an incompatible change is made to the global
data schema. If no version is set the check will always succeed and a
version of zero is assumed.
_PRESERVATION_VERSION variable which is examined during restoration; if the
previous version does not match the current version the restoration will be
aborted and the sandbox will start cleanly. _PRESERVATION_VERSION should be
incremented any time an incompatible change is made to the global data
schema. If no version is set the check will always succeed and a version of
zero is assumed.

- memory_limit (uint):
The number of bytes the sandbox is allowed to consume before being
Expand All @@ -38,8 +38,9 @@ Sandbox plugins. The are consumed by Heka when it initializes the plugin.
since they exceed the maximum message size.

- module_directory (string):
The directory where 'require' will attempt to load the external Lua
modules from. Defaults to ${SHARE_DIR}/lua_modules.
The directory or directories where 'require' will attempt to load the
external Lua modules from. Supports multiple paths separated by
semicolons. Defaults to ${SHARE_DIR}/lua_modules.

- config (object):
A map of configuration variables available to the sandbox via read_config.
Expand Down
1 change: 1 addition & 0 deletions docs/source/config/encoders/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Available Encoder Plugins
protobuf
rst
sandbox
schema_carbon_line
schema_influx
schema_influx_line
statmetric_influx
3 changes: 3 additions & 0 deletions docs/source/config/encoders/index_noref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ Encoders
.. include:: /config/encoders/sandbox.rst
:start-line: 1

.. include:: /config/encoders/schema_carbon_line.rst
:start-line: 1

.. include:: /config/encoders/schema_influx.rst
:start-line: 1

Expand Down
13 changes: 13 additions & 0 deletions docs/source/config/encoders/schema_carbon_line.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
.. _config_schema_carbon_line_encoder:

Schema Carbon Line Encoder
==========================

.. versionadded:: 0.10

| Plugin Name: **SandboxEncoder**
| File Name: **lua_encoders/schema_carbon_line.lua**
.. include:: /../../sandbox/lua/encoders/schema_carbon_line.lua
:start-after: --[=[
:end-before: --]=]
4 changes: 2 additions & 2 deletions docs/source/sandbox/encoder.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ESPayloadEncoder
:end-before: --]]

Schema Carbon Line Encoder
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: /../../sandbox/lua/encoders/schema_carbon_line.lua
:start-after: --[=[
:end-before: --]=]
Expand All @@ -39,7 +39,7 @@ Schema InfluxDB Encoder
:end-before: --]=]

Schema InfluxDB Line Encoder
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. include:: /../../sandbox/lua/encoders/schema_influx_line.lua
:start-after: --[=[
:end-before: --]=]
Expand Down
33 changes: 29 additions & 4 deletions pipeline/stat_accum_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package pipeline

import (
"log"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -94,7 +95,8 @@ func StatAccumInputSpec(c gs.Context) {
}

ith.MockInputRunner.EXPECT().InChan().Return(ith.PackSupply)
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput").AnyTimes()
// Need one of these for every Inject
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")

injectCall := ith.MockInputRunner.EXPECT().Inject(ith.Pack)
var injectCalled sync.WaitGroup
Expand Down Expand Up @@ -131,9 +133,26 @@ func StatAccumInputSpec(c gs.Context) {
}
}

drainStats := func() {
ok := true
for ok {
if len(statAccumInput.statChan) > 0 {
time.Sleep(100 * time.Millisecond)
} else {
ok = false
}
}
}

validateValueAtKey := func(msg *message.Message, key string, value interface{}) {
fieldValue, ok := msg.GetFieldValue(key)
if !ok {
log.Printf("%s field is missing from the message\n", key)
}
c.Expect(ok, gs.IsTrue)
if fieldValue != value {
log.Printf("%s should be %v is %v\n", key, value, fieldValue)
}
c.Expect(fieldValue, gs.Equals, value)
}

Expand Down Expand Up @@ -203,15 +222,17 @@ func StatAccumInputSpec(c gs.Context) {

c.Specify("emits proper idle stats", func() {
startInput()
inputStarted.Wait()
sendGauge("sample.gauge", 1, 2)
sendCounter("sample.cnt", 1, 2, 3, 4, 5)
sendTimer("sample.timer", 10, 10, 20, 20)
inputStarted.Wait()
drainStats()
tickChan <- time.Now()

injectCalled.Wait()
ith.Pack.Recycle(nil)
ith.PackSupply <- ith.Pack
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
ith.MockInputRunner.EXPECT().Inject(ith.Pack)

msg, err := finalizeSendingStats()
Expand All @@ -225,19 +246,22 @@ func StatAccumInputSpec(c gs.Context) {
c.Specify("omits idle stats", func() {
config.DeleteIdleStats = true
err := statAccumInput.Init(config)
c.Expect(err, gs.IsNil)
c.Assume(err, gs.IsNil)

startInput()
inputStarted.Wait() // Can't flush until the input has started.
sendGauge("sample.gauge", 1, 2)
sendCounter("sample.cnt", 1, 2, 3, 4, 5)
sendTimer("sample.timer", 10, 10, 20, 20)
inputStarted.Wait() // Can't flush until the input has started.
drainStats()
tickChan <- time.Now()
injectCalled.Wait()

sendTimer("sample2.timer", 10, 20)
drainStats()
ith.Pack.Recycle(nil)
ith.PackSupply <- ith.Pack
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
ith.MockInputRunner.EXPECT().Inject(ith.Pack)
msg, err := finalizeSendingStats()
c.Assume(err, gs.IsNil)
Expand Down Expand Up @@ -379,6 +403,7 @@ func StatAccumInputSpec(c gs.Context) {

// Prep pack and EXPECTS for the close.
ith.PackSupply <- ith.Pack
ith.MockInputRunner.EXPECT().Name().Return("StatAccumInput")
ith.MockInputRunner.EXPECT().Inject(ith.Pack)

close(statAccumInput.statChan)
Expand Down
40 changes: 25 additions & 15 deletions plugins/file/file_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,40 @@ func FileOutputSpec(c gs.Context) {
})
})

c.Specify("tests rotation of files", func() {
c.Specify("rotates files correctly", func() {
config.Path = "%Y-%m-%d"
config.RotationInterval = 24
rotateChan := make(chan time.Time)
closingChan := make(chan struct{})

err := fileOutput.Init(config)
defer fileOutput.file.Close()

c.Assume(err, gs.IsNil)

defer fileOutput.file.Close()

fileOutput.rotateChan = rotateChan
fileOutput.closing = closingChan

fileOutput.startRotateNotifier()

go fileOutput.committer(oth.MockOutputRunner, errChan)
committerChan := make(chan struct{})
go func() {
fileOutput.committer(oth.MockOutputRunner, errChan)
close(committerChan)
}()

c.Assume(fileOutput.path, gs.Equals, time.Now().Format("2006-01-02"))

futureDuration, _ := time.ParseDuration("24h")
futureNow := time.Now().Add(futureDuration)

rotateChan <- futureNow
close(inChan)
close(fileOutput.batchChan)
<-committerChan

c.Assume(fileOutput.path, gs.Equals, futureNow.Format("2006-01-02"))

close(inChan)
close(fileOutput.batchChan)
})

c.Specify("processes incoming messages", func() {
Expand Down Expand Up @@ -256,6 +262,7 @@ func FileOutputSpec(c gs.Context) {
timerChan := make(chan time.Time)

msg2 := pipeline_ts.GetTestMessage()
msg2.SetPayload("MESSAGE 2")
pack2 := NewPipelinePack(pConfig.InputRecycleChan())
pack2.Message = msg2

Expand Down Expand Up @@ -299,24 +306,26 @@ func FileOutputSpec(c gs.Context) {
defer cleanUp()
inChan <- pack

after := time.After(100 * time.Millisecond)
select {
case <-fileOutput.batchChan:
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
default:
case <-after:
}

timerChan <- time.Now()
after = time.After(100 * time.Millisecond)
select {
case <-fileOutput.batchChan:
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
default:
case <-after:
}

after = time.After(100 * time.Millisecond)
inChan <- pack2
runtime.Gosched()
select {
case <-fileOutput.batchChan:
default:
case <-after:
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
}
})
Expand All @@ -329,18 +338,19 @@ func FileOutputSpec(c gs.Context) {
defer cleanUp()
inChan <- pack

after := time.After(100 * time.Millisecond)
select {
case <-fileOutput.batchChan:
c.Expect("", gs.Equals, "fileOutput.batchChan should NOT have fired yet")
default:
case <-after:
}

c.Specify("when interval triggers first", func() {
timerChan <- time.Now()
runtime.Gosched()
after = time.After(100 * time.Millisecond)
select {
case <-fileOutput.batchChan:
default:
case <-after:
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
}
})
Expand All @@ -349,10 +359,10 @@ func FileOutputSpec(c gs.Context) {
out, err := encoder.Encode(pack2)
oth.MockOutputRunner.EXPECT().Encode(gomock.Any()).Return(out, err)
inChan <- pack2
runtime.Gosched()
after = time.After(100 * time.Millisecond)
select {
case <-fileOutput.batchChan:
default:
case <-after:
c.Expect("", gs.Equals, "fileOutput.batchChan SHOULD have fired by now")
}
})
Expand Down

0 comments on commit eb2042c

Please sign in to comment.