From 29c3f9facd1ca96e59db46c5901915209fa952ae Mon Sep 17 00:00:00 2001
From: Igor Bukanov
Date: Mon, 20 May 2024 23:31:54 +0200
Subject: [PATCH 1/2] fix: handle unknown topic etc.

Refactor processMessagesIntoBatchPipeline to ensure that it closes the
message channel to signal that message processing is complete,
including the case when the message topic does not match any of the
configured topics. To do that, move all message processing, including
the search for a topic processor, into the runMessageProcessor()
goroutine and use defer to close the channel. That required changing
the message channel to be of type struct{} and storing the error in the
message itself, so that a simple `defer close(msg.done)` covers both
the normal and the error cases.

Provide unit tests for processMessagesIntoBatchPipeline that cover
success and error paths.

Bound the batchPipeline capacity by the number of CPU cores to avoid
running too many CPU-intensive tasks in parallel.

Closes #702
Closes #703
Closes #704
---
 kafka/main.go                                | 181 +++++++++----------
 kafka/main_test.go                           | 112 ++++++++++++
 kafka/signed_blinded_token_issuer_handler.go |  77 ++++----
 kafka/signed_token_redeem_handler.go         |  74 ++++----
 4 files changed, 267 insertions(+), 177 deletions(-)
 create mode 100644 kafka/main_test.go

diff --git a/kafka/main.go b/kafka/main.go
index 37b322b8..359da047 100644
--- a/kafka/main.go
+++ b/kafka/main.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"runtime"
 	"strings"
 	"time"
 
@@ -19,34 +20,27 @@ import (
 
 var brokers []string
 
-// Processor is a function that is used to process Kafka messages
-type Processor func(
-	kafka.Message,
-	*kafka.Writer,
-	*server.Server,
-	*zerolog.Logger,
-) error
+// Processor is a function that is used to process Kafka messages.
+type Processor func(context.Context, kafka.Message, *zerolog.Logger) error
 
-// ProcessingResult contains a message and the topic to which the message should be
-// emitted
-type ProcessingResult struct {
-	ResultProducer *kafka.Writer
-	Message        []byte
-	RequestID      string
+// messageReader is the subset of kafka.Reader methods that we use. It exists
+// to make testing easier.
+type messageReader interface {
+	FetchMessage(ctx context.Context) (kafka.Message, error)
+	Stats() kafka.ReaderStats
 }
 
 // TopicMapping represents a kafka topic, how to process it, and where to emit the result.
 type TopicMapping struct {
-	Topic          string
-	ResultProducer *kafka.Writer
-	Processor      Processor
-	Group          string
+	Topic     string
+	Processor Processor
 }
 
 // MessageContext is used for channel coordination when processing batches of messages
type MessageContext struct {
-	errorResult chan error
-	msg         kafka.Message
+	// done is closed when the message has been processed.
+	done chan struct{}
+	err  error
+	msg  kafka.Message
 }
 
 // StartConsumers reads configuration variables and starts the associated kafka consumers
@@ -60,26 +54,30 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
 	if len(brokers) < 1 {
 		brokers = strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
 	}
+	redeemWriter := kafka.NewWriter(kafka.WriterConfig{
+		Brokers: brokers,
+		Topic:   adsResultRedeemV1Topic,
+		Dialer:  getDialer(logger),
+	})
+	signWriter := kafka.NewWriter(kafka.WriterConfig{
+		Brokers: brokers,
+		Topic:   adsResultSignV1Topic,
+		Dialer:  getDialer(logger),
+	})
 	topicMappings := []TopicMapping{
 		{
 			Topic: adsRequestRedeemV1Topic,
-			ResultProducer: kafka.NewWriter(kafka.WriterConfig{
-				Brokers: brokers,
-				Topic:   adsResultRedeemV1Topic,
-				Dialer:  getDialer(logger),
-			}),
-			Processor: SignedTokenRedeemHandler,
-			Group:     adsConsumerGroupV1,
+			Processor: func(ctx context.Context, msg kafka.Message,
+				logger *zerolog.Logger) error {
+				return SignedTokenRedeemHandler(ctx, msg, redeemWriter, providedServer, logger)
+			},
 		},
 		{
 			Topic: adsRequestSignV1Topic,
-			ResultProducer: kafka.NewWriter(kafka.WriterConfig{
-				Brokers: brokers,
-				Topic:   adsResultSignV1Topic,
-				Dialer:  getDialer(logger),
-			}),
-			Processor: SignedBlindedTokenIssuerHandler,
-			Group:     adsConsumerGroupV1,
+			Processor: func(ctx context.Context, msg kafka.Message,
+				logger *zerolog.Logger) error {
+				return SignedBlindedTokenIssuerHandler(ctx, msg, signWriter, providedServer, logger)
+			},
 		},
 	}
 	var topics []string
@@ -89,9 +87,13 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
 
 	reader := newConsumer(topics, adsConsumerGroupV1, logger)
 
-	batchPipeline := make(chan *MessageContext, 100)
+	// Each message in batchPipeline is associated with a goroutine doing
+	// CPU-intensive cryptography, so limit the channel capacity to the number
+	// of CPU cores plus a small extra buffer to account for any IO that a
+	// processor may do.
+	batchPipeline := make(chan *MessageContext, runtime.NumCPU()+2)
 	ctx := context.Background()
-	go processMessagesIntoBatchPipeline(ctx, topicMappings, providedServer, reader, batchPipeline, logger)
+	go processMessagesIntoBatchPipeline(ctx, topicMappings, reader, batchPipeline, logger)
 	for {
 		err := readAndCommitBatchPipelineResults(ctx, reader, batchPipeline, logger)
 		if err != nil {
@@ -103,7 +105,7 @@ func StartConsumers(providedServer *server.Server, logger *zerolog.Logger) error
 }
 
 // readAndCommitBatchPipelineResults does a blocking read of the batchPipeline channel and
-// then does a blocking read of the errorResult in the MessageContext in the batchPipeline.
+// then waits for the done channel of the received MessageContext to be closed.
 // When an error appears it means that the channel was closed or a temporary error was
 // encountered. In the case of a temporary error, the application returns an error without
 // committing so that the next reader gets the same message to try again.
@@ -113,15 +115,12 @@ func readAndCommitBatchPipelineResults(
 	batchPipeline chan *MessageContext,
 	logger *zerolog.Logger,
 ) error {
-	msgCtx, ok := <-batchPipeline
-	if !ok {
-		logger.Error().Msg("batchPipeline channel closed")
-		return errors.New("batch item error")
-	}
-	err := <-msgCtx.errorResult
-	if err != nil {
-		logger.Error().Err(err).Msg("temporary failure encountered")
-		return fmt.Errorf("temporary failure encountered: %w", err)
+	msgCtx := <-batchPipeline
+	<-msgCtx.done
+
+	if msgCtx.err != nil {
+		logger.Error().Err(msgCtx.err).Msg("temporary failure encountered")
+		return fmt.Errorf("temporary failure encountered: %w", msgCtx.err)
 	}
 	logger.Info().Msgf("Committing offset %d", msgCtx.msg.Offset)
 	if err := reader.CommitMessages(ctx, msgCtx.msg); err != nil {
@@ -131,27 +130,17 @@ func readAndCommitBatchPipelineResults(
 	return nil
 }
 
-// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely, pushes a
-// MessageContext into the batchPipeline to maintain message order, and then spawns a
-// goroutine that will process the message and push to errorResult of the MessageContext
-// when the processing completes. In case of an error, we panic from this function,
-// triggering the deferral which closes the batchPipeline channel. This will result in
-// readAndCommitBatchPipelineResults returning an error and the processing loop being recreated.
-func processMessagesIntoBatchPipeline(
-	ctx context.Context,
+// processMessagesIntoBatchPipeline fetches messages from Kafka indefinitely,
+// pushes a MessageContext into the batchPipeline to maintain message order, and
+// then spawns a goroutine that processes the message and closes the done channel
+// of the MessageContext when the processing completes.
+func processMessagesIntoBatchPipeline(ctx context.Context,
 	topicMappings []TopicMapping,
-	providedServer *server.Server,
-	reader *kafka.Reader,
+	reader messageReader,
 	batchPipeline chan *MessageContext,
 	logger *zerolog.Logger,
 ) {
-	// During normal operation processMessagesIntoBatchPipeline will never complete and
-	// this deferral should not run. It's only called if we encounter some unrecoverable
-	// error.
-	defer func() {
-		close(batchPipeline)
-	}()
-
+	// Loop forever
 	for {
 		msg, err := reader.FetchMessage(ctx)
 		if err != nil {
@@ -170,8 +159,8 @@ func processMessagesIntoBatchPipeline(
 			continue
 		}
 		msgCtx := &MessageContext{
-			errorResult: make(chan error),
-			msg:         msg,
+			done: make(chan struct{}),
+			msg:  msg,
 		}
 		// If batchPipeline has been closed by an error in readAndCommitBatchPipelineResults,
 		// this write will panic, which is desired behavior, as the rest of the context
@@ -180,44 +169,33 @@ func processMessagesIntoBatchPipeline(
 		logger.Debug().Msgf("Processing message for topic %s at offset %d", msg.Topic, msg.Offset)
 		logger.Debug().Msgf("Reader Stats: %#v", reader.Stats())
 		logger.Debug().Msgf("topicMappings: %+v", topicMappings)
-		// Check if any of the existing topicMappings match the fetched message
-		matchFound := false
-		for _, topicMapping := range topicMappings {
-			logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
-			if msg.Topic == topicMapping.Topic {
-				matchFound = true
-				go processMessageIntoErrorResultChannel(
-					msg,
-					topicMapping,
-					providedServer,
-					msgCtx.errorResult,
-					logger,
-				)
-			}
-		}
-		if !matchFound {
-			logger.Error().Msgf("Topic received whose topic is not configured: %s", msg.Topic)
-		}
+		go runMessageProcessor(ctx, msgCtx, topicMappings, logger)
 	}
 }
 
-// processMessageIntoErrorResultChannel executes the processor defined by a topicMapping
-// on a provided message. It then puts the result into the errChan. This result will be
-// nil in cases of success or permanent failures and will be some error in the case that
-// a temporary error is encountered.
-func processMessageIntoErrorResultChannel(
-	msg kafka.Message,
-	topicMapping TopicMapping,
-	providedServer *server.Server,
-	errChan chan error,
+// runMessageProcessor executes the processor defined by a topicMapping on the
+// provided message. It runs on its own goroutine and closes msgCtx.done to
+// signal completion. It keeps msgCtx.err as nil in cases of success or
+// permanent failure and sets msgCtx.err in the case that a temporary error is
+// encountered.
+func runMessageProcessor(
+	ctx context.Context,
+	msgCtx *MessageContext,
+	topicMappings []TopicMapping,
 	logger *zerolog.Logger,
 ) {
-	errChan <- topicMapping.Processor(
-		msg,
-		topicMapping.ResultProducer,
-		providedServer,
-		logger,
-	)
+	defer close(msgCtx.done)
+	msg := msgCtx.msg
+	for _, topicMapping := range topicMappings {
+		logger.Debug().Msgf("topic: %+v, topicMapping: %+v", msg.Topic, topicMapping.Topic)
+		if msg.Topic == topicMapping.Topic {
+			msgCtx.err = topicMapping.Processor(ctx, msg, logger)
+			return
+		}
+	}
+	// This is a permanent error, so leave msgCtx.err nil so that the received
+	// message is still committed.
+	logger.Error().Msgf("message received for a topic that is not configured: %s", msg.Topic)
 }
 
 // NewConsumer returns a Kafka reader configured for the given topic and group.
@@ -242,7 +220,12 @@ func newConsumer(topics []string, groupID string, logger *zerolog.Logger) *kafka
 }
 
 // Emit sends a message over the Kafka interface.
-func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error { +func Emit( + ctx context.Context, + producer *kafka.Writer, + message []byte, + logger *zerolog.Logger, +) error { logger.Info().Msgf("Beginning data emission for topic %s", producer.Topic) messageKey := uuid.New() @@ -253,7 +236,7 @@ func Emit(producer *kafka.Writer, message []byte, logger *zerolog.Logger) error } err = producer.WriteMessages( - context.Background(), + ctx, kafka.Message{ Value: []byte(message), Key: []byte(marshaledMessageKey), diff --git a/kafka/main_test.go b/kafka/main_test.go new file mode 100644 index 00000000..86fc86ac --- /dev/null +++ b/kafka/main_test.go @@ -0,0 +1,112 @@ +// Package kafka manages kafka interaction +package kafka + +import ( + "context" + "errors" + "sync/atomic" + "testing" + + "github.com/rs/zerolog" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" +) + +type testMessageReader struct { + fetch func() (kafka.Message, error) +} + +func (r *testMessageReader) FetchMessage(ctx context.Context) (kafka.Message, error) { + return r.fetch() +} + +func (r *testMessageReader) Stats() kafka.ReaderStats { + return kafka.ReaderStats{} +} + +func TestProcessMessagesIntoBatchPipeline(t *testing.T) { + nopLog := zerolog.Nop() + t.Run("AbsentTopicClosesMsg", func(t *testing.T) { + t.Parallel() + + batchPipeline := make(chan *MessageContext) + + r := &testMessageReader{} + messageCounter := 0 + r.fetch = func() (kafka.Message, error) { + messageCounter++ + if messageCounter == 1 { + return kafka.Message{Topic: "absent"}, nil + } + // processMessagesIntoBatchPipeline never returns, so leak its + // goroutine via blocking here forever. + select {} + } + go processMessagesIntoBatchPipeline(context.Background(), + nil, r, batchPipeline, &nopLog) + msg := <-batchPipeline + assert.NotNil(t, msg) + <-msg.done + assert.Equal(t, "absent", msg.msg.Topic) + + // Absent topic signals permanent error and the message should be + // committed, so msg.err must be nil. + assert.Nil(t, msg.err) + }) + + t.Run("OrderPreserved", func(t *testing.T) { + t.Parallel() + + // The capacity of the pipeline. The code below posts double amount of + // messages. + N := 50 + batchPipeline := make(chan *MessageContext, N) + + r := &testMessageReader{} + messageCounter := 0 + r.fetch = func() (kafka.Message, error) { + i := messageCounter + messageCounter++ + if i < 2*N { + // processMessagesIntoBatchPipeline() does not touch + // Message.Partition, so use that to pass message number info to + // Processor below. 
+ return kafka.Message{Topic: "topicA", Partition: i}, nil + } + select {} // block forever + } + atomicCounter := int32(N) + topicMappings := []TopicMapping{{ + Topic: "topicA", + Processor: func(ctx context.Context, msg kafka.Message, logger *zerolog.Logger) error { + if msg.Partition < N { + // Make processor to post results in the reverse order of + // messages using a busy wait + for atomic.LoadInt32(&atomicCounter) != int32(msg.Partition+1) { + } + atomic.AddInt32(&atomicCounter, int32(-1)) + } + + if msg.Partition == 0 || msg.Partition == N { + return errors.New("error") + } + return nil + }, + }} + + go processMessagesIntoBatchPipeline(context.Background(), + topicMappings, r, batchPipeline, &nopLog) + for i := 0; i < 2*N; i++ { + msg := <-batchPipeline + assert.NotNil(t, msg) + <-msg.done + assert.Equal(t, "topicA", msg.msg.Topic) + assert.Equal(t, i, msg.msg.Partition) + if i == 0 || i == N { + assert.NotNil(t, msg.err) + } else { + assert.Nil(t, msg.err) + } + } + }) +} diff --git a/kafka/signed_blinded_token_issuer_handler.go b/kafka/signed_blinded_token_issuer_handler.go index 291bf811..8990098f 100644 --- a/kafka/signed_blinded_token_issuer_handler.go +++ b/kafka/signed_blinded_token_issuer_handler.go @@ -2,6 +2,7 @@ package kafka import ( "bytes" + "context" "errors" "fmt" "math" @@ -30,6 +31,7 @@ SignedBlindedTokenIssuerHandler emits signed, blinded tokens based on provided b as an argument here. That will require a bit of refactoring. */ func SignedBlindedTokenIssuerHandler( + ctx context.Context, msg kafka.Message, producer *kafka.Writer, server *cbpServer.Server, @@ -58,7 +60,8 @@ func SignedBlindedTokenIssuerHandler( "request %s: failed avro deserialization", blindedTokenRequestSet.Request_id, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -71,7 +74,6 @@ func SignedBlindedTokenIssuerHandler( producer, log, ) - return nil } logger := log.With().Str("request_id", blindedTokenRequestSet.Request_id).Logger() @@ -86,7 +88,8 @@ func SignedBlindedTokenIssuerHandler( "request %s: data array unexpectedly contained more than a single message. 
This array is intended to make future extension easier, but no more than a single value is currently expected", blindedTokenRequestSet.Request_id, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -99,7 +102,6 @@ func SignedBlindedTokenIssuerHandler( producer, &logger, ) - return nil } OUTER: @@ -256,7 +258,8 @@ OUTER: marshaledDLEQProof, err := DLEQProof.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal dleq proof: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -269,7 +272,6 @@ OUTER: producer, &logger, ) - return nil } var marshaledBlindedTokens []string @@ -277,7 +279,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -290,7 +293,6 @@ OUTER: producer, &logger, ) - return nil } marshaledBlindedTokens = append(marshaledBlindedTokens, string(marshaledToken)) } @@ -300,7 +302,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal new tokens to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -313,7 +316,6 @@ OUTER: producer, &logger, ) - return nil } marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken)) } @@ -323,7 +325,8 @@ OUTER: marshaledPublicKey, err := publicKey.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal signing key: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -336,7 +339,6 @@ OUTER: producer, &logger, ) - return nil } blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{ @@ -385,7 +387,8 @@ OUTER: if err != nil { message := fmt.Sprintf("request %s: could not marshal dleq proof: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -398,7 +401,6 @@ OUTER: producer, &logger, ) - return nil } var marshaledBlindedTokens []string @@ -406,7 +408,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("request %s: could not marshal blinded token slice to bytes: %s", blindedTokenRequestSet.Request_id, err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -419,7 +422,6 @@ OUTER: producer, &logger, ) - return nil } marshaledBlindedTokens = append(marshaledBlindedTokens, string(marshaledToken)) } @@ -429,7 +431,8 @@ OUTER: marshaledToken, err := token.MarshalText() if err != nil { message := fmt.Sprintf("error could not marshal new tokens to bytes: %s", err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -442,7 +445,6 @@ OUTER: producer, &logger, ) - return nil } marshaledSignedTokens = append(marshaledSignedTokens, string(marshaledToken)) } @@ -451,7 +453,8 @@ OUTER: marshaledPublicKey, err := publicKey.MarshalText() if err != nil { message := fmt.Sprintf("error could not marshal signing 
key: %s", err) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, marshaledBlindedTokens, @@ -464,7 +467,6 @@ OUTER: producer, &logger, ) - return nil } blindedTokenResults = append(blindedTokenResults, avroSchema.SigningResultV2{ @@ -492,7 +494,8 @@ OUTER: blindedTokenRequestSet.Request_id, resultSet, ) - handlePermanentIssuanceError( + return handlePermanentIssuanceError( + ctx, message, err, nil, @@ -505,12 +508,11 @@ OUTER: producer, &logger, ) - return nil } logger.Info().Msg("ending blinded token request processor loop") logger.Info().Msgf("about to emit: %+v", resultSet) - err = Emit(producer, resultSetBuffer.Bytes(), log) + err = Emit(ctx, producer, resultSetBuffer.Bytes(), log) if err != nil { message := fmt.Sprintf( "request %s: failed to emit to topic %s with result: %v", @@ -537,9 +539,8 @@ func avroIssuerErrorResultFromError( issuerResultStatus int32, requestID string, msg kafka.Message, - producer *kafka.Writer, logger *zerolog.Logger, -) *ProcessingResult { +) []byte { signingResult := avroSchema.SigningResultV2{ Blinded_tokens: marshaledBlindedTokens, Signed_tokens: marshaledSignedTokens, @@ -556,23 +557,16 @@ func avroIssuerErrorResultFromError( err := resultSet.Serialize(&resultSetBuffer) if err != nil { message := fmt.Sprintf("request %s: failed to serialize result set", requestID) - return &ProcessingResult{ - Message: []byte(message), - ResultProducer: producer, - RequestID: requestID, - } + return []byte(message) } - return &ProcessingResult{ - Message: resultSetBuffer.Bytes(), - ResultProducer: producer, - RequestID: requestID, - } + return resultSetBuffer.Bytes() } // handlePermanentIssuanceError is a convenience function to both generate a result from -// an errorand emit it. +// an error and emit it. func handlePermanentIssuanceError( + ctx context.Context, message string, cause error, marshaledBlindedTokens []string, @@ -584,9 +578,9 @@ func handlePermanentIssuanceError( msg kafka.Message, producer *kafka.Writer, logger *zerolog.Logger, -) { +) error { logger.Error().Err(cause).Msgf("encountered permanent issuance failure: %v", message) - processingResult := avroIssuerErrorResultFromError( + toEmit := avroIssuerErrorResultFromError( message, marshaledBlindedTokens, marshaledSignedTokens, @@ -595,11 +589,14 @@ func handlePermanentIssuanceError( issuerResultStatus, requestID, msg, - producer, logger, ) - if err := Emit(producer, processingResult.Message, logger); err != nil { + if err := Emit(ctx, producer, toEmit, logger); err != nil { logger.Error().Err(err).Msg("failed to emit") } + // TODO: consider returning err here as failing to emit error should not + // commit messages the same way as failing to emit a success does not + // commit. 
+ return nil } diff --git a/kafka/signed_token_redeem_handler.go b/kafka/signed_token_redeem_handler.go index a7fb8600..88c26d0d 100644 --- a/kafka/signed_token_redeem_handler.go +++ b/kafka/signed_token_redeem_handler.go @@ -2,12 +2,14 @@ package kafka import ( "bytes" + "context" "errors" "fmt" - "github.com/brave-intl/challenge-bypass-server/model" "strings" "time" + "github.com/brave-intl/challenge-bypass-server/model" + crypto "github.com/brave-intl/challenge-bypass-ristretto-ffi" avroSchema "github.com/brave-intl/challenge-bypass-server/avro/generated" "github.com/brave-intl/challenge-bypass-server/btd" @@ -31,6 +33,7 @@ type SignedIssuerToken struct { } func SignedTokenRedeemHandler( + ctx context.Context, msg kafka.Message, producer *kafka.Writer, server *cbpServer.Server, @@ -41,7 +44,8 @@ func SignedTokenRedeemHandler( tokenRedeemRequestSet, err := avroSchema.DeserializeRedeemRequestSet(bytes.NewReader(data)) if err != nil { message := fmt.Sprintf("request %s: failed avro deserialization", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -50,7 +54,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } logger := log.With().Str("request_id", tokenRedeemRequestSet.Request_id).Logger() @@ -62,7 +65,8 @@ func SignedTokenRedeemHandler( // NOTE: When we start supporting multiple requests we will need to review // errors and return values as well. message := fmt.Sprintf("request %s: data array unexpectedly contained more than a single message. This array is intended to make future extension easier, but no more than a single value is currently expected", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, errors.New("multiple messages"), msg, @@ -71,7 +75,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } issuers, err := server.FetchAllIssuers() if err != nil { @@ -79,7 +82,8 @@ func SignedTokenRedeemHandler( return processingError } message := fmt.Sprintf("request %s: failed to fetch all issuers", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -88,7 +92,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } // Create a lookup for issuers & signing keys based on public key @@ -110,7 +113,8 @@ func SignedTokenRedeemHandler( // Unmarshalling failure is a data issue and is probably permanent. if mErr != nil { message := fmt.Sprintf("request %s: could not unmarshal issuer public key into text", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -119,7 +123,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } signedTokens[string(marshaledPublicKey)] = SignedIssuerToken{ @@ -169,7 +172,8 @@ func SignedTokenRedeemHandler( // Unmarshaling failure is a data issue and is probably permanent. 
if err != nil { message := fmt.Sprintf("request %s: could not unmarshal text into preimage", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -178,14 +182,14 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } verificationSignature := crypto.VerificationSignature{} err = verificationSignature.UnmarshalText([]byte(request.Signature)) // Unmarshaling failure is a data issue and is probably permanent. if err != nil { message := fmt.Sprintf("request %s: could not unmarshal text into verification signature", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -194,7 +198,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } if signedToken, ok := signedTokens[request.Public_key]; ok { @@ -241,7 +244,8 @@ func SignedTokenRedeemHandler( } } message := fmt.Sprintf("request %s: failed to check redemption equivalence", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -250,7 +254,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } // Continue if there is a duplicate @@ -289,7 +292,8 @@ func SignedTokenRedeemHandler( return err } } - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -298,7 +302,6 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } logger.Error().Err(fmt.Errorf("request %s: duplicate redemption: %w", tokenRedeemRequestSet.Request_id, err)). @@ -351,7 +354,8 @@ func SignedTokenRedeemHandler( err = resultSet.Serialize(&resultSetBuffer) if err != nil { message := fmt.Sprintf("request %s: failed to serialize result set", tokenRedeemRequestSet.Request_id) - handlePermanentRedemptionError( + return handlePermanentRedemptionError( + ctx, message, err, msg, @@ -360,10 +364,9 @@ func SignedTokenRedeemHandler( int32(avroSchema.RedeemResultStatusError), log, ) - return nil } - err = Emit(producer, resultSetBuffer.Bytes(), log) + err = Emit(ctx, producer, resultSetBuffer.Bytes(), log) if err != nil { message := fmt.Sprintf( "request %s: failed to emit results to topic %s", @@ -392,16 +395,15 @@ func issuerTimeIsNotValid(start *time.Time, end *time.Time) bool { return !bothTimesAreNil } -// avroRedeemErrorResultFromError returns a ProcessingResult that is constructed from the -// provided values. +// avroRedeemErrorResultFromError returns a message to emit that is constructed +// from the provided values. 
 func avroRedeemErrorResultFromError(
 	message string,
 	msg kafka.Message,
-	producer *kafka.Writer,
 	requestID string,
 	redeemResultStatus int32,
 	logger *zerolog.Logger,
-) *ProcessingResult {
+) []byte {
 	redeemResult := avroSchema.RedeemResult{
 		Issuer_name:     "",
 		Issuer_cohort:   0,
@@ -416,22 +418,15 @@ func avroRedeemErrorResultFromError(
 	err := resultSet.Serialize(&resultSetBuffer)
 	if err != nil {
 		message := fmt.Sprintf("request %s: failed to serialize result set", requestID)
-		return &ProcessingResult{
-			Message:        []byte(message),
-			ResultProducer: producer,
-			RequestID:      requestID,
-		}
-	}
-	return &ProcessingResult{
-		Message:        resultSetBuffer.Bytes(),
-		ResultProducer: producer,
-		RequestID:      requestID,
+		return []byte(message)
 	}
+	return resultSetBuffer.Bytes()
 }
 
 // handleRedemptionError is a convenience function that executes a call pattern shared
 // when handling all errors in the redeem flow
 func handlePermanentRedemptionError(
+	ctx context.Context,
 	message string,
 	cause error,
 	msg kafka.Message,
@@ -439,17 +434,20 @@ func handlePermanentRedemptionError(
 	requestID string,
 	redeemResultStatus int32,
 	logger *zerolog.Logger,
-) {
+) error {
 	logger.Error().Err(cause).Msgf("encountered permanent redemption failure: %v", message)
-	processingResult := avroRedeemErrorResultFromError(
+	toEmit := avroRedeemErrorResultFromError(
 		message,
 		msg,
-		producer,
 		requestID,
 		int32(avroSchema.RedeemResultStatusError),
 		logger,
 	)
-	if err := Emit(producer, processingResult.Message, logger); err != nil {
+	if err := Emit(ctx, producer, toEmit, logger); err != nil {
 		logger.Error().Err(err).Msg("failed to emit")
 	}
+	// TODO: consider returning err here as failing to emit error should not
+	// commit messages the same way as failing to emit a success does not
+	// commit.
+	return nil
 }

From 4cf123a7a6fb09ff0e00494e2345d9ee26dadc3b Mon Sep 17 00:00:00 2001
From: Igor Bukanov
Date: Tue, 21 May 2024 16:54:34 +0200
Subject: [PATCH 2/2] chore: verbose go test output

For convenience of log analysis from GitHub test runs, always include
the list of tests that were run in the docker-test output.
---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 27bdefd0..3bff52b3 100644
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,7 @@ docker-test:
 		--key-schema AttributeName=id,KeyType=HASH \
 		--billing-mode PAY_PER_REQUEST \
 		--table-name redemptions --endpoint-url http://dynamodb:8000 --region us-west-2 ) \
-		&& go test ./..."
+		&& go test -v ./..."
 
 docker-lint:
 	docker-compose -f docker-compose.yml -f docker-compose.dev.yml run --rm -p 2416:2416 challenge-bypass golangci-lint run
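
As an illustration of the done-channel pattern the first patch introduces, below is a standalone Go sketch (not part of the patch): the processing goroutine always closes done via defer, the consumer waits on done in pipeline order, and a nil err means the message is safe to commit. The message and messageContext types are simplified stand-ins for kafka.Message and MessageContext; only the coordination logic mirrors the patch.

package main

import (
	"errors"
	"fmt"
)

type message struct {
	topic string
	value string
}

type messageContext struct {
	done chan struct{}
	err  error
	msg  message
}

func main() {
	pipeline := make(chan *messageContext, 2)
	msgs := []message{
		{topic: "known", value: "a"},
		{topic: "unknown", value: "b"},
		{topic: "known", value: ""}, // triggers a simulated temporary error
	}

	// Producer side: push each context into the pipeline first to preserve
	// order, then process it on its own goroutine.
	go func() {
		for _, m := range msgs {
			mc := &messageContext{done: make(chan struct{}), msg: m}
			pipeline <- mc
			go func(mc *messageContext) {
				defer close(mc.done) // signal completion on every path
				if mc.msg.topic != "known" {
					// Unknown topic is treated as permanent: leave err nil
					// so the message still gets committed.
					return
				}
				if mc.msg.value == "" {
					mc.err = errors.New("temporary failure")
				}
			}(mc)
		}
		close(pipeline)
	}()

	// Consumer side: wait for each message in order, then commit or retry.
	for mc := range pipeline {
		<-mc.done
		if mc.err != nil {
			fmt.Println("not committing, temporary error:", mc.err)
			continue
		}
		fmt.Println("committing message for topic", mc.msg.topic)
	}
}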