diff --git a/kafka-http-connector/main.go b/kafka-http-connector/main.go
index bce3a8d..e7dd839 100644
--- a/kafka-http-connector/main.go
+++ b/kafka-http-connector/main.go
@@ -179,6 +179,28 @@ func getConfig(metadata kafkaMetadata) (*sarama.Config, error) {
 	return config, nil
 }
 
+func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []sarama.RecordHeader) {
+	var (
+		key []byte
+	)
+	tombstone := false
+	cleaned := make([]sarama.RecordHeader, 0, len(headers))
+	for _, header := range headers {
+		if strings.ToLower(string(header.Key)) == "keda-message-key" {
+			key = header.Value
+			continue
+		}
+
+		if strings.ToLower(string(header.Key)) == "keda-message-tombstone" {
+			tombstone = true
+			continue
+		}
+		cleaned = append(cleaned, header)
+	}
+
+	return key, tombstone, cleaned
+}
+
 // kafkaConnector represents a Sarama consumer group consumer
 type kafkaConnector struct {
 	ready chan bool
@@ -206,7 +228,7 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
 	// The `ConsumeClaim` itself is called within a goroutine, see:
 	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
 	for message := range claim.Messages() {
-		conn.logger.Info(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic))
+		conn.logger.Info(fmt.Sprintf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", string(message.Key), string(message.Value), message.Timestamp, message.Topic))
 
 		msg := string(message.Value)
 		headers := http.Header{
@@ -217,6 +239,17 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
 			"KEDA-Source-Name": {conn.connectorData.SourceName},
 		}
 
+		// Add the message key, if it's been set.
+		if message.Key != nil {
+			headers.Add("KEDA-Message-Key", string(message.Key))
+		}
+
+		// Indicate that this is a tombstone, not an empty message.
+		// Normally indicative of a deletion request.
+		if message.Value == nil {
+			headers.Add("KEDA-Message-Tombstone", "true")
+		}
+
 		// Set the headers came from Kafka record
 		for _, h := range message.Headers {
 			if utf8.ValidString(string(h.Value)) {
@@ -226,21 +259,14 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
 
 		resp, err := common.HandleHTTPRequest(msg, headers, conn.connectorData, conn.logger)
 		if err != nil {
-			conn.errorHandler(err)
+			conn.errorHandler(resp, err)
 		} else {
 			body, err := io.ReadAll(resp.Body)
 			if err != nil {
-				conn.errorHandler(err)
+				conn.errorHandler(nil, err)
 			} else {
 				// Generate Kafka record headers
-				var kafkaRecordHeaders []sarama.RecordHeader
-
-				for k, v := range resp.Header {
-					// One key may have multiple values
-					for _, v := range v {
-						kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
-					}
-				}
+				kafkaRecordHeaders := mapHeaders(resp)
 				if success := conn.responseHandler(string(body), kafkaRecordHeaders); success {
 					session.MarkMessage(message, "")
 				}
@@ -254,12 +280,33 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
 	return nil
 }
 
-func (conn *kafkaConnector) errorHandler(err error) {
+func mapHeaders(resp *http.Response) []sarama.RecordHeader {
+	var kafkaRecordHeaders []sarama.RecordHeader
+
+	for k, v := range resp.Header {
+		// One key may have multiple values
+		for _, v := range v {
+			kafkaRecordHeaders = append(kafkaRecordHeaders, sarama.RecordHeader{Key: []byte(k), Value: []byte(v)})
+		}
+	}
+	return kafkaRecordHeaders
+}
+
+func (conn *kafkaConnector) errorHandler(resp *http.Response, err error) {
 	if len(conn.connectorData.ErrorTopic) > 0 {
-		_, _, e := conn.producer.SendMessage(&sarama.ProducerMessage{
+		message := &sarama.ProducerMessage{
 			Topic: conn.connectorData.ErrorTopic,
 			Value: sarama.StringEncoder(err.Error()),
-		})
+		}
+		if resp != nil {
+			kafkaRecordHeaders := mapHeaders(resp)
+			key, _, headers := extractControlHeaders(kafkaRecordHeaders)
+			message.Headers = headers
+			if key != nil {
+				message.Key = sarama.StringEncoder(key)
+			}
+		}
+		_, _, e := conn.producer.SendMessage(message)
 		if e != nil {
 			conn.logger.Error("failed to publish message to error topic",
 				zap.Error(e),
@@ -277,12 +324,25 @@
 }
 
 func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordHeader) bool {
+
+	// Extract the key and tombstone, should they exist.
+	key, tombstone, headers := extractControlHeaders(headers)
+
 	if len(conn.connectorData.ResponseTopic) > 0 {
-		_, _, err := conn.producer.SendMessage(&sarama.ProducerMessage{
+		message := &sarama.ProducerMessage{
 			Topic:   conn.connectorData.ResponseTopic,
-			Value:   sarama.StringEncoder(msg),
 			Headers: headers,
-		})
+		}
+
+		if key != nil {
+			message.Key = sarama.StringEncoder(key)
+		}
+
+		if len(msg) > 0 || !tombstone {
+			message.Value = sarama.StringEncoder(msg)
+		}
+
+		_, _, err := conn.producer.SendMessage(message)
 		if err != nil {
 			conn.logger.Warn("failed to publish response body from http request to topic",
 				zap.Error(err),
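
For reviewers, a minimal sketch (not part of this patch) of the contract extractControlHeaders is meant to uphold — a hypothetical test in the same package, with illustrative header values:

package main

import (
	"testing"

	"github.com/Shopify/sarama"
)

// Hypothetical test, assuming it lives alongside main.go: the two KEDA
// control headers are consumed (case-insensitively) and every other
// header passes through untouched.
func TestExtractControlHeaders(t *testing.T) {
	in := []sarama.RecordHeader{
		{Key: []byte("KEDA-Message-Key"), Value: []byte("order-42")},
		{Key: []byte("KEDA-Message-Tombstone"), Value: []byte("true")},
		{Key: []byte("Content-Type"), Value: []byte("application/json")},
	}

	key, tombstone, cleaned := extractControlHeaders(in)

	if string(key) != "order-42" {
		t.Fatalf("expected key %q, got %q", "order-42", key)
	}
	if !tombstone {
		t.Fatal("expected the tombstone marker to be detected")
	}
	if len(cleaned) != 1 || string(cleaned[0].Key) != "Content-Type" {
		t.Fatalf("expected only Content-Type to remain, got %v", cleaned)
	}
}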
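
And a sketch of how a function behind the connector might drive the new behaviour — echoing KEDA-Message-Key to keep the record key, and setting KEDA-Message-Tombstone with an empty body so responseHandler leaves message.Value unset and produces a keyed tombstone on the response topic (handler name and wiring are illustrative only):

package main

import "net/http"

// Hypothetical downstream HTTP function, not part of this patch.
func deleteHandler(w http.ResponseWriter, r *http.Request) {
	// Preserve the originating record key, if the connector sent one.
	if key := r.Header.Get("KEDA-Message-Key"); key != "" {
		w.Header().Set("KEDA-Message-Key", key)
	}
	// An empty body plus this header means responseHandler sets no
	// Value on the produced record, i.e. a Kafka tombstone.
	w.Header().Set("KEDA-Message-Tombstone", "true")
	w.WriteHeader(http.StatusOK)
}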