diff --git a/CHANGES.txt b/CHANGES.txt index eb70fd266..8b3d6ced7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,11 @@ Features 0.10.0 (2015-??-??) ===================== +Features +-------- + +* Added `set_hostname` option to UdpInput configuration (#1668). + Bug Handling ------------ @@ -24,6 +29,13 @@ Bug Handling * Add sample rate support for gauge type in StatsdInput. +* Fixed some cases where SandboxDecoder wasn't correctly setting + `pack.TrustMsgBytes` to false when needed. + +* Fixed broken queue rolling and added test. + +* Force ElasticSearch index name to lower case, as required by ElasticSearch. + 0.10.0b1 (2015-08-07) ===================== @@ -84,10 +96,10 @@ Backwards Incompatibilities * LogOutput will write data to stdout instead of stderr (#1515). -* Using stftime literals for filenames during rotation in FileOutput plugin +* Using strftime literals for filenames during rotation in FileOutput plugin (#1469). -* Implemented stftime format codes in: filenames in FileOutput plugin, +* Implemented strftime format codes in: filenames in FileOutput plugin, ESJsonEncoder, ESLogstashV0Encoder, Payload encoder (#1469, #1508). * The package created by 'make deb' creates an "heka" user and ships an init diff --git a/docs/source/config/inputs/udp.rst b/docs/source/config/inputs/udp.rst index 6e4cc0fb0..543524114 100644 --- a/docs/source/config/inputs/udp.rst +++ b/docs/source/config/inputs/udp.rst @@ -33,6 +33,11 @@ Config: - net (string, optional, default: "udp") Network value must be one of: "udp", "udp4", "udp6", or "unixgram". +.. versionadded:: 0.10 + +- set_hostname (boolean, default: false) + Set Hostname field from remote address. + Example: .. code-block:: ini diff --git a/docs/source/config/outputs/index.rst b/docs/source/config/outputs/index.rst index d097907b1..9efead1a9 100644 --- a/docs/source/config/outputs/index.rst +++ b/docs/source/config/outputs/index.rst @@ -24,24 +24,36 @@ initialization code. - ticker_interval (uint, optional): Frequency (in seconds) that a timer event will be sent to the filter. Defaults to not sending timer events. -- encoder (string, optional): - .. versionadded:: 0.6 +.. versionadded:: 0.6 + +- encoder (string, optional): Encoder to be used by the output. This should refer to the name of an encoder plugin section that is specified elsewhere in the TOML configuration. Messages can be encoded using the specified encoder by calling the OutputRunner's `Encode()` method. - use_framing (bool, optional): - .. versionadded:: 0.6 - Specifies whether or not Heka's :ref:`stream_framing` should be applied to the binary data returned from the OutputRunner's `Encode()` method. + +.. versionadded:: 0.7 + - can_exit (bool, optional) - .. versionadded:: 0.7 - Whether or not this plugin can exit without causing Heka to shutdown. Defaults to false. +.. versionadded:: 0.10 + +- use_buffering (bool, optional) + If true, all messages delivered to this output will be buffered to disk + before delivery, preventing back pressure and allowing retries in cases of + message processing failure. Defaults to false, unless otherwise specified + by the individual output's documentation. +- buffering (QueueBufferConfig, optional) + A sub-section that specifies the settings to be used for the buffering + behavior. This will only have any impact if `use_buffering` is set to + true. See :ref:`buffering`. + Available Output Plugins ======================== diff --git a/pipeline/queue_buffer.go b/pipeline/queue_buffer.go index a5d6bc81c..264d882d9 100644 --- a/pipeline/queue_buffer.go +++ b/pipeline/queue_buffer.go @@ -107,8 +107,8 @@ func NewBufferSet(queueDir, queueName string, config *QueueBufferConfig, } type BufferFeeder struct { - writeFileSize uint64 writeFile *os.File + writeFileSize uint64 writeId uint queue string queueSize *BufferSize @@ -119,7 +119,7 @@ func NewBufferFeeder(queue string, config *QueueBufferConfig, queueSize *BufferS *BufferFeeder, error) { if config.MaxFileSize == 0 { - config.MaxFileSize = 128 * 1024 * 1024 + config.MaxFileSize = 512 * 1024 * 1024 // 512 MiB } bf := &BufferFeeder{ queue: queue, @@ -143,6 +143,11 @@ func NewBufferFeeder(queue string, config *QueueBufferConfig, queueSize *BufferS if err != nil { return nil, fmt.Errorf("can't open write file: %s", err) } + writeFileInfo, err := bf.writeFile.Stat() + if err != nil { + return nil, fmt.Errorf("can't stat write file: %s", err) + } + bf.writeFileSize = uint64(writeFileInfo.Size()) return bf, nil } @@ -154,9 +159,13 @@ func (bf *BufferFeeder) RollQueue() (err error) { bf.writeId++ bf.writeFile, err = os.OpenFile(getQueueFilename(bf.queue, bf.writeId), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644) + bf.writeFileSize = 0 return err } +// QueueRecord adds a new record to the end of the current queue buffer. Note +// that QueueRecord is *not* thread safe, it should only ever be called by one +// goroutine at a time. func (bf *BufferFeeder) QueueRecord(pack *PipelinePack) error { maxQueueSize := bf.Config.MaxBufferSize if maxQueueSize > 0 && (bf.queueSize.Get()+uint64(len(pack.MsgBytes)) > maxQueueSize) { @@ -192,6 +201,7 @@ func (bf *BufferFeeder) QueueRecord(pack *PipelinePack) error { return fmt.Errorf("can't write to queue: %s", err) } bf.queueSize.Add(uint64(n)) + bf.writeFileSize += uint64(n) return nil } diff --git a/pipeline/queue_buffer_test.go b/pipeline/queue_buffer_test.go index 97afc7125..ca1e5f7fa 100644 --- a/pipeline/queue_buffer_test.go +++ b/pipeline/queue_buffer_test.go @@ -239,7 +239,7 @@ func QueueBufferSpec(c gs.Context) { err = feeder.QueueRecord(newpack) c.Expect(err, gs.IsNil) - c.Expect(feeder.queueSize.Get(), gs.Equals, uint64(115)) + c.Expect(feeder.queueSize.Get(), gs.Equals, uint64(expectedLen)) }) c.Specify("when queue has limit and is full", func() { @@ -276,6 +276,29 @@ func QueueBufferSpec(c gs.Context) { c.Expect(err, gs.IsNil) c.Expect(len(queueFiles), gs.Equals, numFiles+1) }) + + c.Specify("rolls when queue file hits max size", func() { + feeder.Config.MaxFileSize = uint64(300) + c.Assume(feeder.writeFileSize, gs.Equals, uint64(0)) + + // First two shouldn't trigger roll. + err = feeder.QueueRecord(newpack) + c.Expect(err, gs.IsNil) + err = feeder.QueueRecord(newpack) + c.Expect(err, gs.IsNil) + c.Expect(feeder.writeFileSize, gs.Equals, uint64(expectedLen*2)) + queueFiles, err := ioutil.ReadDir(feeder.queue) + c.Expect(err, gs.IsNil) + c.Expect(len(queueFiles), gs.Equals, 1) + + // Third one should. + err = feeder.QueueRecord(newpack) + c.Expect(err, gs.IsNil) + c.Expect(feeder.writeFileSize, gs.Equals, uint64(expectedLen)) + queueFiles, err = ioutil.ReadDir(feeder.queue) + c.Expect(err, gs.IsNil) + c.Expect(len(queueFiles), gs.Equals, 2) + }) }) c.Specify("getQueueBufferSize", func() { diff --git a/plugins/elasticsearch/coordinates.go b/plugins/elasticsearch/coordinates.go index 4ebafbd18..e2990752a 100644 --- a/plugins/elasticsearch/coordinates.go +++ b/plugins/elasticsearch/coordinates.go @@ -47,7 +47,7 @@ func (e *ElasticSearchCoordinates) PopulateBuffer(m *message.Message, buf *bytes interpIndex, err = interpolateFlag(e, m, e.Index) - buf.WriteString(strconv.Quote(interpIndex)) + buf.WriteString(strconv.Quote(strings.ToLower(interpIndex))) buf.WriteString(`,"_type":`) interpType, err = interpolateFlag(e, m, e.Type) diff --git a/plugins/udp/udp_input.go b/plugins/udp/udp_input.go index 14f07f7a6..2c68c034f 100644 --- a/plugins/udp/udp_input.go +++ b/plugins/udp/udp_input.go @@ -30,10 +30,12 @@ import ( // Input plugin implementation that listens for Heka protocol messages on a // specified UDP socket. type UdpInput struct { - listener net.Conn - name string - stopChan chan struct{} - config *UdpInputConfig + listener net.Conn + reader UdpInputReader + name string + stopChan chan struct{} + config *UdpInputConfig + remote_addr string } // ConfigStruct for NetworkInput plugins. @@ -44,6 +46,14 @@ type UdpInputConfig struct { // String representation of the address of the network connection on which // the listener should be listening (e.g. "127.0.0.1:5565"). Address string + // Set Hostname field from remote address + SetHostname bool `toml:"set_hostname"` +} + +// Wrap ReadFrom into Read and set Hostname +type UdpInputReader struct { + listener *net.UDPConn + input *UdpInput } func (u *UdpInput) ConfigStruct() interface{} { @@ -64,6 +74,10 @@ func (u *UdpInput) Init(config interface{}) (err error) { return errors.New( "Abstract sockets are linux-specific.") } + if u.config.SetHostname { + return errors.New( + "Can't set Hostname from Unix datagram.") + } unixAddr, err := net.ResolveUnixAddr(u.config.Net, u.config.Address) if err != nil { return fmt.Errorf("Error resolving unixgram address: %s", err) @@ -82,6 +96,10 @@ func (u *UdpInput) Init(config interface{}) (err error) { } else if len(u.config.Address) > 3 && u.config.Address[:3] == "fd:" { // File descriptor + if u.config.SetHostname { + return errors.New( + "Can't set Hostname from file descriptor.") + } fdStr := u.config.Address[3:] fdInt, err := strconv.ParseUint(fdStr, 0, 0) if err != nil { @@ -104,6 +122,12 @@ func (u *UdpInput) Init(config interface{}) (err error) { if err != nil { return fmt.Errorf("ListenUDP failed: %s\n", err.Error()) } + if u.config.SetHostname { + u.reader = UdpInputReader { + u.listener.(*net.UDPConn), + u, + } + } } u.stopChan = make(chan struct{}) return @@ -119,6 +143,9 @@ func (u *UdpInput) Run(ir InputRunner, h PluginHelper) error { name := ir.Name() packDec := func(pack *PipelinePack) { pack.Message.SetType(name) + if u.config.SetHostname { + pack.Message.SetHostname(u.remote_addr) + } } sr.SetPackDecorator(packDec) } @@ -128,7 +155,11 @@ func (u *UdpInput) Run(ir InputRunner, h PluginHelper) error { case _, ok = <-u.stopChan: break default: - err = sr.SplitStream(u.listener, nil) + if u.config.SetHostname { + err = sr.SplitStream(u.reader, nil) + } else { + err = sr.SplitStream(u.listener, nil) + } // "use of closed" -> we're stopping. if err != nil && !strings.Contains(err.Error(), "use of closed") { ir.LogError(fmt.Errorf("Read error: %s", err)) @@ -152,6 +183,16 @@ func (u *UdpInput) Stop() { u.listener.Close() } +func (r UdpInputReader) Read(p []byte) (n int, err error) { + n, addr, err := r.listener.ReadFromUDP(p) + if addr != nil { + r.input.remote_addr = addr.IP.String() + } else { + r.input.remote_addr = "" + } + return n, err +} + func init() { RegisterPlugin("UdpInput", func() interface{} { return new(UdpInput) diff --git a/sandbox/lua/lua_sandbox.go.in b/sandbox/lua/lua_sandbox.go.in index b900a7e4b..49dc603da 100644 --- a/sandbox/lua/lua_sandbox.go.in +++ b/sandbox/lua/lua_sandbox.go.in @@ -358,6 +358,7 @@ func go_lua_write_message_string(ptr unsafe.Pointer, c, v, rep *C.char, lsb.globals.LogMessage("go_lua_write_message_string", "No sandbox pack.") return 1 } + lsb.pack.TrustMsgBytes = false if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" { lsb.pack.Message = message.CopyMessage(lsb.pack.Message) lsb.messageCopied = true @@ -458,6 +459,7 @@ func go_lua_write_message_double(ptr unsafe.Pointer, c *C.char, v C.double, rep lsb.globals.LogMessage("go_lua_write_message_double", "No sandbox pack.") return 1 } + lsb.pack.TrustMsgBytes = false if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" { lsb.pack.Message = message.CopyMessage(lsb.pack.Message) lsb.messageCopied = true @@ -500,6 +502,7 @@ func go_lua_write_message_bool(ptr unsafe.Pointer, c *C.char, v bool, rep *C.cha lsb.globals.LogMessage("go_lua_write_message_bool", "No sandbox pack.") return 1 } + lsb.pack.TrustMsgBytes = false if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" { lsb.pack.Message = message.CopyMessage(lsb.pack.Message) lsb.messageCopied = true @@ -525,6 +528,7 @@ func go_lua_delete_message_field(ptr unsafe.Pointer, c *C.char, fi, ai int, has_ lsb.globals.LogMessage("go_lua_delete_message_field", "No sandbox pack.") return 1 } + lsb.pack.TrustMsgBytes = false if !lsb.messageCopied && lsb.sbConfig.PluginType == "encoder" { lsb.pack.Message = message.CopyMessage(lsb.pack.Message) lsb.messageCopied = true diff --git a/sandbox/plugins/sandbox_decoder.go b/sandbox/plugins/sandbox_decoder.go index 4f765759f..3a2bf26f6 100644 --- a/sandbox/plugins/sandbox_decoder.go +++ b/sandbox/plugins/sandbox_decoder.go @@ -209,8 +209,10 @@ func (s *SandboxDecoder) SetDecoderRunner(dr pipeline.DecoderRunner) { t = t.In(time.UTC) ct, _ := time.ParseInLocation(layout, t.Format(layout), s.tz) s.pack.Message.SetTimestamp(ct.UnixNano()) + s.pack.TrustMsgBytes = false } } else { + s.pack.TrustMsgBytes = false s.pack.Message.SetPayload(payload) ptype, _ := message.NewField("payload_type", payload_type, "file-extension") s.pack.Message.AddField(ptype)