
Commit

Merge pull request mozilla-services#1703 from mozilla-services/update_sarama

Update to Sarama 1.5.0, add `git_clone_to_path` to build, use it
rafrombrc committed Sep 16, 2015
2 parents 5b00263 + 5f63621 commit 6f4b61a
Showing 7 changed files with 211 additions and 171 deletions.
7 changes: 7 additions & 0 deletions CHANGES.txt
@@ -8,12 +8,19 @@ Backwards Incompatibilities
Bug Handling
------------

* Updated Sarama dependency from a fork of a pre-1.0 release to a fork (with
  only test code changes) of the Sarama 1.5.0 release.

Features
--------

* Allow TcpOutput to re-establish the connection after a configurable number of
  successfully delivered messages (see the sketch below).

* Added `git_clone_to_path` to the cmake build to allow git repos to be cloned
into alternate locations; useful for relocating forks of Go packages into
their original import paths.
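
Below is a minimal sketch of the reconnect-after-N-messages behavior described
in the TcpOutput entry above. It is illustrative only, not Heka's actual
TcpOutput implementation; the type and member names (`TcpSender`,
`reconnectAfter`, `msgCount`, `deliver`) are hypothetical.

package tcpout

import "net"

// TcpSender is a hypothetical stand-in for Heka's TcpOutput, showing only
// the reconnect-after-N-delivered-messages behavior.
type TcpSender struct {
	conn           net.Conn
	address        string
	reconnectAfter uint64 // configurable threshold; 0 disables reconnects
	msgCount       uint64
}

// deliver writes one record and, once reconnectAfter messages have been
// successfully delivered, drops and re-dials the connection.
func (t *TcpSender) deliver(record []byte) error {
	if _, err := t.conn.Write(record); err != nil {
		return err
	}
	t.msgCount++
	if t.reconnectAfter > 0 && t.msgCount >= t.reconnectAfter {
		t.conn.Close()
		conn, err := net.Dial("tcp", t.address)
		if err != nil {
			return err
		}
		t.conn = conn
		t.msgCount = 0
	}
	return nil
}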

0.10.0 (2015-??-??)
=====================

24 changes: 22 additions & 2 deletions cmake/externals.cmake
@@ -60,6 +60,21 @@ function(git_clone url tag)
add_dependencies(GoPackages ${name})
endfunction(git_clone)

function(git_clone_to_path url tag dest_path)
parse_url(${url})
externalproject_add(
${name}
GIT_REPOSITORY ${url}
GIT_TAG ${tag}
SOURCE_DIR "${PROJECT_PATH}/src/${dest_path}"
BUILD_COMMAND ""
CONFIGURE_COMMAND ""
INSTALL_COMMAND ""
UPDATE_COMMAND "" # comment out to enable updates
)
add_dependencies(GoPackages ${name})
endfunction(git_clone_to_path)
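
Since `git_clone_to_path` derives its externalproject target name from the URL
via `parse_url`, exactly as `git_clone` does, a repository cloned to an
alternate location keeps its URL-based target name; only the checkout directory
under `${PROJECT_PATH}/src/` changes. This is what lets the `rafrombrc/sarama`
fork below be checked out at the `github.com/Shopify/sarama` import path while
the `add_dependencies(sarama snappy)` line still refers to it as `sarama`.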

function(hg_clone url tag)
parse_url(${url})
externalproject_add(
@@ -146,8 +161,13 @@ git_clone(https://github.com/rafrombrc/gospec 2e46585948f47047b0c217d00fa24bbc4e
git_clone(https://github.com/crankycoder/xmlpath 670b185b686fd11aa115291fb2f6dc3ed7ebb488)
git_clone(https://github.com/thoj/go-ircevent 90dc7f966b95d133f1c65531c6959b52effd5e40)
git_clone(https://github.com/cactus/gostrftime 4544856e3a415ff5668bb75fed36726240ea1f8d)
git_clone(https://github.com/golang/snappy eaa750b9bf4dcb7cb20454be850613b66cda3273)
git_clone(https://github.com/rafrombrc/sarama fda3e239249dd96f4a2c446aea39dfc823f4030a)

git_clone(https://github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a)
git_clone(https://github.com/eapache/go-resiliency v1.0.0)
git_clone(https://github.com/eapache/queue v1.0.2)
git_clone_to_path(https://github.com/rafrombrc/sarama f742e1e20b15b31320e0b6ff2f995bc5f0482fed github.com/Shopify/sarama)
git_clone(https://github.com/davecgh/go-spew 2df174808ee097f90d259e432cc04442cf60be21)

add_dependencies(sarama snappy)

if (INCLUDE_GEOIP)
2 changes: 2 additions & 0 deletions docs/source/config/outputs/kafka.rst
@@ -24,6 +24,8 @@ Config:
background (in milliseconds). Default is 600000 (10 minutes). Set to 0 to
disable.

- max_message_bytes (uint32)
Maximum size of a single message. Defaults to Heka's maximum record size.
- max_open_requests (int)
How many outstanding requests the broker is allowed to have before blocking
attempts to send. Default is 4.
119 changes: 62 additions & 57 deletions plugins/kafka/kafka_input.go
@@ -24,9 +24,9 @@ import (
"sync/atomic"
"time"

"github.com/Shopify/sarama"
"github.com/mozilla-services/heka/message"
"github.com/mozilla-services/heka/pipeline"
"github.com/rafrombrc/sarama"
)

type KafkaInputConfig struct {
@@ -62,10 +62,9 @@ type KafkaInput struct {
processMessageFailures int64

config *KafkaInputConfig
clientConfig *sarama.ClientConfig
consumerConfig *sarama.ConsumerConfig
client *sarama.Client
consumer *sarama.Consumer
saramaConfig *sarama.Config
consumer sarama.Consumer
partitionConsumer sarama.PartitionConsumer
pConfig *pipeline.PipelineConfig
ir pipeline.InputRunner
checkpointFile *os.File
@@ -141,64 +140,63 @@ func (k *KafkaInput) Init(config interface{}) (err error) {
k.config.Group = k.config.Id
}

k.clientConfig = sarama.NewClientConfig()
k.clientConfig.MetadataRetries = k.config.MetadataRetries
k.clientConfig.WaitForElection = time.Duration(k.config.WaitForElection) * time.Millisecond
k.clientConfig.BackgroundRefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond

k.clientConfig.DefaultBrokerConf = sarama.NewBrokerConfig()
k.clientConfig.DefaultBrokerConf.MaxOpenRequests = k.config.MaxOpenRequests
k.clientConfig.DefaultBrokerConf.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond
k.clientConfig.DefaultBrokerConf.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond
k.clientConfig.DefaultBrokerConf.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond

k.consumerConfig = sarama.NewConsumerConfig()
k.consumerConfig.DefaultFetchSize = k.config.DefaultFetchSize
k.consumerConfig.MinFetchSize = k.config.MinFetchSize
k.consumerConfig.MaxMessageSize = k.config.MaxMessageSize
k.consumerConfig.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond
k.saramaConfig = sarama.NewConfig()
k.saramaConfig.ClientID = k.config.Id
k.saramaConfig.Metadata.Retry.Max = k.config.MetadataRetries
k.saramaConfig.Metadata.Retry.Backoff = time.Duration(k.config.WaitForElection) * time.Millisecond
k.saramaConfig.Metadata.RefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond

k.saramaConfig.Net.MaxOpenRequests = k.config.MaxOpenRequests
k.saramaConfig.Net.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond
k.saramaConfig.Net.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond
k.saramaConfig.Net.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond

k.saramaConfig.Consumer.Fetch.Default = k.config.DefaultFetchSize
k.saramaConfig.Consumer.Fetch.Min = k.config.MinFetchSize
k.saramaConfig.Consumer.Fetch.Max = k.config.MaxMessageSize
k.saramaConfig.Consumer.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond
k.checkpointFilename = k.pConfig.Globals.PrependBaseDir(filepath.Join("kafka",
fmt.Sprintf("%s.%s.%d.offset.bin", k.name, k.config.Topic, k.config.Partition)))

var offset int64
switch k.config.OffsetMethod {
case "Manual":
k.consumerConfig.OffsetMethod = sarama.OffsetMethodManual
if fileExists(k.checkpointFilename) {
if k.consumerConfig.OffsetValue, err = readCheckpoint(k.checkpointFilename); err != nil {
if offset, err = readCheckpoint(k.checkpointFilename); err != nil {
return fmt.Errorf("readCheckpoint %s", err)
}
} else {
if err = os.MkdirAll(filepath.Dir(k.checkpointFilename), 0766); err != nil {
return
return err
}
k.consumerConfig.OffsetMethod = sarama.OffsetMethodOldest
offset = sarama.OffsetOldest
}
case "Newest":
k.consumerConfig.OffsetMethod = sarama.OffsetMethodNewest
offset = sarama.OffsetNewest
if fileExists(k.checkpointFilename) {
if err = os.Remove(k.checkpointFilename); err != nil {
return
return err
}
}
case "Oldest":
k.consumerConfig.OffsetMethod = sarama.OffsetMethodOldest
offset = sarama.OffsetOldest
if fileExists(k.checkpointFilename) {
if err = os.Remove(k.checkpointFilename); err != nil {
return
return err
}
}
default:
return fmt.Errorf("invalid offset_method: %s", k.config.OffsetMethod)
}

k.consumerConfig.EventBufferSize = k.config.EventBufferSize
k.saramaConfig.ChannelBufferSize = k.config.EventBufferSize

k.client, err = sarama.NewClient(k.config.Id, k.config.Addrs, k.clientConfig)
k.consumer, err = sarama.NewConsumer(k.config.Addrs, k.saramaConfig)
if err != nil {
return
return err
}
k.consumer, err = sarama.NewConsumer(k.client, k.config.Topic, k.config.Partition, k.config.Group, k.consumerConfig)
return
k.partitionConsumer, err = k.consumer.ConsumePartition(k.config.Topic, k.config.Partition, offset)
return err
}

func (k *KafkaInput) addField(pack *pipeline.PipelinePack, name string,
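
Note: the `readCheckpoint` and `writeCheckpoint` helpers called from `Init` and
`Run` are defined elsewhere in kafka_input.go and are not part of this diff.
The standalone sketch below shows the general idea, assuming the checkpoint
file holds a single binary-encoded int64 offset; the signatures and the
little-endian encoding are assumptions rather than Heka's actual format (the
real `writeCheckpoint` is a method on `KafkaInput` that reuses the open
`k.checkpointFile`).

package checkpoint

import (
	"encoding/binary"
	"os"
)

// readCheckpoint loads the last committed offset from the checkpoint file.
// Assumes the file holds exactly one little-endian int64.
func readCheckpoint(filename string) (int64, error) {
	f, err := os.Open(filename)
	if err != nil {
		return 0, err
	}
	defer f.Close()
	var offset int64
	err = binary.Read(f, binary.LittleEndian, &offset)
	return offset, err
}

// writeCheckpoint replaces the checkpoint file contents with the given offset.
func writeCheckpoint(filename string, offset int64) error {
	f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	defer f.Close()
	return binary.Write(f, binary.LittleEndian, offset)
}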
@@ -215,8 +213,8 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err
sRunner := ir.NewSplitterRunner("")

defer func() {
k.partitionConsumer.Close()
k.consumer.Close()
k.client.Close()
if k.checkpointFile != nil {
k.checkpointFile.Close()
}
@@ -227,7 +225,8 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err

var (
hostname = k.pConfig.Hostname()
event *sarama.ConsumerEvent
event *sarama.ConsumerMessage
cError *sarama.ConsumerError
ok bool
n int
)
@@ -245,30 +244,15 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err
sRunner.SetPackDecorator(packDec)
}

eventChan := k.partitionConsumer.Messages()
cErrChan := k.partitionConsumer.Errors()
for {
select {
case event, ok = <-k.consumer.Events():
case event, ok = <-eventChan:
if !ok {
return
return nil
}
atomic.AddInt64(&k.processMessageCount, 1)
if event.Err != nil {
if event.Err == sarama.OffsetOutOfRange {
ir.LogError(fmt.Errorf(
"removing the out of range checkpoint file and stopping"))
if k.checkpointFile != nil {
k.checkpointFile.Close()
k.checkpointFile = nil
}
if err := os.Remove(k.checkpointFilename); err != nil {
ir.LogError(err)
}
return
}
atomic.AddInt64(&k.processMessageFailures, 1)
ir.LogError(event.Err)
break
}
if n, err = sRunner.SplitBytes(event.Value, nil); err != nil {
ir.LogError(fmt.Errorf("processing message from topic %s: %s",
event.Topic, err))
Expand All @@ -280,12 +264,33 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err

if k.config.OffsetMethod == "Manual" {
if err = k.writeCheckpoint(event.Offset + 1); err != nil {
return
return err
}
}

case cError, ok = <-cErrChan:
if !ok {
// Don't exit until the eventChan is closed.
ok = true
continue
}
if cError.Err == sarama.ErrOffsetOutOfRange {
ir.LogError(fmt.Errorf(
"removing the out of range checkpoint file and stopping"))
if k.checkpointFile != nil {
k.checkpointFile.Close()
k.checkpointFile = nil
}
if err := os.Remove(k.checkpointFilename); err != nil {
ir.LogError(err)
}
return err
}
atomic.AddInt64(&k.processMessageFailures, 1)
ir.LogError(cError.Err)

case <-k.stopChan:
return
return nil
}
}
}
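
For reference, the shape of the Sarama 1.5 consumer API that the rewritten
`Init` and `Run` code relies on: a `Consumer` is created from broker addresses
and a unified `Config`, each (topic, partition) pair is consumed through a
`PartitionConsumer`, and messages and errors arrive on separate channels that
must both be drained. A minimal standalone sketch follows; the broker address,
topic, and partition are placeholder values.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.ClientID = "example-client" // placeholder
	// Have consume errors delivered on the Errors() channel instead of only
	// being logged; defaults can differ between sarama releases.
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	// One PartitionConsumer per (topic, partition), started at an explicit
	// offset, just like KafkaInput.Init above.
	pc, err := consumer.ConsumePartition("example-topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	// Drain both channels, mirroring the select loop in KafkaInput.Run.
	for {
		select {
		case msg := <-pc.Messages():
			log.Printf("topic=%s offset=%d value=%s", msg.Topic, msg.Offset, msg.Value)
		case cerr := <-pc.Errors():
			log.Printf("consume error: %v", cerr.Err)
		}
	}
}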