diff --git a/CHANGES.txt b/CHANGES.txt index 2f3ce5147..04dd614b4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,12 +8,19 @@ Backwards Incompatibilities Bug Handling ------------ +* Updated Sarama dependency from a fork of a pre-1.0 release to a fork (containing + only test code changes) of the Sarama 1.5.0 release. + Features -------- * Allow TcpOutput to re-establish the connection after a configurable number of successfully delivered messages. +* Added `git_clone_to_path` to the cmake build to allow git repos to be cloned + into alternate locations; useful for relocating forks of Go packages into + their original import paths. + 0.10.0 (2015-??-??) =====================
diff --git a/cmake/externals.cmake b/cmake/externals.cmake index d554ff4f2..31e4be2be 100644 --- a/cmake/externals.cmake +++ b/cmake/externals.cmake @@ -60,6 +60,21 @@ function(git_clone url tag) add_dependencies(GoPackages ${name}) endfunction(git_clone) +function(git_clone_to_path url tag dest_path) + parse_url(${url}) + externalproject_add( + ${name} + GIT_REPOSITORY ${url} + GIT_TAG ${tag} + SOURCE_DIR "${PROJECT_PATH}/src/${dest_path}" + BUILD_COMMAND "" + CONFIGURE_COMMAND "" + INSTALL_COMMAND "" + UPDATE_COMMAND "" # comment out to enable updates + ) + add_dependencies(GoPackages ${name}) +endfunction(git_clone_to_path) + function(hg_clone url tag) parse_url(${url}) externalproject_add( @@ -146,8 +161,13 @@ git_clone(https://github.com/rafrombrc/gospec 2e46585948f47047b0c217d00fa24bbc4e git_clone(https://github.com/crankycoder/xmlpath 670b185b686fd11aa115291fb2f6dc3ed7ebb488) git_clone(https://github.com/thoj/go-ircevent 90dc7f966b95d133f1c65531c6959b52effd5e40) git_clone(https://github.com/cactus/gostrftime 4544856e3a415ff5668bb75fed36726240ea1f8d) -git_clone(https://github.com/golang/snappy eaa750b9bf4dcb7cb20454be850613b66cda3273) -git_clone(https://github.com/rafrombrc/sarama fda3e239249dd96f4a2c446aea39dfc823f4030a) + +git_clone(https://github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a) +git_clone(https://github.com/eapache/go-resiliency v1.0.0) +git_clone(https://github.com/eapache/queue v1.0.2) +git_clone_to_path(https://github.com/rafrombrc/sarama f742e1e20b15b31320e0b6ff2f995bc5f0482fed github.com/Shopify/sarama) +git_clone(https://github.com/davecgh/go-spew 2df174808ee097f90d259e432cc04442cf60be21) + add_dependencies(sarama snappy) if (INCLUDE_GEOIP)
diff --git a/docs/source/config/outputs/kafka.rst b/docs/source/config/outputs/kafka.rst index 19e273ce9..6ca770599 100644 --- a/docs/source/config/outputs/kafka.rst +++ b/docs/source/config/outputs/kafka.rst @@ -24,6 +24,8 @@ Config: background (in milliseconds). Default is 600000 (10 minutes). Set to 0 to disable. +- max_message_bytes (uint32) + Maximum size of a single message. Defaults to Heka's maximum record size. - max_open_reqests (int) How many outstanding requests the broker is allowed to have before blocking attempts to send. Default is 4.
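The plugin changes that follow all apply the same translation: the old flat sarama.ClientConfig, sarama.ConsumerConfig, and sarama.ProducerConfig structs are replaced by the single nested sarama.Config used since Sarama 1.x. A minimal sketch of that mapping, assuming the Sarama 1.5-era API (the package name, helper function, and its parameters are illustrative only; the Config field names are the ones used in the diffs below):

    // Hypothetical package, not part of this patch.
    package kafkaplugin

    import (
        "time"

        "github.com/Shopify/sarama"
    )

    // newSaramaConfig shows how Heka's flat client/broker options map onto the
    // nested sarama.Config that both KafkaInput and KafkaOutput now build.
    func newSaramaConfig(clientID string, metadataRetries, maxOpenRequests int,
        waitForElectionMs, dialTimeoutMs uint32) *sarama.Config {

        conf := sarama.NewConfig()
        conf.ClientID = clientID

        // Formerly ClientConfig.MetadataRetries and ClientConfig.WaitForElection.
        conf.Metadata.Retry.Max = metadataRetries
        conf.Metadata.Retry.Backoff = time.Duration(waitForElectionMs) * time.Millisecond

        // Formerly BrokerConfig.MaxOpenRequests and BrokerConfig.DialTimeout.
        conf.Net.MaxOpenRequests = maxOpenRequests
        conf.Net.DialTimeout = time.Duration(dialTimeoutMs) * time.Millisecond

        return conf
    }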
diff --git a/plugins/kafka/kafka_input.go b/plugins/kafka/kafka_input.go index 7e16a2aad..3eb6071ff 100644 --- a/plugins/kafka/kafka_input.go +++ b/plugins/kafka/kafka_input.go @@ -24,9 +24,9 @@ import ( "sync/atomic" "time" + "github.com/Shopify/sarama" "github.com/mozilla-services/heka/message" "github.com/mozilla-services/heka/pipeline" - "github.com/rafrombrc/sarama" ) type KafkaInputConfig struct { @@ -62,10 +62,9 @@ type KafkaInput struct { processMessageFailures int64 config *KafkaInputConfig - clientConfig *sarama.ClientConfig - consumerConfig *sarama.ConsumerConfig - client *sarama.Client - consumer *sarama.Consumer + saramaConfig *sarama.Config + consumer sarama.Consumer + partitionConsumer sarama.PartitionConsumer pConfig *pipeline.PipelineConfig ir pipeline.InputRunner checkpointFile *os.File @@ -141,64 +140,63 @@ func (k *KafkaInput) Init(config interface{}) (err error) { k.config.Group = k.config.Id } - k.clientConfig = sarama.NewClientConfig() - k.clientConfig.MetadataRetries = k.config.MetadataRetries - k.clientConfig.WaitForElection = time.Duration(k.config.WaitForElection) * time.Millisecond - k.clientConfig.BackgroundRefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond - - k.clientConfig.DefaultBrokerConf = sarama.NewBrokerConfig() - k.clientConfig.DefaultBrokerConf.MaxOpenRequests = k.config.MaxOpenRequests - k.clientConfig.DefaultBrokerConf.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond - k.clientConfig.DefaultBrokerConf.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond - k.clientConfig.DefaultBrokerConf.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond - - k.consumerConfig = sarama.NewConsumerConfig() - k.consumerConfig.DefaultFetchSize = k.config.DefaultFetchSize - k.consumerConfig.MinFetchSize = k.config.MinFetchSize - k.consumerConfig.MaxMessageSize = k.config.MaxMessageSize - k.consumerConfig.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond + k.saramaConfig = sarama.NewConfig() + k.saramaConfig.ClientID = k.config.Id + k.saramaConfig.Metadata.Retry.Max = k.config.MetadataRetries + k.saramaConfig.Metadata.Retry.Backoff = time.Duration(k.config.WaitForElection) * time.Millisecond + k.saramaConfig.Metadata.RefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond + + k.saramaConfig.Net.MaxOpenRequests = k.config.MaxOpenRequests + k.saramaConfig.Net.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond + k.saramaConfig.Net.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond + k.saramaConfig.Net.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond + + k.saramaConfig.Consumer.Fetch.Default = k.config.DefaultFetchSize + k.saramaConfig.Consumer.Fetch.Min = k.config.MinFetchSize + k.saramaConfig.Consumer.Fetch.Max = k.config.MaxMessageSize + k.saramaConfig.Consumer.MaxWaitTime = time.Duration(k.config.MaxWaitTime) * time.Millisecond k.checkpointFilename = k.pConfig.Globals.PrependBaseDir(filepath.Join("kafka", fmt.Sprintf("%s.%s.%d.offset.bin", k.name, k.config.Topic, k.config.Partition))) + var offset int64 switch k.config.OffsetMethod { case "Manual": - k.consumerConfig.OffsetMethod = sarama.OffsetMethodManual if fileExists(k.checkpointFilename) { - if k.consumerConfig.OffsetValue, err = readCheckpoint(k.checkpointFilename); err != nil { + if offset, err = readCheckpoint(k.checkpointFilename); err != nil { return fmt.Errorf("readCheckpoint %s", err) } } else { if err = 
os.MkdirAll(filepath.Dir(k.checkpointFilename), 0766); err != nil { - return + return err } - k.consumerConfig.OffsetMethod = sarama.OffsetMethodOldest + offset = sarama.OffsetOldest } case "Newest": - k.consumerConfig.OffsetMethod = sarama.OffsetMethodNewest + offset = sarama.OffsetNewest if fileExists(k.checkpointFilename) { if err = os.Remove(k.checkpointFilename); err != nil { - return + return err } } case "Oldest": - k.consumerConfig.OffsetMethod = sarama.OffsetMethodOldest + offset = sarama.OffsetOldest if fileExists(k.checkpointFilename) { if err = os.Remove(k.checkpointFilename); err != nil { - return + return err } } default: return fmt.Errorf("invalid offset_method: %s", k.config.OffsetMethod) } - k.consumerConfig.EventBufferSize = k.config.EventBufferSize + k.saramaConfig.ChannelBufferSize = k.config.EventBufferSize - k.client, err = sarama.NewClient(k.config.Id, k.config.Addrs, k.clientConfig) + k.consumer, err = sarama.NewConsumer(k.config.Addrs, k.saramaConfig) if err != nil { - return + return err } - k.consumer, err = sarama.NewConsumer(k.client, k.config.Topic, k.config.Partition, k.config.Group, k.consumerConfig) - return + k.partitionConsumer, err = k.consumer.ConsumePartition(k.config.Topic, k.config.Partition, offset) + return err } func (k *KafkaInput) addField(pack *pipeline.PipelinePack, name string, @@ -215,8 +213,8 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err sRunner := ir.NewSplitterRunner("") defer func() { + k.partitionConsumer.Close() k.consumer.Close() - k.client.Close() if k.checkpointFile != nil { k.checkpointFile.Close() } @@ -227,7 +225,8 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err var ( hostname = k.pConfig.Hostname() - event *sarama.ConsumerEvent + event *sarama.ConsumerMessage + cError *sarama.ConsumerError ok bool n int ) @@ -245,30 +244,15 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err sRunner.SetPackDecorator(packDec) } + eventChan := k.partitionConsumer.Messages() + cErrChan := k.partitionConsumer.Errors() for { select { - case event, ok = <-k.consumer.Events(): + case event, ok = <-eventChan: if !ok { - return + return nil } atomic.AddInt64(&k.processMessageCount, 1) - if event.Err != nil { - if event.Err == sarama.OffsetOutOfRange { - ir.LogError(fmt.Errorf( - "removing the out of range checkpoint file and stopping")) - if k.checkpointFile != nil { - k.checkpointFile.Close() - k.checkpointFile = nil - } - if err := os.Remove(k.checkpointFilename); err != nil { - ir.LogError(err) - } - return - } - atomic.AddInt64(&k.processMessageFailures, 1) - ir.LogError(event.Err) - break - } if n, err = sRunner.SplitBytes(event.Value, nil); err != nil { ir.LogError(fmt.Errorf("processing message from topic %s: %s", event.Topic, err)) @@ -280,12 +264,33 @@ func (k *KafkaInput) Run(ir pipeline.InputRunner, h pipeline.PluginHelper) (err if k.config.OffsetMethod == "Manual" { if err = k.writeCheckpoint(event.Offset + 1); err != nil { - return + return err } } + case cError, ok = <-cErrChan: + if !ok { + // Don't exit until the eventChan is closed. 
+ ok = true + continue + } + if cError.Err == sarama.ErrOffsetOutOfRange { + ir.LogError(fmt.Errorf( + "removing the out of range checkpoint file and stopping")) + if k.checkpointFile != nil { + k.checkpointFile.Close() + k.checkpointFile = nil + } + if err := os.Remove(k.checkpointFilename); err != nil { + ir.LogError(err) + } + return err + } + atomic.AddInt64(&k.processMessageFailures, 1) + ir.LogError(cError.Err) + case <-k.stopChan: - return + return nil } } } diff --git a/plugins/kafka/kafka_input_test.go b/plugins/kafka/kafka_input_test.go index faa78def0..2114d746c 100644 --- a/plugins/kafka/kafka_input_test.go +++ b/plugins/kafka/kafka_input_test.go @@ -21,11 +21,11 @@ import ( "path/filepath" "testing" + "github.com/Shopify/sarama" . "github.com/mozilla-services/heka/pipeline" "github.com/mozilla-services/heka/pipelinemock" plugins_ts "github.com/mozilla-services/heka/plugins/testsupport" "github.com/rafrombrc/gomock/gomock" - "github.com/rafrombrc/sarama" ) func TestEmptyInputAddress(t *testing.T) { @@ -59,8 +59,7 @@ func TestInvalidOffsetMethod(t *testing.T) { } func TestReceivePayloadMessage(t *testing.T) { - b1 := sarama.NewMockBroker(t, 1) - b2 := sarama.NewMockBroker(t, 2) + broker := sarama.NewMockBroker(t, 1) ctrl := gomock.NewController(t) tmpDir, tmpErr := ioutil.TempDir("", "kafkainput-tests") if tmpErr != nil { @@ -75,18 +74,18 @@ func TestReceivePayloadMessage(t *testing.T) { }() topic := "test" - mdr := new(sarama.MetadataResponse) - mdr.AddBroker(b2.Addr(), b2.BrokerID()) - mdr.AddTopicPartition(topic, 0, 2) - b1.Returns(mdr) - - or := new(sarama.OffsetResponse) - or.AddTopicPartition(topic, 0, 0) - b2.Returns(or) - - fr := new(sarama.FetchResponse) - fr.AddMessage(topic, 0, nil, sarama.ByteEncoder([]byte{0x41, 0x42}), 0) - b2.Returns(fr) + mockFetchResponse := sarama.NewMockFetchResponse(t, 1) + mockFetchResponse.SetMessage(topic, 0, 0, sarama.ByteEncoder([]byte{0x41, 0x42})) + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader(topic, 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset(topic, 0, sarama.OffsetOldest, 0). + SetOffset(topic, 0, sarama.OffsetNewest, 2), + "FetchRequest": mockFetchResponse, + }) pConfig := NewPipelineConfig(nil) pConfig.Globals.BaseDir = tmpDir @@ -94,7 +93,7 @@ func TestReceivePayloadMessage(t *testing.T) { ki.SetName(topic) ki.SetPipelineConfig(pConfig) config := ki.ConfigStruct().(*KafkaInputConfig) - config.Addrs = append(config.Addrs, b1.Addr()) + config.Addrs = append(config.Addrs, broker.Addr()) config.Topic = topic ith := new(plugins_ts.InputTestHelper) @@ -144,8 +143,7 @@ func TestReceivePayloadMessage(t *testing.T) { // There is a hang on the consumer close with the mock broker // closing the brokers before the consumer works around the issue // and is good enough for this test. 
- b1.Close() - b2.Close() + broker.Close() ki.Stop() err = <-errChan @@ -164,8 +162,7 @@ func TestReceivePayloadMessage(t *testing.T) { } func TestReceiveProtobufMessage(t *testing.T) { - b1 := sarama.NewMockBroker(t, 1) - b2 := sarama.NewMockBroker(t, 2) + broker := sarama.NewMockBroker(t, 2) ctrl := gomock.NewController(t) tmpDir, tmpErr := ioutil.TempDir("", "kafkainput-tests") if tmpErr != nil { @@ -180,18 +177,18 @@ func TestReceiveProtobufMessage(t *testing.T) { }() topic := "test" - mdr := new(sarama.MetadataResponse) - mdr.AddBroker(b2.Addr(), b2.BrokerID()) - mdr.AddTopicPartition(topic, 0, 2) - b1.Returns(mdr) - - or := new(sarama.OffsetResponse) - or.AddTopicPartition(topic, 0, 0) - b2.Returns(or) - - fr := new(sarama.FetchResponse) - fr.AddMessage(topic, 0, nil, sarama.ByteEncoder([]byte{0x41, 0x42}), 0) - b2.Returns(fr) + mockFetchResponse := sarama.NewMockFetchResponse(t, 1) + mockFetchResponse.SetMessage(topic, 0, 0, sarama.ByteEncoder([]byte{0x41, 0x42})) + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader(topic, 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(t). + SetOffset(topic, 0, sarama.OffsetOldest, 0). + SetOffset(topic, 0, sarama.OffsetNewest, 2), + "FetchRequest": mockFetchResponse, + }) pConfig := NewPipelineConfig(nil) pConfig.Globals.BaseDir = tmpDir @@ -199,7 +196,7 @@ func TestReceiveProtobufMessage(t *testing.T) { ki.SetName(topic) ki.SetPipelineConfig(pConfig) config := ki.ConfigStruct().(*KafkaInputConfig) - config.Addrs = append(config.Addrs, b1.Addr()) + config.Addrs = append(config.Addrs, broker.Addr()) config.Topic = topic ith := new(plugins_ts.InputTestHelper) @@ -237,8 +234,7 @@ func TestReceiveProtobufMessage(t *testing.T) { // There is a hang on the consumer close with the mock broker // closing the brokers before the consumer works around the issue // and is good enough for this test. - b1.Close() - b2.Close() + broker.Close() ki.Stop() err = <-errChan diff --git a/plugins/kafka/kafka_output.go b/plugins/kafka/kafka_output.go index 4ddc95f73..0a90eed8e 100644 --- a/plugins/kafka/kafka_output.go +++ b/plugins/kafka/kafka_output.go @@ -4,11 +4,12 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # The Initial Developer of the Original Code is the Mozilla Foundation. -# Portions created by the Initial Developer are Copyright (C) 2014 +# Portions created by the Initial Developer are Copyright (C) 2014-2015 # the Initial Developer. All Rights Reserved. 
# # Contributor(s): # Mike Trinkala (trink@mozilla.com) +# Rob Miller (rmiller@mozilla.com) # # ***** END LICENSE BLOCK *****/ @@ -23,9 +24,9 @@ import ( "sync/atomic" "time" + "github.com/Shopify/sarama" "github.com/mozilla-services/heka/message" "github.com/mozilla-services/heka/pipeline" - "github.com/rafrombrc/sarama" ) type KafkaOutputConfig struct { @@ -56,9 +57,9 @@ type KafkaOutputConfig struct { MaxBufferTime uint32 `toml:"max_buffer_time"` MaxBufferedBytes uint32 `toml:"max_buffered_bytes"` BackPressureThresholdBytes uint32 `toml:"back_pressure_threshold_bytes"` + MaxMessageBytes uint32 `toml:"max_message_bytes"` } -var Shutdown = errors.New("Shutdown Kafka error processing") var fieldRegex = regexp.MustCompile("^Fields\\[([^\\]]*)\\](?:\\[(\\d+)\\])?(?:\\[(\\d+)\\])?$") type messageVariable struct { @@ -78,10 +79,9 @@ type KafkaOutput struct { hashVariable *messageVariable topicVariable *messageVariable config *KafkaOutputConfig - cconfig *sarama.ClientConfig - pconfig *sarama.ProducerConfig - client *sarama.Client - producer *sarama.Producer + saramaConfig *sarama.Config + client sarama.Client + producer sarama.AsyncProducer pipelineConfig *pipeline.PipelineConfig } @@ -93,6 +93,7 @@ func (k *KafkaOutput) ConfigStruct() interface{} { WaitForElection: 250, BackgroundRefreshFrequency: 10 * 60 * 1000, MaxOpenRequests: 4, + Timeout: 1000, DialTimeout: 60 * 1000, ReadTimeout: 60 * 1000, WriteTimeout: 60 * 1000, @@ -101,7 +102,6 @@ func (k *KafkaOutput) ConfigStruct() interface{} { CompressionCodec: "None", MaxBufferTime: 1, MaxBufferedBytes: 1, - BackPressureThresholdBytes: 50 * 1024 * 1024, } } @@ -205,32 +205,35 @@ func (k *KafkaOutput) Init(config interface{}) (err error) { return errors.New("addrs must have at least one entry") } - k.cconfig = sarama.NewClientConfig() - k.cconfig.MetadataRetries = k.config.MetadataRetries - k.cconfig.WaitForElection = time.Duration(k.config.WaitForElection) * time.Millisecond - k.cconfig.BackgroundRefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond + if k.config.MaxMessageBytes == 0 { + k.config.MaxMessageBytes = message.MAX_RECORD_SIZE + } - k.cconfig.DefaultBrokerConf = sarama.NewBrokerConfig() - k.cconfig.DefaultBrokerConf.MaxOpenRequests = k.config.MaxOpenRequests - k.cconfig.DefaultBrokerConf.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond - k.cconfig.DefaultBrokerConf.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond - k.cconfig.DefaultBrokerConf.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond + k.saramaConfig = sarama.NewConfig() + k.saramaConfig.ClientID = k.config.Id + k.saramaConfig.Metadata.Retry.Max = k.config.MetadataRetries + k.saramaConfig.Metadata.Retry.Backoff = time.Duration(k.config.WaitForElection) * time.Millisecond + k.saramaConfig.Metadata.RefreshFrequency = time.Duration(k.config.BackgroundRefreshFrequency) * time.Millisecond - k.pconfig = sarama.NewProducerConfig() + k.saramaConfig.Net.MaxOpenRequests = k.config.MaxOpenRequests + k.saramaConfig.Net.DialTimeout = time.Duration(k.config.DialTimeout) * time.Millisecond + k.saramaConfig.Net.ReadTimeout = time.Duration(k.config.ReadTimeout) * time.Millisecond + k.saramaConfig.Net.WriteTimeout = time.Duration(k.config.WriteTimeout) * time.Millisecond + k.saramaConfig.Producer.MaxMessageBytes = int(k.config.MaxMessageBytes) switch k.config.Partitioner { case "Random": - k.pconfig.Partitioner = sarama.NewRandomPartitioner() + k.saramaConfig.Producer.Partitioner = 
sarama.NewRandomPartitioner if len(k.config.HashVariable) > 0 { return fmt.Errorf("hash_variable should not be set for the %s partitioner", k.config.Partitioner) } case "RoundRobin": - k.pconfig.Partitioner = new(sarama.RoundRobinPartitioner) + k.saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner if len(k.config.HashVariable) > 0 { return fmt.Errorf("hash_variable should not be set for the %s partitioner", k.config.Partitioner) } case "Hash": - k.pconfig.Partitioner = sarama.NewHashPartitioner() + k.saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner if k.hashVariable = verifyMessageVariable(k.config.HashVariable); k.hashVariable == nil { return fmt.Errorf("invalid hash_variable: %s", k.config.HashVariable) } @@ -248,54 +251,68 @@ func (k *KafkaOutput) Init(config interface{}) (err error) { switch k.config.RequiredAcks { case "NoResponse": - k.pconfig.RequiredAcks = sarama.NoResponse + k.saramaConfig.Producer.RequiredAcks = sarama.NoResponse case "WaitForLocal": - k.pconfig.RequiredAcks = sarama.WaitForLocal + k.saramaConfig.Producer.RequiredAcks = sarama.WaitForLocal case "WaitForAll": - k.pconfig.RequiredAcks = sarama.WaitForAll + k.saramaConfig.Producer.RequiredAcks = sarama.WaitForAll default: return fmt.Errorf("invalid required_acks: %s", k.config.RequiredAcks) } - k.pconfig.Timeout = time.Duration(k.config.Timeout) * time.Millisecond + k.saramaConfig.Producer.Timeout = time.Duration(k.config.Timeout) * time.Millisecond switch k.config.CompressionCodec { case "None": - k.pconfig.Compression = sarama.CompressionNone + k.saramaConfig.Producer.Compression = sarama.CompressionNone case "GZIP": - k.pconfig.Compression = sarama.CompressionGZIP + k.saramaConfig.Producer.Compression = sarama.CompressionGZIP case "Snappy": - k.pconfig.Compression = sarama.CompressionSnappy + k.saramaConfig.Producer.Compression = sarama.CompressionSnappy default: return fmt.Errorf("invalid compression_codec: %s", k.config.CompressionCodec) } - k.pconfig.MaxBufferedBytes = k.config.MaxBufferedBytes - k.pconfig.MaxBufferTime = time.Duration(k.config.MaxBufferTime) * time.Millisecond - k.pconfig.BackPressureThresholdBytes = k.config.BackPressureThresholdBytes + k.saramaConfig.Producer.Flush.Bytes = int(k.config.MaxBufferedBytes) + k.saramaConfig.Producer.Flush.Frequency = time.Duration(k.config.MaxBufferTime) * time.Millisecond - k.client, err = sarama.NewClient(k.config.Id, k.config.Addrs, k.cconfig) + k.client, err = sarama.NewClient(k.config.Addrs, k.saramaConfig) if err != nil { - return + return err } - k.producer, err = sarama.NewProducer(k.client, k.pconfig) - return + k.producer, err = sarama.NewAsyncProducer(k.config.Addrs, k.saramaConfig) + return err } -func (k *KafkaOutput) processKafkaErrors(or pipeline.OutputRunner, errChan chan error, wg *sync.WaitGroup) { -shutdown: - for err := range errChan { - switch err { - case nil: - case Shutdown: - break shutdown - case sarama.EncodingError: - atomic.AddInt64(&k.kafkaEncodingErrors, 1) - default: - if e, ok := err.(sarama.DroppedMessagesError); ok { - atomic.AddInt64(&k.kafkaDroppedMessages, int64(e.DroppedMessages)) +func (k *KafkaOutput) processKafkaErrors(or pipeline.OutputRunner, errChan <-chan *sarama.ProducerError, + shutdownChan chan struct{}, wg *sync.WaitGroup) { + + var ( + ok = true + pErr *sarama.ProducerError + ) + for ok { + select { + case pErr, ok = <-errChan: + if !ok { + break } - or.LogError(err) + err := pErr.Err + switch err.(type) { + case sarama.PacketEncodingError: + atomic.AddInt64(&k.kafkaEncodingErrors, 
1) + or.LogError(fmt.Errorf("kafka encoding error: %s", err.Error())) + default: + atomic.AddInt64(&k.kafkaDroppedMessages, 1) + if err != nil { + msgValue, _ := pErr.Msg.Value.Encode() + or.LogError(fmt.Errorf("kafka error '%s' for message '%s'", err.Error(), + string(msgValue))) + } + } + case <-shutdownChan: + ok = false + break } } wg.Done() @@ -312,11 +329,12 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er } inChan := or.InChan() - useBuffering := or.UsesBuffering() errChan := k.producer.Errors() + pInChan := k.producer.Input() + shutdownChan := make(chan struct{}) var wg sync.WaitGroup wg.Add(1) - go k.processKafkaErrors(or, errChan, &wg) + go k.processKafkaErrors(or, errChan, shutdownChan, &wg) var ( pack *pipeline.PipelinePack @@ -349,17 +367,16 @@ func (k *KafkaOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (er pack.Recycle(nil) continue } - err = k.producer.QueueMessage(topic, key, sarama.ByteEncoder(msgBytes)) - if err != nil { - if !useBuffering { - atomic.AddInt64(&k.processMessageFailures, 1) - } - or.LogError(err) + pMessage := &sarama.ProducerMessage{ + Topic: topic, + Key: key, + Value: sarama.ByteEncoder(msgBytes), } - pack.Recycle(err) + pInChan <- pMessage + pack.Recycle(nil) } - errChan <- Shutdown + close(shutdownChan) wg.Wait() return } diff --git a/plugins/kafka/kafka_output_test.go b/plugins/kafka/kafka_output_test.go index 706138ffc..8e8f37951 100644 --- a/plugins/kafka/kafka_output_test.go +++ b/plugins/kafka/kafka_output_test.go @@ -9,6 +9,7 @@ # # Contributor(s): # Mike Trinkala (trink@mozilla.com) +# Rob Miller (rmiller@mozilla.com) # # ***** END LICENSE BLOCK *****/ @@ -18,13 +19,13 @@ import ( "sync/atomic" "testing" + "github.com/Shopify/sarama" "github.com/mozilla-services/heka/message" . "github.com/mozilla-services/heka/pipeline" pipeline_ts "github.com/mozilla-services/heka/pipeline/testsupport" "github.com/mozilla-services/heka/plugins" plugins_ts "github.com/mozilla-services/heka/plugins/testsupport" "github.com/rafrombrc/gomock/gomock" - "github.com/rafrombrc/sarama" ) func TestVerifyMessageInvalidVariables(t *testing.T) { @@ -268,12 +269,10 @@ func TestInvalidCompressionCodec(t *testing.T) { func TestSendMessage(t *testing.T) { ctrl := gomock.NewController(t) - b1 := sarama.NewMockBroker(t, 1) - b2 := sarama.NewMockBroker(t, 2) + broker := sarama.NewMockBroker(t, 2) defer func() { - b1.Close() - b2.Close() + broker.Close() ctrl.Finish() }() @@ -281,19 +280,17 @@ func TestSendMessage(t *testing.T) { globals := DefaultGlobals() pConfig := NewPipelineConfig(globals) - mdr := new(sarama.MetadataResponse) - mdr.AddBroker(b2.Addr(), b2.BrokerID()) - mdr.AddTopicPartition(topic, 0, 2) - b1.Returns(mdr) - - pr := new(sarama.ProduceResponse) - pr.AddTopicPartition(topic, 0, sarama.NoError) - b2.Returns(pr) + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()). 
+ SetLeader(topic, 0, broker.BrokerID()), + "ProduceRequest": sarama.NewMockProduceResponse(t), + }) ko := new(KafkaOutput) ko.SetPipelineConfig(pConfig) config := ko.ConfigStruct().(*KafkaOutputConfig) - config.Addrs = append(config.Addrs, b1.Addr()) + config.Addrs = append(config.Addrs, broker.Addr()) config.Topic = topic err := ko.Init(config) if err != nil { @@ -309,13 +306,8 @@ func TestSendMessage(t *testing.T) { pack := NewPipelinePack(pConfig.InputRecycleChan()) pack.Message = msg - outStr := "Write me out to the network" - newpack := NewPipelinePack(nil) - newpack.Message = msg - inChanCall := oth.MockOutputRunner.EXPECT().InChan().AnyTimes() inChanCall.Return(inChan) - oth.MockOutputRunner.EXPECT().UsesBuffering().Return(false) errChan := make(chan error) startOutput := func() { @@ -328,6 +320,7 @@ func TestSendMessage(t *testing.T) { oth.MockOutputRunner.EXPECT().Encoder().Return(encoder) oth.MockOutputRunner.EXPECT().Encode(pack).Return(encoder.Encode(pack)) + outStr := "Write me out to the network" pack.Message.SetPayload(outStr) startOutput()
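For reference, the send path the rewritten KafkaOutput relies on is the standard Sarama AsyncProducer pattern: messages are pushed into the producer's Input() channel and delivery failures are drained from Errors(), which is what processKafkaErrors does above. A self-contained sketch, assuming Sarama 1.5; the broker address, topic, and payload are placeholders rather than values taken from this patch:

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        conf := sarama.NewConfig()
        conf.Producer.Return.Errors = true // delivery failures arrive on Errors()

        // Placeholder broker address; any reachable Kafka broker will do.
        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, conf)
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close() // flushes buffered messages on shutdown

        // Drain the error channel so the producer never blocks on it,
        // analogous to KafkaOutput.processKafkaErrors.
        go func() {
            for pErr := range producer.Errors() {
                log.Printf("kafka error '%s' on topic %s", pErr.Err, pErr.Msg.Topic)
            }
        }()

        producer.Input() <- &sarama.ProducerMessage{
            Topic: "test",
            Value: sarama.ByteEncoder([]byte("Write me out to the network")),
        }
    }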