Skip to content

Commit

Permalink
Merge branch 'main' into specify-block-height-exit
Browse files Browse the repository at this point in the history
  • Loading branch information
cam-schultz committed Jan 17, 2024
2 parents 03fd535 + 4776c97 commit 58defa5
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 161 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MessageProtocolConfig struct {
MessageFormat string `mapstructure:"message-format" json:"message-format"`
Settings map[string]interface{} `mapstructure:"settings" json:"settings"`
}

type SourceSubnet struct {
SubnetID string `mapstructure:"subnet-id" json:"subnet-id"`
BlockchainID string `mapstructure:"blockchain-id" json:"blockchain-id"`
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ require (
go.opentelemetry.io/otel/sdk v1.11.0 // indirect
go.opentelemetry.io/otel/trace v1.11.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.5.0
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ github.com/ava-labs/coreth v0.12.9-rc.9 h1:mvYxABdyPByXwwwIxnTBCiNO23dsE1Kfnd5H1
github.com/ava-labs/coreth v0.12.9-rc.9/go.mod h1:yrf2vEah4Fgj6sJ4UpHewo4DLolwdpf2bJuLRT80PGw=
github.com/ava-labs/subnet-evm v0.5.10 h1:ed9BxoiuXRnB/qKakKzYKtZzV/gVjOB2LxuDegpLs9g=
github.com/ava-labs/subnet-evm v0.5.10/go.mod h1:wln8B4siQ1Osch+elW9vW1XJGjj5PYxQETkzFyDEMjk=
github.com/ava-labs/teleporter v0.0.0-20231221165433-826fa59bed3c h1:vnMlfP4SHFoatRufgUma/eGwvVzWdwMo17ADdrh6YYQ=
github.com/ava-labs/teleporter v0.0.0-20231221165433-826fa59bed3c/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/ava-labs/teleporter v0.0.0-20240104215757-839006a992f2 h1:gqO87g7c6Gy5ZjEzz/oQ3KgnGqvdy73hypnQ+gvmTbQ=
github.com/ava-labs/teleporter v0.0.0-20240104215757-839006a992f2/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/ava-labs/teleporter v0.0.0-20240105220309-160c7b8bce4b h1:tBUZfmBqdAykjTcL771SZcE5iQ5kgRZ6nz9Q/VQNTK0=
github.com/ava-labs/teleporter v0.0.0-20240105220309-160c7b8bce4b/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/ava-labs/teleporter v0.0.0-20240105221051-581342d9f521 h1:rHbDNvhen/qDQyOuFkdkQ91MjPSNOcmbrUtaTEjriH8=
github.com/ava-labs/teleporter v0.0.0-20240105221051-581342d9f521/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/ava-labs/teleporter v0.0.0-20240108172200-f03f526e5312 h1:rG9xkvCXRU4FBi1IBIXPxnTwxRv7mM6j0PX7FQss32g=
github.com/ava-labs/teleporter v0.0.0-20240108172200-f03f526e5312/go.mod h1:qeclhkPTO4R2McXNrXXca4JmiRSgQ0gJ0KtJWzQGGPE=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down Expand Up @@ -621,6 +613,8 @@ go.opentelemetry.io/otel/trace v1.11.0/go.mod h1:nyYjis9jy0gytE9LXGU+/m1sHTKbRY0
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
Expand Down
115 changes: 45 additions & 70 deletions main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ package main

import (
"context"
"encoding/hex"
"fmt"
"log"
"net/http"
"os"
"sync"
"sync/atomic"

"github.com/alexliesenfeld/health"
"github.com/ava-labs/avalanchego/api/metrics"
Expand All @@ -27,7 +24,9 @@ import (
"github.com/ava-labs/awm-relayer/vms"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const (
Expand Down Expand Up @@ -111,16 +110,23 @@ func main() {
return
}

// Create a health check server that polls a single atomic bool, settable by any relayer goroutine on failure
healthy := atomic.Bool{}
healthy.Store(true)
// Each goroutine will have an atomic bool that it can set to false if it ever disconnects from its subscription.
relayerHealth := make(map[ids.ID]*atomic.Bool)

checker := health.NewChecker(
health.WithCheck(health.Check{
Name: "relayers-all",
Check: func(context.Context) error {
if !healthy.Load() {
return fmt.Errorf("relayer is unhealthy")
// Store the IDs as the cb58 encoding
var unhealthyRelayers []string
for id, health := range relayerHealth {
if !health.Load() {
unhealthyRelayers = append(unhealthyRelayers, id.String())
}
}

if len(unhealthyRelayers) > 0 {
return fmt.Errorf("relayers are unhealthy for blockchains %v", unhealthyRelayers)
}
return nil
},
Expand Down Expand Up @@ -159,8 +165,7 @@ func main() {
}

// Create relayers for each of the subnets configured as a source
// On exit, the relayer goroutine will set the healthy flag to false
var wg sync.WaitGroup
errGroup, ctx := errgroup.WithContext(context.Background())
for _, s := range cfg.SourceSubnets {
blockchainID, err := ids.FromString(s.BlockchainID)
if err != nil {
Expand All @@ -170,14 +175,16 @@ func main() {
)
return
}
wg.Add(1)
subnetInfo := s
go func() {
defer func() {
wg.Done()
healthy.Store(false)
}()
runRelayer(

health := atomic.NewBool(true)
relayerHealth[blockchainID] = health

// errgroup will cancel the context when the first goroutine returns an error
errGroup.Go(func() error {
// runRelayer runs until it errors or the context is cancelled by another goroutine
return runRelayer(
ctx,
logger,
metrics,
db,
Expand All @@ -188,18 +195,21 @@ func main() {
destinationClients,
messageCreator,
cfg.ProcessMissedBlocks,
health,
)
logger.Info(
"Relayer exiting.",
zap.String("blockchainID", blockchainID.String()),
)
}()
})
}
wg.Wait()
err = errGroup.Wait()
logger.Error(
"Relayer exiting.",
zap.Error(err),
)
}

// runRelayer creates a relayer instance for a subnet. It listens for warp messages on that subnet, and handles delivery to the destination
func runRelayer(logger logging.Logger,
func runRelayer(
ctx context.Context,
logger logging.Logger,
metrics *relayer.MessageRelayerMetrics,
db database.RelayerDatabase,
sourceSubnetInfo config.SourceSubnet,
Expand All @@ -208,73 +218,38 @@ func runRelayer(logger logging.Logger,
responseChan chan message.InboundMessage,
destinationClients map[ids.ID]vms.DestinationClient,
messageCreator message.Creator,
processMissedBlocks bool,
) {
shouldProcessMissedBlocks bool,
relayerHealth *atomic.Bool,
) error {
logger.Info(
"Creating relayer",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
)

relayer, subscriber, err := relayer.NewRelayer(
relayer, err := relayer.NewRelayer(
logger,
metrics,
db,
sourceSubnetInfo,
pChainClient,
network,
responseChan,
destinationClients,
processMissedBlocks,
messageCreator,
shouldProcessMissedBlocks,
relayerHealth,
)
if err != nil {
logger.Error(
"Failed to create relayer instance",
zap.Error(err),
)
return
return fmt.Errorf("Failed to create relayer instance: %w", err)
}
logger.Info(
"Created relayer. Listening for messages to relay.",
zap.String("blockchainID", sourceSubnetInfo.BlockchainID),
)

// Wait for logs from the subscribed node
for {
select {
case txLog := <-subscriber.Logs():
logger.Info(
"Handling Teleporter submit message log.",
zap.String("txId", hex.EncodeToString(txLog.SourceTxID)),
zap.String("originChainId", sourceSubnetInfo.BlockchainID),
zap.String("sourceAddress", txLog.SourceAddress.String()),
)

// Relay the message to the destination chain. Continue on failure.
err = relayer.RelayMessage(&txLog, metrics, messageCreator)
if err != nil {
logger.Error(
"Error relaying message",
zap.String("originChainID", sourceSubnetInfo.BlockchainID),
zap.Error(err),
)
continue
}
case err := <-subscriber.Err():
logger.Error(
"Received error from subscribed node",
zap.String("originChainID", sourceSubnetInfo.BlockchainID),
zap.Error(err),
)
err = subscriber.Subscribe()
if err != nil {
logger.Error(
"Failed to resubscribe to node. Relayer goroutine exiting.",
zap.String("originChainID", sourceSubnetInfo.BlockchainID),
zap.Error(err),
)
return
}
}
}
// Will only return on error or context cancellation
return relayer.ProcessLogs(ctx)
}

func startMetricsServer(logger logging.Logger, gatherer prometheus.Gatherer, port uint32) {
Expand Down
Loading

0 comments on commit 58defa5

Please sign in to comment.