From ab5337686403439d829ac326c90642a9f0544ebd Mon Sep 17 00:00:00 2001 From: Ian Suvak Date: Fri, 21 Feb 2025 09:33:35 -0500 Subject: [PATCH] propagate log context from relayer to signature-aggregator --- messages/message_handler.go | 5 ++ .../off-chain-registry/message_handler.go | 4 + messages/teleporter/message_handler.go | 14 +++ relayer/application_relayer.go | 3 + signature-aggregator/aggregator/aggregator.go | 87 ++++++++++--------- .../aggregator/aggregator_test.go | 7 +- signature-aggregator/api/api.go | 1 + 7 files changed, 78 insertions(+), 43 deletions(-) diff --git a/messages/message_handler.go b/messages/message_handler.go index c60cfa94..f671d7d8 100644 --- a/messages/message_handler.go +++ b/messages/message_handler.go @@ -10,6 +10,7 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/warp" "github.com/ava-labs/icm-services/vms" "github.com/ethereum/go-ethereum/common" + "go.uber.org/zap" ) // MessageManager is specific to each message protocol. The interface handles choosing which messages to send @@ -30,6 +31,10 @@ type MessageHandler interface { // returns the transaction hash if the transaction is successful. SendMessage(signedMessage *warp.Message, destinationClient vms.DestinationClient) (common.Hash, error) + // GetLogContext returns extra fields to be set in the logger + // when passing it along to the signature aggregator + GetLogContext(destinationClient vms.DestinationClient) []zap.Field + // GetMessageRoutingInfo returns the source chain ID, origin sender address, // destination chain ID, and destination address. 
GetMessageRoutingInfo() (
diff --git a/messages/off-chain-registry/message_handler.go b/messages/off-chain-registry/message_handler.go
index 9d37934b..20dce0c6 100644
--- a/messages/off-chain-registry/message_handler.go
+++ b/messages/off-chain-registry/message_handler.go
@@ -216,3 +216,9 @@ func (m *messageHandler) GetMessageRoutingInfo() (
 		m.factory.registryAddress,
 		nil
 }
+
+// GetLogContext returns the log fields identifying this message: only the unsigned
+// Warp message ID. destinationClient is unused; it mirrors the MessageHandler method.
+func (m *messageHandler) GetLogContext(destinationClient vms.DestinationClient) []zap.Field {
+	return []zap.Field{zap.String("unsignedWarpMessageID", m.unsignedMessage.ID().String())}
+}
diff --git a/messages/teleporter/message_handler.go b/messages/teleporter/message_handler.go
index 0f5b2d48..e4489ca4 100644
--- a/messages/teleporter/message_handler.go
+++ b/messages/teleporter/message_handler.go
@@ -340,6 +340,26 @@ func (m *messageHandler) SendMessage(
 	return txHash, nil
 }
 
+// GetLogContext returns the log fields identifying this message: the unsigned
+// Warp message ID and, when it can be computed for destinationClient's blockchain,
+// the Teleporter message ID.
+func (m *messageHandler) GetLogContext(destinationClient vms.DestinationClient) []zap.Field {
+	fields := []zap.Field{
+		zap.String("unsignedWarpMessageID", m.unsignedMessage.ID().String()),
+	}
+	teleporterMessageID, err := teleporterUtils.CalculateMessageID(
+		m.factory.protocolAddress,
+		m.unsignedMessage.SourceChainID,
+		destinationClient.DestinationBlockchainID(),
+		m.teleporterMessage.MessageNonce,
+	)
+	if err != nil {
+		// Never log a zero-valued ID as if it were real; surface the failure instead.
+		return append(fields, zap.NamedError("teleporterMessageIDError", err))
+	}
+	return append(fields, zap.String("teleporterMessageID", teleporterMessageID.String()))
+}
+
 func (m *messageHandler) waitForReceipt(
 	signedMessage *warp.Message,
 	destinationClient vms.DestinationClient,
diff --git a/relayer/application_relayer.go b/relayer/application_relayer.go
index ad114b31..add1bffb 100644
--- a/relayer/application_relayer.go
+++ b/relayer/application_relayer.go
@@ -12,6 +12,7 @@ import (
 	"github.com/ava-labs/avalanchego/utils/constants"
 	"github.com/ava-labs/avalanchego/utils/logging"
 	avalancheWarp "github.com/ava-labs/avalanchego/vms/platformvm/warp"
+
 	"github.com/ava-labs/icm-services/database"
 	"github.com/ava-labs/icm-services/messages"
"github.com/ava-labs/icm-services/peers" @@ -205,7 +206,9 @@ func (r *ApplicationRelayer) ProcessMessage(handler messages.MessageHandler) (co // sourceWarpSignatureClient is nil iff the source blockchain is configured to fetch signatures via AppRequest if r.sourceWarpSignatureClient == nil { + logContext := handler.GetLogContext(r.destinationClient) signedMessage, err = r.signatureAggregator.CreateSignedMessage( + r.logger.With(logContext...), unsignedMessage, nil, r.signingSubnetID, diff --git a/signature-aggregator/aggregator/aggregator.go b/signature-aggregator/aggregator/aggregator.go index 3a52a460..9c208888 100644 --- a/signature-aggregator/aggregator/aggregator.go +++ b/signature-aggregator/aggregator/aggregator.go @@ -54,7 +54,6 @@ type SignatureAggregator struct { network peers.AppRequestNetwork // protected by subnetsMapLock subnetIDsByBlockchainID map[ids.ID]ids.ID - logger logging.Logger messageCreator message.Creator currentRequestID atomic.Uint32 subnetsMapLock sync.RWMutex @@ -79,7 +78,6 @@ func NewSignatureAggregator( sa := SignatureAggregator{ network: network, subnetIDsByBlockchainID: map[ids.ID]ids.ID{}, - logger: logger, metrics: metrics, currentRequestID: atomic.Uint32{}, cache: cache, @@ -94,6 +92,7 @@ func (s *SignatureAggregator) Shutdown() { } func (s *SignatureAggregator) connectToQuorumValidators( + log logging.Logger, signingSubnet ids.ID, quorumPercentage uint64, ) (*peers.ConnectedCanonicalValidators, error) { @@ -105,7 +104,7 @@ func (s *SignatureAggregator) connectToQuorumValidators( connectedValidators, err = s.network.GetConnectedCanonicalValidators(signingSubnet) if err != nil { msg := "Failed to fetch connected canonical validators" - s.logger.Error( + log.Error( msg, zap.Error(err), ) @@ -123,7 +122,7 @@ func (s *SignatureAggregator) connectToQuorumValidators( connectedValidators.ValidatorSet.TotalWeight, quorumPercentage, ) { - s.logger.Warn( + log.Warn( "Failed to connect to a threshold of stake", zap.Uint64("connectedWeight", 
connectedValidators.ConnectedWeight), zap.Uint64("totalValidatorWeight", connectedValidators.ValidatorSet.TotalWeight), @@ -134,7 +133,7 @@ func (s *SignatureAggregator) connectToQuorumValidators( } return nil } - err = utils.WithRetriesTimeout(s.logger, connectOp, connectToValidatorsTimeout) + err = utils.WithRetriesTimeout(log, connectOp, connectToValidatorsTimeout) if err != nil { return nil, err } @@ -142,16 +141,17 @@ func (s *SignatureAggregator) connectToQuorumValidators( } func (s *SignatureAggregator) CreateSignedMessage( + log logging.Logger, unsignedMessage *avalancheWarp.UnsignedMessage, justification []byte, inputSigningSubnet ids.ID, quorumPercentage uint64, ) (*avalancheWarp.Message, error) { - s.logger.Debug("Creating signed message", zap.String("warpMessageID", unsignedMessage.ID().String())) + log.Debug("Creating signed message", zap.String("warpMessageID", unsignedMessage.ID().String())) var signingSubnet ids.ID var err error // If signingSubnet is not set we default to the subnet of the source blockchain - sourceSubnet, err := s.getSubnetID(unsignedMessage.SourceChainID) + sourceSubnet, err := s.getSubnetID(log, unsignedMessage.SourceChainID) if err != nil { return nil, fmt.Errorf( "source message subnet not found for chainID %s", @@ -163,15 +163,15 @@ func (s *SignatureAggregator) CreateSignedMessage( } else { signingSubnet = inputSigningSubnet } - s.logger.Debug( + log.Debug( "Creating signed message with signing subnet", zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Stringer("signingSubnet", signingSubnet), ) - connectedValidators, err := s.connectToQuorumValidators(signingSubnet, quorumPercentage) + connectedValidators, err := s.connectToQuorumValidators(log, signingSubnet, quorumPercentage) if err != nil { - s.logger.Error( + log.Error( "Failed to fetch quorum of connected canonical validators", zap.Stringer("signingSubnet", signingSubnet), zap.Error(err), @@ -195,6 +195,7 @@ func (s *SignatureAggregator) 
CreateSignedMessage( s.metrics.SignatureCacheHits.Add(float64(len(signatureMap))) } if signedMsg, err := s.aggregateIfSufficientWeight( + log, unsignedMessage, signatureMap, accumulatedSignatureWeight, @@ -214,7 +215,7 @@ func (s *SignatureAggregator) CreateSignedMessage( reqBytes, err := s.marshalRequest(unsignedMessage, justification, sourceSubnet) if err != nil { msg := "Failed to marshal request bytes" - s.logger.Error( + log.Error( msg, zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Error(err), @@ -232,7 +233,7 @@ func (s *SignatureAggregator) CreateSignedMessage( ) if err != nil { msg := "Failed to create app request message" - s.logger.Error( + log.Error( msg, zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Error(err), @@ -244,7 +245,7 @@ func (s *SignatureAggregator) CreateSignedMessage( // Query the validators with retries. On each retry, query one node per unique BLS pubkey operation := func() error { responsesExpected := len(connectedValidators.ValidatorSet.Validators) - len(signatureMap) - s.logger.Debug( + log.Debug( "Aggregator collecting signatures from peers.", zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), zap.String("signingSubnetID", signingSubnet.String()), @@ -263,7 +264,7 @@ func (s *SignatureAggregator) CreateSignedMessage( // TODO: Track failures and iterate through the validator's node list on subsequent query attempts nodeID := vdr.NodeIDs[0] vdrSet.Add(nodeID) - s.logger.Debug( + log.Debug( "Added node ID to query.", zap.String("nodeID", nodeID.String()), zap.String("warpMessageID", unsignedMessage.ID().String()), @@ -283,7 +284,7 @@ func (s *SignatureAggregator) CreateSignedMessage( sentTo := s.network.Send(outMsg, vdrSet, sourceSubnet, subnets.NoOpAllower) s.metrics.AppRequestCount.Inc() - s.logger.Debug( + log.Debug( "Sent signature request to network", zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Any("sentTo", sentTo), @@ -293,7 +294,7 @@ func (s 
*SignatureAggregator) CreateSignedMessage( ) for nodeID := range vdrSet { if !sentTo.Contains(nodeID) { - s.logger.Warn( + log.Warn( "Failed to make async request to node", zap.String("nodeID", nodeID.String()), zap.Error(err), @@ -306,7 +307,7 @@ func (s *SignatureAggregator) CreateSignedMessage( responseCount := 0 if responsesExpected > 0 { for response := range responseChan { - s.logger.Debug( + log.Debug( "Processing response from node", zap.String("nodeID", response.NodeID().String()), zap.String("warpMessageID", unsignedMessage.ID().String()), @@ -314,6 +315,7 @@ func (s *SignatureAggregator) CreateSignedMessage( ) var relevant bool signedMsg, relevant, err = s.handleResponse( + log, response, sentTo, requestID, @@ -336,7 +338,7 @@ func (s *SignatureAggregator) CreateSignedMessage( } // If we have sufficient signatures, return here. if signedMsg != nil { - s.logger.Info( + log.Info( "Created signed message.", zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Uint64("signatureWeight", accumulatedSignatureWeight.Uint64()), @@ -353,9 +355,9 @@ func (s *SignatureAggregator) CreateSignedMessage( return errNotEnoughSignatures } - err = utils.WithRetriesTimeout(s.logger, operation, signatureRequestTimeout) + err = utils.WithRetriesTimeout(log, operation, signatureRequestTimeout) if err != nil { - s.logger.Warn( + log.Warn( "Failed to collect a threshold of signatures", zap.String("warpMessageID", unsignedMessage.ID().String()), zap.Uint64("accumulatedWeight", accumulatedSignatureWeight.Uint64()), @@ -366,14 +368,14 @@ func (s *SignatureAggregator) CreateSignedMessage( return signedMsg, nil } -func (s *SignatureAggregator) getSubnetID(blockchainID ids.ID) (ids.ID, error) { +func (s *SignatureAggregator) getSubnetID(log logging.Logger, blockchainID ids.ID) (ids.ID, error) { s.subnetsMapLock.RLock() subnetID, ok := s.subnetIDsByBlockchainID[blockchainID] s.subnetsMapLock.RUnlock() if ok { return subnetID, nil } - s.logger.Info("Signing subnet not found, 
requesting from PChain", zap.String("blockchainID", blockchainID.String())) + log.Info("Signing subnet not found, requesting from PChain", zap.String("blockchainID", blockchainID.String())) subnetID, err := s.network.GetSubnetID(blockchainID) if err != nil { return ids.ID{}, fmt.Errorf("source blockchain not found for chain ID %s", blockchainID) @@ -394,6 +396,7 @@ func (s *SignatureAggregator) setSubnetID(blockchainID ids.ID, subnetID ids.ID) // aggregation request. Returns an error only if a non-recoverable error occurs, otherwise returns a nil error // to continue processing responses. func (s *SignatureAggregator) handleResponse( + log logging.Logger, response message.InboundMessage, sentTo set.Set[ids.NodeID], requestID uint32, @@ -411,27 +414,27 @@ func (s *SignatureAggregator) handleResponse( rcvReqID, ok := message.GetRequestID(m) if !ok { // This should never occur, since inbound message validity is already checked by the inbound handler - s.logger.Error("Could not get requestID from message") + log.Error("Could not get requestID from message") return nil, false, nil } nodeID := response.NodeID() if !sentTo.Contains(nodeID) || rcvReqID != requestID { - s.logger.Debug("Skipping irrelevant app response") + log.Debug("Skipping irrelevant app response") return nil, false, nil } // If we receive an AppRequestFailed, then the request timed out. // This is still a relevant response, since we are no longer expecting a response from that node. 
if response.Op() == message.AppErrorOp { - s.logger.Debug("Request timed out") + log.Debug("Request timed out") s.metrics.ValidatorTimeouts.Inc() return nil, true, nil } validator, vdrIndex := connectedValidators.GetValidator(nodeID) - signature, valid := s.isValidSignatureResponse(unsignedMessage, response, validator.PublicKey) + signature, valid := s.isValidSignatureResponse(log, unsignedMessage, response, validator.PublicKey) if valid { - s.logger.Debug( + log.Debug( "Got valid signature response", zap.String("nodeID", nodeID.String()), zap.Uint64("stakeWeight", validator.Weight), @@ -446,7 +449,7 @@ func (s *SignatureAggregator) handleResponse( ) accumulatedSignatureWeight.Add(accumulatedSignatureWeight, new(big.Int).SetUint64(validator.Weight)) } else { - s.logger.Debug( + log.Debug( "Got invalid signature response", zap.String("nodeID", nodeID.String()), zap.Uint64("stakeWeight", validator.Weight), @@ -458,6 +461,7 @@ func (s *SignatureAggregator) handleResponse( } if signedMsg, err := s.aggregateIfSufficientWeight( + log, unsignedMessage, signatureMap, accumulatedSignatureWeight, @@ -474,6 +478,7 @@ func (s *SignatureAggregator) handleResponse( } func (s *SignatureAggregator) aggregateIfSufficientWeight( + log logging.Logger, unsignedMessage *avalancheWarp.UnsignedMessage, signatureMap map[int][bls.SignatureLen]byte, accumulatedSignatureWeight *big.Int, @@ -489,10 +494,10 @@ func (s *SignatureAggregator) aggregateIfSufficientWeight( // Not enough signatures, continue processing messages return nil, nil } - aggSig, vdrBitSet, err := s.aggregateSignatures(signatureMap) + aggSig, vdrBitSet, err := s.aggregateSignatures(log, signatureMap) if err != nil { msg := "Failed to aggregate signature." 
- s.logger.Error( + log.Error( msg, zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), zap.String("warpMessageID", unsignedMessage.ID().String()), @@ -510,7 +515,7 @@ func (s *SignatureAggregator) aggregateIfSufficientWeight( ) if err != nil { msg := "Failed to create new signed message" - s.logger.Error( + log.Error( msg, zap.String("sourceBlockchainID", unsignedMessage.SourceChainID.String()), zap.String("warpMessageID", unsignedMessage.ID().String()), @@ -525,13 +530,14 @@ func (s *SignatureAggregator) aggregateIfSufficientWeight( // the signature against the node's public key. If we are unable to generate the signature or verify // correctly, false will be returned to indicate no valid signature was found in response. func (s *SignatureAggregator) isValidSignatureResponse( + log logging.Logger, unsignedMessage *avalancheWarp.UnsignedMessage, response message.InboundMessage, pubKey *bls.PublicKey, ) (blsSignatureBuf, bool) { // If the handler returned an error response, count the response and continue if response.Op() == message.AppErrorOp { - s.logger.Debug( + log.Debug( "Relayer async response failed", zap.String("nodeID", response.NodeID().String()), ) @@ -540,7 +546,7 @@ func (s *SignatureAggregator) isValidSignatureResponse( appResponse, ok := response.Message().(*p2p.AppResponse) if !ok { - s.logger.Debug( + log.Debug( "Relayer async response was not an AppResponse", zap.String("nodeID", response.NodeID().String()), ) @@ -549,7 +555,7 @@ func (s *SignatureAggregator) isValidSignatureResponse( signature, err := s.unmarshalResponse(appResponse.GetAppBytes()) if err != nil { - s.logger.Error( + log.Error( "Error unmarshaling signature response", zap.Error(err), ) @@ -558,7 +564,7 @@ func (s *SignatureAggregator) isValidSignatureResponse( // If the node returned an empty signature, then it has not yet seen the warp message. Retry later. 
emptySignature := blsSignatureBuf{} if bytes.Equal(signature[:], emptySignature[:]) { - s.logger.Debug( + log.Debug( "Response contained an empty signature", zap.String("nodeID", response.NodeID().String()), ) @@ -566,7 +572,7 @@ func (s *SignatureAggregator) isValidSignatureResponse( } if len(signature) != bls.SignatureLen { - s.logger.Debug( + log.Debug( "Response signature has incorrect length", zap.Int("actual", len(signature)), zap.Int("expected", bls.SignatureLen), @@ -576,14 +582,14 @@ func (s *SignatureAggregator) isValidSignatureResponse( sig, err := bls.SignatureFromBytes(signature[:]) if err != nil { - s.logger.Debug( + log.Debug( "Failed to create signature from response", ) return blsSignatureBuf{}, false } if !bls.Verify(pubKey, sig, unsignedMessage.Bytes()) { - s.logger.Debug( + log.Debug( "Failed verification for signature", zap.String("pubKey", hex.EncodeToString(bls.PublicKeyToUncompressedBytes(pubKey))), ) @@ -597,6 +603,7 @@ func (s *SignatureAggregator) isValidSignatureResponse( // returns a bit set representing the validators that are represented in the aggregate signature. The bit // set is in canonical validator order. 
func (s *SignatureAggregator) aggregateSignatures( + log logging.Logger, signatureMap map[int][bls.SignatureLen]byte, ) (*bls.Signature, set.Bits, error) { // Aggregate the signatures @@ -607,7 +614,7 @@ func (s *SignatureAggregator) aggregateSignatures( sig, err := bls.SignatureFromBytes(sigBytes[:]) if err != nil { msg := "Failed to unmarshal signature" - s.logger.Error(msg, zap.Error(err)) + log.Error(msg, zap.Error(err)) return nil, set.Bits{}, fmt.Errorf("%s: %w", msg, err) } signatures = append(signatures, sig) @@ -617,7 +624,7 @@ func (s *SignatureAggregator) aggregateSignatures( aggSig, err := bls.AggregateSignatures(signatures) if err != nil { msg := "Failed to aggregate signatures" - s.logger.Error(msg, zap.Error(err)) + log.Error(msg, zap.Error(err)) return nil, set.Bits{}, fmt.Errorf("%s: %w", msg, err) } return aggSig, vdrBitSet, nil diff --git a/signature-aggregator/aggregator/aggregator_test.go b/signature-aggregator/aggregator/aggregator_test.go index 1c3e7ad8..eb68825f 100644 --- a/signature-aggregator/aggregator/aggregator_test.go +++ b/signature-aggregator/aggregator/aggregator_test.go @@ -145,7 +145,7 @@ func TestCreateSignedMessageFailsWithNoValidators(t *testing.T) { }, nil, ) - _, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80) + _, err = aggregator.CreateSignedMessage(logging.NoLog{}, msg, nil, ids.Empty, 80) require.ErrorContains(t, err, "no signatures") } @@ -165,7 +165,7 @@ func TestCreateSignedMessageFailsWithoutSufficientConnectedStake(t *testing.T) { }, nil, ).AnyTimes() - _, err = aggregator.CreateSignedMessage(msg, nil, ids.Empty, 80) + _, err = aggregator.CreateSignedMessage(logging.NoLog{}, msg, nil, ids.Empty, 80) require.ErrorContains( t, err, @@ -245,7 +245,7 @@ func TestCreateSignedMessageRetriesAndFailsWithoutP2PResponses(t *testing.T) { subnets.NoOpAllower, ).AnyTimes() - _, err = aggregator.CreateSignedMessage(msg, nil, subnetID, 80) + _, err = aggregator.CreateSignedMessage(logging.NoLog{}, msg, nil, subnetID, 
80) require.ErrorIs( t, err, @@ -330,6 +330,7 @@ func TestCreateSignedMessageSucceeds(t *testing.T) { // aggregate the signatures: var quorumPercentage uint64 = 80 signedMessage, err := aggregator.CreateSignedMessage( + logging.NoLog{}, msg, nil, subnetID, diff --git a/signature-aggregator/api/api.go b/signature-aggregator/api/api.go index ea782817..a727d357 100644 --- a/signature-aggregator/api/api.go +++ b/signature-aggregator/api/api.go @@ -178,6 +178,7 @@ func signatureAggregationAPIHandler( } signedMessage, err := aggregator.CreateSignedMessage( + logger, message, justification, signingSubnetID,