Skip to content

Commit

Permalink
Merge branch 'hynd-tcp_reconnect' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed Aug 18, 2015
2 parents a302cf2 + 0d8ef78 commit 6fe420a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Bug Handling
Features
--------

* Allow TcpOutput to re-establish the connection after a configurable number of
successfully delivered messages.

0.10.0 (2015-??-??)
=====================

Expand Down
3 changes: 3 additions & 0 deletions docs/source/config/outputs/tcp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ Config:
All of the :ref:`buffering <buffering>` config options are set to the
standard default options, except for `cursor_update_count`, which is set to
50 instead of the standard default of 1.
- reconnect_after (int, optional):
Re-establish the TCP connection after the specified number of successfully
delivered messages. Defaults to 0 (no reconnection).

Example:

Expand Down
6 changes: 6 additions & 0 deletions plugins/tcp/tcp_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type TcpOutputConfig struct {
KeepAlive bool `toml:"keep_alive"`
// Integer indicating seconds between keep alives.
KeepAlivePeriod int `toml:"keep_alive_period"`
// Number of successfully processed messages to re-establish the TCP
// connection after. Defaults to 0 (never)
ReconnectAfter int64 `toml:"reconnect_after"`
// Specifies whether or not Heka's stream framing wil be applied to the
// output. We do some magic to default to true if ProtobufEncoder is used,
// false otherwise.
Expand Down Expand Up @@ -166,6 +169,9 @@ func (t *TcpOutput) ProcessMessage(pack *PipelinePack) (err error) {
} else {
atomic.AddInt64(&t.processMessageCount, 1)
t.or.UpdateCursor(pack.QueueCursor)
if t.conf.ReconnectAfter > 0 && atomic.LoadInt64(&t.processMessageCount)%t.conf.ReconnectAfter == 0 {
t.cleanupConn()
}
}

return err
Expand Down

0 comments on commit 6fe420a

Please sign in to comment.