Skip to content

Commit

Permalink
Merge branch 'versions/0.10' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed Aug 19, 2015
2 parents 6fe420a + b381f02 commit 0cba97c
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 17 deletions.
16 changes: 14 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ Features
0.10.0 (2015-??-??)
=====================

Features
--------

* Added `set_hostname` option to UdpInput configuration (#1668).

Bug Handling
------------

* Set hostname correctly in the Graylog decoder (#1663).

* Add sample rate support for gauge type in StatsdInput.

* Fixed some cases where SandboxDecoder wasn't correctly setting
`pack.TrustMsgBytes` to false when needed.

* Fixed broken queue rolling and added test.

* Force ElasticSearch index name to lower case, as required by ElasticSearch.

0.10.0b1 (2015-08-07)
=====================

Expand Down Expand Up @@ -84,10 +96,10 @@ Backwards Incompatibilities

* LogOutput will write data to stdout instead of stderr (#1515).

* Using stftime literals for filenames during rotation in FileOutput plugin
* Using strftime literals for filenames during rotation in FileOutput plugin
(#1469).

* Implemented stftime format codes in: filenames in FileOutput plugin,
* Implemented strftime format codes in: filenames in FileOutput plugin,
ESJsonEncoder, ESLogstashV0Encoder, Payload encoder (#1469, #1508).

* The package created by 'make deb' creates an "heka" user and ships an init
Expand Down
5 changes: 5 additions & 0 deletions docs/source/config/inputs/udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ Config:
- net (string, optional, default: "udp")
Network value must be one of: "udp", "udp4", "udp6", or "unixgram".

.. versionadded:: 0.10

- set_hostname (boolean, default: false)
Set Hostname field from remote address.

Example:

.. code-block:: ini
Expand Down
24 changes: 18 additions & 6 deletions docs/source/config/outputs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,36 @@ initialization code.
- ticker_interval (uint, optional):
Frequency (in seconds) that a timer event will be sent to the filter.
Defaults to not sending timer events.
- encoder (string, optional):
.. versionadded:: 0.6

.. versionadded:: 0.6

- encoder (string, optional):
Encoder to be used by the output. This should refer to the name of an
encoder plugin section that is specified elsewhere in the TOML
configuration. Messages can be encoded using the specified encoder by
calling the OutputRunner's `Encode()` method.
- use_framing (bool, optional):
.. versionadded:: 0.6

Specifies whether or not Heka's :ref:`stream_framing` should be applied to
the binary data returned from the OutputRunner's `Encode()` method.

.. versionadded:: 0.7

- can_exit (bool, optional)
.. versionadded:: 0.7

Whether or not this plugin can exit without causing Heka to shutdown.
Defaults to false.

.. versionadded:: 0.10

- use_buffering (bool, optional)
If true, all messages delivered to this output will be buffered to disk
before delivery, preventing back pressure and allowing retries in cases of
message processing failure. Defaults to false, unless otherwise specified
by the individual output's documentation.
- buffering (QueueBufferConfig, optional)
A sub-section that specifies the settings to be used for the buffering
behavior. This will only have any impact if `use_buffering` is set to
true. See :ref:`buffering`.

Available Output Plugins
========================

Expand Down
14 changes: 12 additions & 2 deletions pipeline/queue_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func NewBufferSet(queueDir, queueName string, config *QueueBufferConfig,
}

type BufferFeeder struct {
writeFileSize uint64
writeFile *os.File
writeFileSize uint64
writeId uint
queue string
queueSize *BufferSize
Expand All @@ -119,7 +119,7 @@ func NewBufferFeeder(queue string, config *QueueBufferConfig, queueSize *BufferS
*BufferFeeder, error) {

if config.MaxFileSize == 0 {
config.MaxFileSize = 128 * 1024 * 1024
config.MaxFileSize = 512 * 1024 * 1024 // 512 MiB
}
bf := &BufferFeeder{
queue: queue,
Expand All @@ -143,6 +143,11 @@ func NewBufferFeeder(queue string, config *QueueBufferConfig, queueSize *BufferS
if err != nil {
return nil, fmt.Errorf("can't open write file: %s", err)
}
writeFileInfo, err := bf.writeFile.Stat()
if err != nil {
return nil, fmt.Errorf("can't stat write file: %s", err)
}
bf.writeFileSize = uint64(writeFileInfo.Size())
return bf, nil
}

Expand All @@ -154,9 +159,13 @@ func (bf *BufferFeeder) RollQueue() (err error) {
bf.writeId++
bf.writeFile, err = os.OpenFile(getQueueFilename(bf.queue, bf.writeId),
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
bf.writeFileSize = 0
return err
}

// QueueRecord adds a new record to the end of the current queue buffer. Note
// that QueueRecord is *not* thread safe, it should only ever be called by one
// goroutine at a time.
func (bf *BufferFeeder) QueueRecord(pack *PipelinePack) error {
maxQueueSize := bf.Config.MaxBufferSize
if maxQueueSize > 0 && (bf.queueSize.Get()+uint64(len(pack.MsgBytes)) > maxQueueSize) {
Expand Down Expand Up @@ -192,6 +201,7 @@ func (bf *BufferFeeder) QueueRecord(pack *PipelinePack) error {
return fmt.Errorf("can't write to queue: %s", err)
}
bf.queueSize.Add(uint64(n))
bf.writeFileSize += uint64(n)
return nil
}

Expand Down
25 changes: 24 additions & 1 deletion pipeline/queue_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func QueueBufferSpec(c gs.Context) {

err = feeder.QueueRecord(newpack)
c.Expect(err, gs.IsNil)
c.Expect(feeder.queueSize.Get(), gs.Equals, uint64(115))
c.Expect(feeder.queueSize.Get(), gs.Equals, uint64(expectedLen))
})

c.Specify("when queue has limit and is full", func() {
Expand Down Expand Up @@ -276,6 +276,29 @@ func QueueBufferSpec(c gs.Context) {
c.Expect(err, gs.IsNil)
c.Expect(len(queueFiles), gs.Equals, numFiles+1)
})

c.Specify("rolls when queue file hits max size", func() {
feeder.Config.MaxFileSize = uint64(300)
c.Assume(feeder.writeFileSize, gs.Equals, uint64(0))

// First two shouldn't trigger roll.
err = feeder.QueueRecord(newpack)
c.Expect(err, gs.IsNil)
err = feeder.QueueRecord(newpack)
c.Expect(err, gs.IsNil)
c.Expect(feeder.writeFileSize, gs.Equals, uint64(expectedLen*2))
queueFiles, err := ioutil.ReadDir(feeder.queue)
c.Expect(err, gs.IsNil)
c.Expect(len(queueFiles), gs.Equals, 1)

// Third one should.
err = feeder.QueueRecord(newpack)
c.Expect(err, gs.IsNil)
c.Expect(feeder.writeFileSize, gs.Equals, uint64(expectedLen))
queueFiles, err = ioutil.ReadDir(feeder.queue)
c.Expect(err, gs.IsNil)
c.Expect(len(queueFiles), gs.Equals, 2)
})
})

c.Specify("getQueueBufferSize", func() {
Expand Down
2 changes: 1 addition & 1 deletion plugins/elasticsearch/coordinates.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *ElasticSearchCoordinates) PopulateBuffer(m *message.Message, buf *bytes

interpIndex, err = interpolateFlag(e, m, e.Index)

buf.WriteString(strconv.Quote(interpIndex))
buf.WriteString(strconv.Quote(strings.ToLower(interpIndex)))
buf.WriteString(`,"_type":`)

interpType, err = interpolateFlag(e, m, e.Type)
Expand Down
51 changes: 46 additions & 5 deletions plugins/udp/udp_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import (
// Input plugin implementation that listens for Heka protocol messages on a
// specified UDP socket.
type UdpInput struct {
listener net.Conn
name string
stopChan chan struct{}
config *UdpInputConfig
listener net.Conn
reader UdpInputReader
name string
stopChan chan struct{}
config *UdpInputConfig
remote_addr string
}

// ConfigStruct for NetworkInput plugins.
Expand All @@ -44,6 +46,14 @@ type UdpInputConfig struct {
// String representation of the address of the network connection on which
// the listener should be listening (e.g. "127.0.0.1:5565").
Address string
// Set Hostname field from remote address
SetHostname bool `toml:"set_hostname"`
}

// Wrap ReadFrom into Read and set Hostname
type UdpInputReader struct {
listener *net.UDPConn
input *UdpInput
}

func (u *UdpInput) ConfigStruct() interface{} {
Expand All @@ -64,6 +74,10 @@ func (u *UdpInput) Init(config interface{}) (err error) {
return errors.New(
"Abstract sockets are linux-specific.")
}
if u.config.SetHostname {
return errors.New(
"Can't set Hostname from Unix datagram.")
}
unixAddr, err := net.ResolveUnixAddr(u.config.Net, u.config.Address)
if err != nil {
return fmt.Errorf("Error resolving unixgram address: %s", err)
Expand All @@ -82,6 +96,10 @@ func (u *UdpInput) Init(config interface{}) (err error) {

} else if len(u.config.Address) > 3 && u.config.Address[:3] == "fd:" {
// File descriptor
if u.config.SetHostname {
return errors.New(
"Can't set Hostname from file descriptor.")
}
fdStr := u.config.Address[3:]
fdInt, err := strconv.ParseUint(fdStr, 0, 0)
if err != nil {
Expand All @@ -104,6 +122,12 @@ func (u *UdpInput) Init(config interface{}) (err error) {
if err != nil {
return fmt.Errorf("ListenUDP failed: %s\n", err.Error())
}
if u.config.SetHostname {
u.reader = UdpInputReader {
u.listener.(*net.UDPConn),
u,
}
}
}
u.stopChan = make(chan struct{})
return
Expand All @@ -119,6 +143,9 @@ func (u *UdpInput) Run(ir InputRunner, h PluginHelper) error {
name := ir.Name()
packDec := func(pack *PipelinePack) {
pack.Message.SetType(name)
if u.config.SetHostname {
pack.Message.SetHostname(u.remote_addr)
}
}
sr.SetPackDecorator(packDec)
}
Expand All @@ -128,7 +155,11 @@ func (u *UdpInput) Run(ir InputRunner, h PluginHelper) error {
case _, ok = <-u.stopChan:
break
default:
err = sr.SplitStream(u.listener, nil)
if u.config.SetHostname {
err = sr.SplitStream(u.reader, nil)
} else {
err = sr.SplitStream(u.listener, nil)
}
// "use of closed" -> we're stopping.
if err != nil && !strings.Contains(err.Error(), "use of closed") {
ir.LogError(fmt.Errorf("Read error: %s", err))
Expand All @@ -152,6 +183,16 @@ func (u *UdpInput) Stop() {
u.listener.Close()
}

func (r UdpInputReader) Read(p []byte) (n int, err error) {
n, addr, err := r.listener.ReadFromUDP(p)
if addr != nil {
r.input.remote_addr = addr.IP.String()
} else {
r.input.remote_addr = ""
}
return n, err
}

func init() {
RegisterPlugin("UdpInput", func() interface{} {
return new(UdpInput)
Expand Down
4 changes: 4 additions & 0 deletions sandbox/lua/lua_sandbox.go.in
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func go_lua_write_message_string(ptr unsafe.Pointer, c, v, rep *C.char,
lsb.globals.LogMessage("go_lua_write_message_string", "No sandbox pack.")
return 1
}
lsb.pack.TrustMsgBytes = false
if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" {
lsb.pack.Message = message.CopyMessage(lsb.pack.Message)
lsb.messageCopied = true
Expand Down Expand Up @@ -458,6 +459,7 @@ func go_lua_write_message_double(ptr unsafe.Pointer, c *C.char, v C.double, rep
lsb.globals.LogMessage("go_lua_write_message_double", "No sandbox pack.")
return 1
}
lsb.pack.TrustMsgBytes = false
if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" {
lsb.pack.Message = message.CopyMessage(lsb.pack.Message)
lsb.messageCopied = true
Expand Down Expand Up @@ -500,6 +502,7 @@ func go_lua_write_message_bool(ptr unsafe.Pointer, c *C.char, v bool, rep *C.cha
lsb.globals.LogMessage("go_lua_write_message_bool", "No sandbox pack.")
return 1
}
lsb.pack.TrustMsgBytes = false
if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" {
lsb.pack.Message = message.CopyMessage(lsb.pack.Message)
lsb.messageCopied = true
Expand All @@ -525,6 +528,7 @@ func go_lua_delete_message_field(ptr unsafe.Pointer, c *C.char, fi, ai int, has_
lsb.globals.LogMessage("go_lua_delete_message_field", "No sandbox pack.")
return 1
}
lsb.pack.TrustMsgBytes = false
if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" {
lsb.pack.Message = message.CopyMessage(lsb.pack.Message)
lsb.messageCopied = true
Expand Down
2 changes: 2 additions & 0 deletions sandbox/plugins/sandbox_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,10 @@ func (s *SandboxDecoder) SetDecoderRunner(dr pipeline.DecoderRunner) {
t = t.In(time.UTC)
ct, _ := time.ParseInLocation(layout, t.Format(layout), s.tz)
s.pack.Message.SetTimestamp(ct.UnixNano())
s.pack.TrustMsgBytes = false
}
} else {
s.pack.TrustMsgBytes = false
s.pack.Message.SetPayload(payload)
ptype, _ := message.NewField("payload_type", payload_type, "file-extension")
s.pack.Message.AddField(ptype)
Expand Down

0 comments on commit 0cba97c

Please sign in to comment.