diff --git a/Makefile b/Makefile index 27bdefd0..3bff52b3 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ docker-test: --key-schema AttributeName=id,KeyType=HASH \ --billing-mode PAY_PER_REQUEST \ --table-name redemptions --endpoint-url http://dynamodb:8000 --region us-west-2 ) \ - && go test ./..." + && go test -v ./..." docker-lint: docker-compose -f docker-compose.yml -f docker-compose.dev.yml run --rm -p 2416:2416 challenge-bypass golangci-lint run diff --git a/kafka/main.go b/kafka/main.go index 37b322b8..359da047 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "runtime" "strings" "time" @@ -19,34 +20,27 @@ import ( var brokers []string -// Processor is a function that is used to process Kafka messages -type Processor func( - kafka.Message, - *kafka.Writer, - *server.Server, - *zerolog.Logger, -) error +// Processor is a function that is used to process Kafka messages on +type Processor func(context.Context, kafka.Message, *zerolog.Logger) error -// ProcessingResult contains a message and the topic to which the message should be -// emitted -type ProcessingResult struct { - ResultProducer *kafka.Writer - Message []byte - RequestID string +// Subset of kafka.Reader methods that we use. This is used for testing. +type messageReader interface { + FetchMessage(ctx context.Context) (kafka.Message, error) + Stats() kafka.ReaderStats } // TopicMapping represents a kafka topic, how to process it, and where to emit the result. type TopicMapping struct { - Topic string - ResultProducer *kafka.Writer - Processor Processor - Group string + Topic string + Processor Processor } // MessageContext is used for channel coordination when processing batches of messages type MessageContext struct { - errorResult chan error - msg kafka.Message + // The channel to close when the message is processed. + done chan struct{} + err error + msg kafka.Message } // StartConsumers reads configuration variables and starts the associated kafka consumers @@ -60,26 +54,30 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error if len(brokers) < 1 { brokers = strings.Split(os.Getenv("KAFKA_BROKERS"), ",") } + redeemWriter := kafka.NewWriter(kafka.WriterConfig{ + Brokers: brokers, + Topic: adsResultRedeemV1Topic, + Dialer: getDialer(logger), + }) + signWriter := kafka.NewWriter(kafka.WriterConfig{ + Brokers: brokers, + Topic: adsResultSignV1Topic, + Dialer: getDialer(logger), + }) topicMappings := []TopicMapping{ { Topic: adsRequestRedeemV1Topic, - ResultProducer: kafka.NewWriter(kafka.WriterConfig{ - Brokers: brokers, - Topic: adsResultRedeemV1Topic, - Dialer: getDialer(logger), - }), - Processor: SignedTokenRedeemHandler, - Group: adsConsumerGroupV1, + Processor: func(ctx context.Context, msg kafka.Message, + logger *zerolog.Logger) error { + return SignedTokenRedeemHandler(ctx, msg, redeemWriter, providedServer, logger) + }, }, { Topic: adsRequestSignV1Topic, - ResultProducer: kafka.NewWriter(kafka.WriterConfig{ - Brokers: brokers, - Topic: adsResultSignV1Topic, - Dialer: getDialer(logger), - }), - Processor: SignedBlindedTokenIssuerHandler, - Group: adsConsumerGroupV1, + Processor: func(ctx context.Context, msg kafka.Message, + logger *zerolog.Logger) error { + return SignedBlindedTokenIssuerHandler(ctx, msg, signWriter, providedServer, logger) + }, }, } var topics []string @@ -89,9 +87,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error reader := newConsumer(topics, adsConsumerGroupV1, logger) - batchPipeline := make(chan *MessageContext, 100) + // Each message in batchPipeline is associated with goroutine doing + // CPU-intensive cryptography, so limit the channel capacity by CPU cores + // plus some extra buffer to account for IO that a processor may potentially + // do. + batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2) ctx := context.Background() - go processMessagesIntoBatchPipeline(ctx, topicMappings, providedServer, reader, batchPipeline, logger) + go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger) for { err := readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger) if err != nil { @@ -103,7 +105,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error } // readAndCommitBatchPipelineResults does a blocking read of the batchPipeline channel and -// then does a blocking read of the errorResult in the MessageContext in the batchPipeline. +// then does a blocking read of the done field in the MessageContext in the batchPipeline. // When an error appears it means that the channel was closed or a temporary error was // encountered. In the case of a temporary error, the application returns an error without // committing so that the next reader gets the same message to try again. @@ -113,15 +115,12 @@ func readAndCommitBatchPipelineResults( batchPipeline chan *MessageContext, logger *zerolog.Logger, ) error { - msgCtx, ok := <-batchPipeline - if !ok { - logger.Error().Msg("batchPipeline channel closed") - return errors.New("batch item error") - } - err := <-msgCtx.errorResult - if err != nil { - logger.Error().Err(err).Msg("temporary failure encountered") - return fmt.Errorf("temporary failure encountered: %w", err) + msgCtx := <-batchPipeline + <-msgCtx.done + + if msgCtx.err != nil { + logger.Error().Err(msgCtx.err).Msg("temporary failure encountered") + return fmt.Errorf("temporary failure encountered: %w", msgCtx.err) } logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset) if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil { @@ -131,27 +130,17 @@ func readAndCommitBatchPipelineResults( return nil } -// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely, pushes a -// MessageContext into the batchPipeline to maintain message order, and then spawns a -// goroutine that will process the message and push to errorResult of the MessageContext -// when the processing completes. In case of an error, we panic from this function, -// triggering the deferral which closes the batchPipeline channel. This will result in -// readAndCommitBatchPipelineResults returning an error and the processing loop being recreated. -func processMessagesIntoBatchPipeline( - ctx context.Context, +// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely, +// pushes a MessageContext into the batchPipeline to maintain message order, and +// then spawns a goroutine that will process the message and push to errorResult +// of the MessageContext when the processing completes. +func processMessagesIntoBatchPipeline(ctx context.Context, topicMappings []TopicMapping, - providedServer *server.Server, - reader *kafka.Reader, + reader messageReader, batchPipeline chan *MessageContext, logger *zerolog.Logger, ) { - // During normal operation processMessagesIntoBatchPipeline will never complete and - // this deferral should not run. It's only called if we encounter some unrecoverable - // error. - defer func() { - close(batchPipeline) - }() - + // Loop forever for { msg, err := reader.FetchMessage(ctx) if err != nil { @@ -170,8 +159,8 @@ func processMessagesIntoBatchPipeline( continue } msgCtx := &MessageContext{ - errorResult: make(chan error), - msg: msg, + done: make(chan struct{}), + msg: msg, } // If batchPipeline has been closed by an error in readAndCommitBatchPipelineResults, // this write will panic, which is desired behavior, as the rest of the context @@ -180,44 +169,33 @@ func processMessagesIntoBatchPipeline( logger.Debug().Msgf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset) logger.Debug().Msgf("Reader Stats: %#v", reader.Stats()) logger.Debug().Msgf("topicMappings: %+v", topicMappings) - // Check if any of the existing topicMappings match the fetched message - matchFound := false - for _, topicMapping := range topicMappings { - logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic) - if msg.Topic == topicMapping.Topic { - matchFound = true - go processMessageIntoErrorResultChannel( - msg, - topicMapping, - providedServer, - msgCtx.errorResult, - logger, - ) - } - } - if !matchFound { - logger.Error().Msgf("Topic received whose topic is not configured: %s", msg.Topic) - } + go runMessageProcessor(ctx, msgCtx, topicMappings, logger) } } -// processMessageIntoErrorResultChannel executes the processor defined by a topicMapping -// on a provided message. It then puts the result into the errChan. This result will be -// nil in cases of success or permanent failures and will be some error in the case that -// a temporary error is encountered. -func processMessageIntoErrorResultChannel( - msg kafka.Message, - topicMapping TopicMapping, - providedServer *server.Server, - errChan chan error, +// The function to execute the processor defined by a topicMapping on a provided +// message. This runs on own goroutine and closes msgCtx.done to signal +// completion. It keeps msgCtx.err as nil in cases of success or permanent +// failures and will set msgCtx.err in the case that a temporary error is +// encountered. +func runMessageProcessor( + ctx context.Context, + msgCtx *MessageContext, + topicMappings []TopicMapping, logger *zerolog.Logger, ) { - errChan <- topicMapping.Processor( - msg, - topicMapping.ResultProducer, - providedServer, - logger, - ) + defer close(msgCtx.done) + msg := msgCtx.msg + for _, topicMapping := range topicMappings { + logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic) + if msg.Topic == topicMapping.Topic { + msgCtx.err = topicMapping.Processor(ctx, msg, logger) + return + } + } + // This is a permanent error, so do not set msgCtx.err to commit the + // received message. + logger.Error().Msgf("topic received whose topic is not configured: %s", msg.Topic) } // NewConsumer returns a Kafka reader configured for the given topic and group. @@ -242,7 +220,12 @@ func newConsumer(topics []string, groupID string, logger *zerolog.Logger) *kafka } // Emit sends a message over the Kafka interface. -func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error { +func Emit( + ctx context.Context, + producer *kafka.Writer, + message []byte, + logger *zerolog.Logger, +) error { logger.Info().Msgf("Beginning data emission for topic %s", producer.Topic) messageKey := uuid.New() @@ -253,7 +236,7 @@ func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error } err = producer.WriteMessages( - context.Background(), + ctx, kafka.Message{ Value: []byte(message), Key: []byte(marshaledMessageKey), diff --git a/kafka/main_test.go b/kafka/main_test.go new file mode 100644 index 00000000..86fc86ac --- /dev/null +++ b/kafka/main_test.go @@ -0,0 +1,112 @@ +// Package kafka manages kafka interaction +package kafka + +import ( + "context" + "errors" + "sync/atomic" + "testing" + + "github.com/rs/zerolog" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" +) + +type testMessageReader struct { + fetch func() (kafka.Message, error) +} + +func (r *testMessageReader) FetchMessage(ctx context.Context) (kafka.Message, error) { + return r.fetch() +} + +func (r *testMessageReader) Stats() kafka.ReaderStats { + return kafka.ReaderStats{} +} + +func TestProcessMessagesIntoBatchPipeline(t *testing.T) { + nopLog := zerolog.Nop() + t.Run("AbsentTopicClosesMsg", func(t *testing.T) { + t.Parallel() + + batchPipeline := make(chan *MessageContext) + + r := &testMessageReader{} + messageCounter := 0 + r.fetch = func() (kafka.Message, error) { + messageCounter++ + if messageCounter == 1 { + return kafka.Message{Topic: "absent"}, nil + } + // processMessagesIntoBatchPipeline never returns, so leak its + // goroutine via blocking here forever. + select {} + } + go processMessagesIntoBatchPipeline(context.Background(), + nil, r, batchPipeline, &nopLog) + msg := <-batchPipeline + assert.NotNil(t, msg) + <-msg.done + assert.Equal(t, "absent", msg.msg.Topic) + + // Absent topic signals permanent error and the message should be + // committed, so msg.err must be nil. + assert.Nil(t, msg.err) + }) + + t.Run("OrderPreserved", func(t *testing.T) { + t.Parallel() + + // The capacity of the pipeline. The code below posts double amount of + // messages. + N := 50 + batchPipeline := make(chan *MessageContext, N) + + r := &testMessageReader{} + messageCounter := 0 + r.fetch = func() (kafka.Message, error) { + i := messageCounter + messageCounter++ + if i < 2*N { + // processMessagesIntoBatchPipeline() does not touch + // Message.Partition, so use that to pass message number info to + // Processor below. + return kafka.Message{Topic: "topicA", Partition: i}, nil + } + select {} // block forever + } + atomicCounter := int32(N) + topicMappings := []TopicMapping{{ + Topic: "topicA", + Processor: func(ctx context.Context, msg kafka.Message, logger *zerolog.Logger) error { + if msg.Partition < N { + // Make processor to post results in the reverse order of + // messages using a busy wait + for atomic.LoadInt32(&atomicCounter) != int32(msg.Partition+1) { + } + atomic.AddInt32(&atomicCounter, int32(-1)) + } + + if msg.Partition == 0 || msg.Partition == N { + return errors.New("error") + } + return nil + }, + }} + + go processMessagesIntoBatchPipeline(context.Background(), + topicMappings, r, batchPipeline, &nopLog) + for i := 0; i < 2*N; i++ { + msg := <-batchPipeline + assert.NotNil(t, msg) + <-msg.done + assert.Equal(t, "topicA", msg.msg.Topic) + assert.Equal(t, i, msg.msg.Partition) + if i == 0 || i == N { + assert.NotNil(t, msg.err) + } else { + assert.Nil(t, msg.err) + } + } + }) +} diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 291bf811..8990098f 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -2,6 +2,7 @@ package kafka import ( "bytes" + "context" "errors" "fmt" "math" @@ -30,6 +31,7 @@ SignedBlindedTokenIssuerHandler emits signed, blinded tokens based on provided b as an argument here. That will require a bit of refactoring. */ func SignedBlindedTokenIssuerHandler( + ctx context.Context, msg kafka.Message, producer *kafka.Writer, server *cbpServer.Server, @@ -58,7 +60,8 @@ func SignedBlindedTokenIssuerHandler( "request %s: failed avro deserialization", blindedTokenRequestSet.Request_id, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -71,7 +74,6 @@ func SignedBlindedTokenIssuerHandler( producer, log, ) - return nil } logger := log.With().Str("request_id", blindedTokenRequestSet.Request_id).Logger() @@ -86,7 +88,8 @@ func SignedBlindedTokenIssuerHandler( "request %s: data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected", blindedTokenRequestSet.Request_id, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -99,7 +102,6 @@ func SignedBlindedTokenIssuerHandler( producer, &logger, ) - return nil } OUTER: @@ -256,7 +258,8 @@ OUTER: marshaledDLEQProof, err := DLEQProof.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal dleq proof: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -269,7 +272,6 @@ OUTER: producer, &logger, ) - return nil } var marshaledBlindedTokens []string @@ -277,7 +279,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -290,7 +293,6 @@ OUTER: producer, &logger, ) - return nil } marshaledBlindedTokens = append(marshaledBlindedTokens, string(marshaledToken)) } @@ -300,7 +302,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal new tokens to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -313,7 +316,6 @@ OUTER: producer, &logger, ) - return nil } marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken)) } @@ -323,7 +325,8 @@ OUTER: marshaledPublicKey, err := publicKey.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal signing key: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -336,7 +339,6 @@ OUTER: producer, &logger, ) - return nil } blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{ @@ -385,7 +387,8 @@ OUTER: if err != nil { message := fmt.Sprintf("request %s: could not marshal dleq proof: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -398,7 +401,6 @@ OUTER: producer, &logger, ) - return nil } var marshaledBlindedTokens []string @@ -406,7 +408,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -419,7 +422,6 @@ OUTER: producer, &logger, ) - return nil } marshaledBlindedTokens = append(marshaledBlindedTokens, string(marshaledToken)) } @@ -429,7 +431,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("error could not marshal new tokens to bytes: %s", err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -442,7 +445,6 @@ OUTER: producer, &logger, ) - return nil } marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken)) } @@ -451,7 +453,8 @@ OUTER: marshaledPublicKey, err := publicKey.MarshalText() if err != nil { message := fmt.Sprintf("error could not marshal signing key: %s", err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -464,7 +467,6 @@ OUTER: producer, &logger, ) - return nil } blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{ @@ -492,7 +494,8 @@ OUTER: blindedTokenRequestSet.Request_id, resultSet, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -505,12 +508,11 @@ OUTER: producer, &logger, ) - return nil } logger.Info().Msg("ending blinded token request processor loop") logger.Info().Msgf("about to emit: %+v", resultSet) - err = Emit(producer, resultSetBuffer.Bytes(), log) + err = Emit(ctx, producer, resultSetBuffer.Bytes(), log) if err != nil { message := fmt.Sprintf( "request %s: failed to emit to topic %s with result: %v", @@ -537,9 +539,8 @@ func avroIssuerErrorResultFromError( issuerResultStatus int32, requestID string, msg kafka.Message, - producer *kafka.Writer, logger *zerolog.Logger, -) *ProcessingResult { +) []byte { signingResult := avroSchema.SigningResultV2{ Blinded_tokens: marshaledBlindedTokens, Signed_tokens: marshaledSignedTokens, @@ -556,23 +557,16 @@ func avroIssuerErrorResultFromError( err := resultSet.Serialize(&resultSetBuffer) if err != nil { message := fmt.Sprintf("request %s: failed to serialize result set", requestID) - return &ProcessingResult{ - Message: []byte(message), - ResultProducer: producer, - RequestID: requestID, - } + return []byte(message) } - return &ProcessingResult{ - Message: resultSetBuffer.Bytes(), - ResultProducer: producer, - RequestID: requestID, - } + return resultSetBuffer.Bytes() } // handlePermanentIssuanceError is a convenience function to both generate a result from -// an errorand emit it. +// an error and emit it. func handlePermanentIssuanceError( + ctx context.Context, message string, cause error, marshaledBlindedTokens []string, @@ -584,9 +578,9 @@ func handlePermanentIssuanceError( msg kafka.Message, producer *kafka.Writer, logger *zerolog.Logger, -) { +) error { logger.Error().Err(cause).Msgf("encountered permanent issuance failure: %v", message) - processingResult := avroIssuerErrorResultFromError( + toEmit := avroIssuerErrorResultFromError( message, marshaledBlindedTokens, marshaledSignedTokens, @@ -595,11 +589,14 @@ func handlePermanentIssuanceError( issuerResultStatus, requestID, msg, - producer, logger, ) - if err := Emit(producer, processingResult.Message, logger); err != nil { + if err := Emit(ctx, producer, toEmit, logger); err != nil { logger.Error().Err(err).Msg("failed to emit") } + // TODO: consider returning err here as failing to emit error should not + // commit messages the same way as failing to emit a success does not + // commit. + return nil } diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index a7fb8600..88c26d0d 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -2,12 +2,14 @@ package kafka import ( "bytes" + "context" "errors" "fmt" - "github.com/brave-intl/challenge-bypass-server/model" "strings" "time" + "github.com/brave-intl/challenge-bypass-server/model" + crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -31,6 +33,7 @@ type SignedIssuerToken struct { } func SignedTokenRedeemHandler( + ctx context.Context, msg kafka.Message, producer *kafka.Writer, server *cbpServer.Server, @@ -41,7 +44,8 @@ func SignedTokenRedeemHandler( tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { message := fmt.Sprintf("request %s: failed avro deserialization", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -50,7 +54,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } logger := log.With().Str("request_id", tokenRedeemRequestSet.Request_id).Logger() @@ -62,7 +65,8 @@ func SignedTokenRedeemHandler( // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. message := fmt.Sprintf("request %s: data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, errors.New("multiple messages"), msg, @@ -71,7 +75,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } issuers, err := server.FetchAllIssuers() if err != nil { @@ -79,7 +82,8 @@ func SignedTokenRedeemHandler( return processingError } message := fmt.Sprintf("request %s: failed to fetch all issuers", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -88,7 +92,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } // Create a lookup for issuers & signing keys based on public key @@ -110,7 +113,8 @@ func SignedTokenRedeemHandler( // Unmarshalling failure is a data issue and is probably permanent. if mErr != nil { message := fmt.Sprintf("request %s: could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -119,7 +123,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } signedTokens[string(marshaledPublicKey)] = SignedIssuerToken{ @@ -169,7 +172,8 @@ func SignedTokenRedeemHandler( // Unmarshaling failure is a data issue and is probably permanent. if err != nil { message := fmt.Sprintf("request %s: could not unmarshal text into preimage", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -178,14 +182,14 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) // Unmarshaling failure is a data issue and is probably permanent. if err != nil { message := fmt.Sprintf("request %s: could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -194,7 +198,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } if signedToken, ok := signedTokens[request.Public_key]; ok { @@ -241,7 +244,8 @@ func SignedTokenRedeemHandler( } } message := fmt.Sprintf("request %s: failed to check redemption equivalence", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -250,7 +254,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } // Continue if there is a duplicate @@ -289,7 +292,8 @@ func SignedTokenRedeemHandler( return err } } - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -298,7 +302,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } logger.Error().Err(fmt.Errorf("request %s: duplicate redemption: %w", tokenRedeemRequestSet.Request_id, err)). @@ -351,7 +354,8 @@ func SignedTokenRedeemHandler( err = resultSet.Serialize(&resultSetBuffer) if err != nil { message := fmt.Sprintf("request %s: failed to serialize result set", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -360,10 +364,9 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } - err = Emit(producer, resultSetBuffer.Bytes(), log) + err = Emit(ctx, producer, resultSetBuffer.Bytes(), log) if err != nil { message := fmt.Sprintf( "request %s: failed to emit results to topic %s", @@ -392,16 +395,15 @@ func issuerTimeIsNotValid(start *time.Time, end *time.Time) bool { return !bothTimesAreNil } -// avroRedeemErrorResultFromError returns a ProcessingResult that is constructed from the -// provided values. +// avroRedeemErrorResultFromError returns a message to emit that is constructed +// from the provided values. func avroRedeemErrorResultFromError( message string, msg kafka.Message, - producer *kafka.Writer, requestID string, redeemResultStatus int32, logger *zerolog.Logger, -) *ProcessingResult { +) []byte { redeemResult := avroSchema.RedeemResult{ Issuer_name: "", Issuer_cohort: 0, @@ -416,22 +418,15 @@ func avroRedeemErrorResultFromError( err := resultSet.Serialize(&resultSetBuffer) if err != nil { message := fmt.Sprintf("request %s: failed to serialize result set", requestID) - return &ProcessingResult{ - Message: []byte(message), - ResultProducer: producer, - RequestID: requestID, - } - } - return &ProcessingResult{ - Message: resultSetBuffer.Bytes(), - ResultProducer: producer, - RequestID: requestID, + return []byte(message) } + return resultSetBuffer.Bytes() } // handleRedemptionError is a convenience function that executes a call pattern shared // when handling all errors in the redeem flow func handlePermanentRedemptionError( + ctx context.Context, message string, cause error, msg kafka.Message, @@ -439,17 +434,20 @@ func handlePermanentRedemptionError( requestID string, redeemResultStatus int32, logger *zerolog.Logger, -) { +) error { logger.Error().Err(cause).Msgf("encountered permanent redemption failure: %v", message) - processingResult := avroRedeemErrorResultFromError( + toEmit := avroRedeemErrorResultFromError( message, msg, - producer, requestID, int32(avroSchema.RedeemResultStatusError), logger, ) - if err := Emit(producer, processingResult.Message, logger); err != nil { + if err := Emit(ctx, producer, toEmit, logger); err != nil { logger.Error().Err(err).Msg("failed to emit") } + // TODO: consider returning err here as failing to emit error should not + // commit messages the same way as failing to emit a success does not + // commit. + return nil }