fix: handle unknown topic etc.
Refactor processMessagesIntoBatchPipeline to ensure that it closes the message
channel to signal that message processing is complete, including the case when
the message topic does not match any of the configured topics. To do that, move
all processing of the message, including the search for topic processors, into
the runMessageProcessor() goroutine and use defer to close the channel. That
required changing the message channel to be of struct{} type and storing the
error in the message itself, so that a simple `defer close(msg.done)` covers
both the normal and error cases.
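
A minimal sketch of that completion pattern, with hypothetical names rather
than the commit's exact code: the error travels on the message itself, so one
deferred close signals completion on every exit path.

package main

import (
	"errors"
	"fmt"
)

type messageContext struct {
	done chan struct{} // closed when processing finishes, success or not
	err  error         // set only for temporary (retryable) failures
}

func process(msgCtx *messageContext) {
	// A single defer covers both the normal and the error returns.
	defer close(msgCtx.done)
	msgCtx.err = errors.New("temporary failure") // or leave nil on success
}

func main() {
	msgCtx := &messageContext{done: make(chan struct{})}
	go process(msgCtx)
	<-msgCtx.done // blocks until the goroutine has finished
	fmt.Println("err:", msgCtx.err)
}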

Provide unit tests for processMessagesIntoBatchPipeline that cover success and
error paths.

Bound batchPipeline capacity by the number of CPU cores to avoid running too
many CPU-intensive tasks in parallel.
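
A rough sketch of the backpressure this creates, assuming one worker goroutine
per queued item (hypothetical names, not the commit's code): once the buffer is
full, the fetch loop blocks on the send until the consumer drains an item, so
at most cap(pipeline) workers are in flight at a time.

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// The capacity bounds the number of in-flight workers.
	pipeline := make(chan chan struct{}, runtime.NumCPU()+2)

	go func() {
		for {
			done := make(chan struct{})
			pipeline <- done // blocks while the buffer is full
			go func() {
				// A stand-in for a CPU-heavy task; signal completion.
				close(done)
			}()
		}
	}()

	// Drain a few items in order; each receive frees a buffer slot.
	for i := 0; i < 5; i++ {
		done := <-pipeline
		<-done
		fmt.Println("item", i, "done")
	}
}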

Closes #702
Closes #703
Closes #704
ibukanov committed May 21, 2024
1 parent 17fec68 commit 29c3f9f
Showing 4 changed files with 267 additions and 177 deletions.
181 changes: 82 additions & 99 deletions kafka/main.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"runtime"
"strings"
"time"

@@ -19,34 +20,27 @@ import (

var brokers []string

// Processor is a function that is used to process Kafka messages
type Processor func(
kafka.Message,
*kafka.Writer,
*server.Server,
*zerolog.Logger,
) error
// Processor is a function that is used to process Kafka messages.
type Processor func(context.Context, kafka.Message, *zerolog.Logger) error

// ProcessingResult contains a message and the topic to which the message should be
// emitted
type ProcessingResult struct {
ResultProducer *kafka.Writer
Message []byte
RequestID string
// messageReader is the subset of kafka.Reader methods that we use. It lets
// tests substitute a mock reader.
type messageReader interface {
FetchMessage(ctx context.Context) (kafka.Message, error)
Stats() kafka.ReaderStats
}

// TopicMapping represents a kafka topic and how to process it.
type TopicMapping struct {
Topic string
ResultProducer *kafka.Writer
Processor Processor
Group string
Topic string
Processor Processor
}

// MessageContext is used for channel coordination when processing batches of messages
type MessageContext struct {
errorResult chan error
msg kafka.Message
// The channel to close when the message is processed.
done chan struct{}
err error
msg kafka.Message
}

// StartConsumers reads configuration variables and starts the associated kafka consumers
@@ -60,26 +54,30 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
if len(brokers) < 1 {
brokers = strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
}
redeemWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultRedeemV1Topic,
Dialer: getDialer(logger),
})
signWriter := kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultSignV1Topic,
Dialer: getDialer(logger),
})
topicMappings := []TopicMapping{
{
Topic: adsRequestRedeemV1Topic,
ResultProducer: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultRedeemV1Topic,
Dialer: getDialer(logger),
}),
Processor: SignedTokenRedeemHandler,
Group: adsConsumerGroupV1,
Processor: func(ctx context.Context, msg kafka.Message,
logger *zerolog.Logger) error {
return SignedTokenRedeemHandler(ctx, msg, redeemWriter, providedServer, logger)
},
},
{
Topic: adsRequestSignV1Topic,
ResultProducer: kafka.NewWriter(kafka.WriterConfig{
Brokers: brokers,
Topic: adsResultSignV1Topic,
Dialer: getDialer(logger),
}),
Processor: SignedBlindedTokenIssuerHandler,
Group: adsConsumerGroupV1,
Processor: func(ctx context.Context, msg kafka.Message,
logger *zerolog.Logger) error {
return SignedBlindedTokenIssuerHandler(ctx, msg, signWriter, providedServer, logger)
},
},
}
var topics []string
@@ -89,9 +87,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error

reader := newConsumer(topics, adsConsumerGroupV1, logger)

batchPipeline := make(chan *MessageContext, 100)
// Each message in batchPipeline is associated with a goroutine doing
// CPU-intensive cryptography, so limit the channel capacity to the number
// of CPU cores plus a small extra buffer to account for any IO that a
// processor may do.
batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2)
ctx := context.Background()
go processMessagesIntoBatchPipeline(ctx, topicMappings, providedServer, reader, batchPipeline, logger)
go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger)
for {
err := readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger)
if err != nil {
@@ -103,7 +105,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
}

// readAndCommitBatchPipelineResults does a blocking read of the batchPipeline channel and
// then does a blocking read of the errorResult in the MessageContext in the batchPipeline.
// then does a blocking read of the done field in the MessageContext in the batchPipeline.
// When an error appears it means that the channel was closed or a temporary error was
// encountered. In the case of a temporary error, the application returns an error without
// committing so that the next reader gets the same message to try again.
@@ -113,15 +115,12 @@ func readAndCommitBatchPipelineResults(
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) error {
msgCtx, ok := <-batchPipeline
if !ok {
logger.Error().Msg("batchPipeline channel closed")
return errors.New("batch item error")
}
err := <-msgCtx.errorResult
if err != nil {
logger.Error().Err(err).Msg("temporary failure encountered")
return fmt.Errorf("temporary failure encountered: %w", err)
msgCtx := <-batchPipeline
<-msgCtx.done

if msgCtx.err != nil {
logger.Error().Err(msgCtx.err).Msg("temporary failure encountered")
return fmt.Errorf("temporary failure encountered: %w", msgCtx.err)
}
logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset)
if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil {
@@ -131,27 +130,17 @@ func readAndCommitBatchPipelineResults(
return nil
}

// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely, pushes a
// MessageContext into the batchPipeline to maintain message order, and then spawns a
// goroutine that will process the message and push to errorResult of the MessageContext
// when the processing completes. In case of an error, we panic from this function,
// triggering the deferral which closes the batchPipeline channel. This will result in
// readAndCommitBatchPipelineResults returning an error and the processing loop being recreated.
func processMessagesIntoBatchPipeline(
ctx context.Context,
// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely,
// pushes a MessageContext into the batchPipeline to maintain message order, and
// then spawns a goroutine that will process the message and close the done
// channel of the MessageContext when the processing completes.
func processMessagesIntoBatchPipeline(ctx context.Context,
topicMappings []TopicMapping,
providedServer *server.Server,
reader *kafka.Reader,
reader messageReader,
batchPipeline chan *MessageContext,
logger *zerolog.Logger,
) {
// During normal operation processMessagesIntoBatchPipeline will never complete and
// this deferral should not run. It's only called if we encounter some unrecoverable
// error.
defer func() {
close(batchPipeline)
}()

// Loop forever
for {
msg, err := reader.FetchMessage(ctx)
if err != nil {
@@ -170,8 +159,8 @@ func processMessagesIntoBatchPipeline(
continue
}
msgCtx := &MessageContext{
errorResult: make(chan error),
msg: msg,
done: make(chan struct{}),
msg: msg,
}
// If batchPipeline has been closed by an error in readAndCommitBatchPipelineResults,
// this write will panic, which is desired behavior, as the rest of the context
@@ -180,44 +169,33 @@ func processMessagesIntoBatchPipeline(
logger.Debug().Msgf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)
logger.Debug().Msgf("Reader Stats: %#v", reader.Stats())
logger.Debug().Msgf("topicMappings: %+v", topicMappings)
// Check if any of the existing topicMappings match the fetched message
matchFound := false
for _, topicMapping := range topicMappings {
logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
if msg.Topic == topicMapping.Topic {
matchFound = true
go processMessageIntoErrorResultChannel(
msg,
topicMapping,
providedServer,
msgCtx.errorResult,
logger,
)
}
}
if !matchFound {
logger.Error().Msgf("Topic received whose topic is not configured: %s", msg.Topic)
}
go runMessageProcessor(ctx, msgCtx, topicMappings, logger)
}
}

// processMessageIntoErrorResultChannel executes the processor defined by a topicMapping
// on a provided message. It then puts the result into the errChan. This result will be
// nil in cases of success or permanent failures and will be some error in the case that
// a temporary error is encountered.
func processMessageIntoErrorResultChannel(
msg kafka.Message,
topicMapping TopicMapping,
providedServer *server.Server,
errChan chan error,
// runMessageProcessor executes the processor defined by a matching topicMapping
// on the provided message. It runs on its own goroutine and closes msgCtx.done
// to signal completion. It keeps msgCtx.err nil in cases of success or
// permanent failure and sets msgCtx.err when a temporary error is encountered.
func runMessageProcessor(
ctx context.Context,
msgCtx *MessageContext,
topicMappings []TopicMapping,
logger *zerolog.Logger,
) {
errChan <- topicMapping.Processor(
msg,
topicMapping.ResultProducer,
providedServer,
logger,
)
defer close(msgCtx.done)
msg := msgCtx.msg
for _, topicMapping := range topicMappings {
logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
if msg.Topic == topicMapping.Topic {
msgCtx.err = topicMapping.Processor(ctx, msg, logger)
return
}
}
// This is a permanent error, so leave msgCtx.err nil so that the
// received message still gets committed.
logger.Error().Msgf("received message whose topic is not configured: %s", msg.Topic)
}

// newConsumer returns a Kafka reader configured for the given topics and group.
@@ -242,7 +220,12 @@ func newConsumer(topics []string, groupID string, logger *zerolog.Logger) *kafka
}

// Emit sends a message over the Kafka interface.
func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error {
func Emit(
ctx context.Context,
producer *kafka.Writer,
message []byte,
logger *zerolog.Logger,
) error {
logger.Info().Msgf("Beginning data emission for topic %s", producer.Topic)

messageKey := uuid.New()
@@ -253,7 +236,7 @@ func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error
}

err = producer.WriteMessages(
context.Background(),
ctx,
kafka.Message{
Value: []byte(message),
Key: []byte(marshaledMessageKey),
112 changes: 112 additions & 0 deletions kafka/main_test.go
@@ -0,0 +1,112 @@
// Package kafka manages kafka interaction
package kafka

import (
"context"
"errors"
"sync/atomic"
"testing"

"github.com/rs/zerolog"
"github.com/segmentio/kafka-go"
"github.com/stretchr/testify/assert"
)

type testMessageReader struct {
fetch func() (kafka.Message, error)
}

func (r *testMessageReader) FetchMessage(ctx context.Context) (kafka.Message, error) {
return r.fetch()
}

func (r *testMessageReader) Stats() kafka.ReaderStats {
return kafka.ReaderStats{}
}

func TestProcessMessagesIntoBatchPipeline(t *testing.T) {
nopLog := zerolog.Nop()
t.Run("AbsentTopicClosesMsg", func(t *testing.T) {
t.Parallel()

batchPipeline := make(chan *MessageContext)

r := &testMessageReader{}
messageCounter := 0
r.fetch = func() (kafka.Message, error) {
messageCounter++
if messageCounter == 1 {
return kafka.Message{Topic: "absent"}, nil
}
// processMessagesIntoBatchPipeline never returns, so let its
// goroutine leak by blocking here forever.
select {}
}
go processMessagesIntoBatchPipeline(context.Background(),
nil, r, batchPipeline, &nopLog)
msg := <-batchPipeline
assert.NotNil(t, msg)
<-msg.done
assert.Equal(t, "absent", msg.msg.Topic)

// An absent topic signals a permanent error and the message should be
// committed, so msg.err must be nil.
assert.Nil(t, msg.err)
})

t.Run("OrderPreserved", func(t *testing.T) {
t.Parallel()

// The capacity of the pipeline. The code below posts twice this number
// of messages.
N := 50
batchPipeline := make(chan *MessageContext, N)

r := &testMessageReader{}
messageCounter := 0
r.fetch = func() (kafka.Message, error) {
i := messageCounter
messageCounter++
if i < 2*N {
// processMessagesIntoBatchPipeline() does not touch
// Message.Partition, so use that field to pass the message number
// to the Processor below.
return kafka.Message{Topic: "topicA", Partition: i}, nil
}
select {} // block forever
}
atomicCounter := int32(N)
topicMappings := []TopicMapping{{
Topic: "topicA",
Processor: func(ctx context.Context, msg kafka.Message, logger *zerolog.Logger) error {
if msg.Partition < N {
// Make the processor post results in the reverse order of the
// messages using a busy wait.
for atomic.LoadInt32(&atomicCounter) != int32(msg.Partition+1) {
}
atomic.AddInt32(&atomicCounter, int32(-1))
}

if msg.Partition == 0 || msg.Partition == N {
return errors.New("error")
}
return nil
},
}}

go processMessagesIntoBatchPipeline(context.Background(),
topicMappings, r, batchPipeline, &nopLog)
for i := 0; i < 2*N; i++ {
msg := <-batchPipeline
assert.NotNil(t, msg)
<-msg.done
assert.Equal(t, "topicA", msg.msg.Topic)
assert.Equal(t, i, msg.msg.Partition)
if i == 0 || i == N {
assert.NotNil(t, msg.err)
} else {
assert.Nil(t, msg.err)
}
}
})
}