Separate aggregate signature verification (#11954)
We had batch signature verification logic both in
`attestation_service.go` and `aggregate_and_proof_service.go`.

I made that functionality pretty generic so that both services can now
simply "push" `AggregateVerificationData` into `BatchVerifier`.

`BatchVerifier` processes those signature verifications within at most 50ms, or sooner once the accumulated number of signatures exceeds 100.

On success it runs the corresponding generic `func()` and publishes the existing gossip data to the gossip network. On failed signature verification it simply bans the corresponding peer. The gossip data, `func()`, and peer are already passed in `AggregateVerificationData` for this processing.
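
For illustration only, here is a minimal sketch of the batching loop described above. The names (`BatchVerifier`, `AddVerification`, `Start`, `flatten`) and the injected `verify` callback are assumptions standing in for the actual `BatchSignatureVerifier` added by this commit, which additionally performs the real BLS check, peer banning, and gossip publishing:

```go
package verifier

import (
	"context"
	"time"
)

// AggregateVerificationData bundles the signatures of one gossip message together
// with the callback to run once they are proven valid (simplified; the real struct
// also carries the gossip data and peer).
type AggregateVerificationData struct {
	Signatures [][]byte
	SignRoots  [][]byte
	Pks        [][]byte
	F          func() // executed only if verification succeeds
}

// BatchVerifier accumulates verification requests and flushes them either every
// 50ms or as soon as more than 100 signatures are pending.
type BatchVerifier struct {
	ch chan *AggregateVerificationData
}

func NewBatchVerifier() *BatchVerifier {
	return &BatchVerifier{ch: make(chan *AggregateVerificationData, 128)}
}

// AddVerification pushes one request into the batcher; the calling service returns
// ErrIgnore and lets the batcher publish gossip or ban the peer later.
func (b *BatchVerifier) AddVerification(v *AggregateVerificationData) {
	b.ch <- v
}

// Start drains the channel and batches requests by time (50ms) or size (>100 signatures).
// verify stands in for the aggregated BLS check.
func (b *BatchVerifier) Start(ctx context.Context, verify func(sigs, roots, pks [][]byte) bool) {
	const (
		interval  = 50 * time.Millisecond
		threshold = 100
	)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	pending := make([]*AggregateVerificationData, 0, 128)
	sigCount := 0

	flush := func() {
		if len(pending) == 0 {
			return
		}
		sigs, roots, pks := flatten(pending)
		if verify(sigs, roots, pks) {
			// Whole batch valid: run every callback (the real service also
			// publishes the accumulated gossip data here).
			for _, v := range pending {
				v.F()
			}
		}
		// On failure the real service re-verifies each entry individually and bans
		// the peer that sent the invalid signature; omitted in this sketch.
		pending = pending[:0]
		sigCount = 0
		ticker.Reset(interval)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case v := <-b.ch:
			pending = append(pending, v)
			sigCount += len(v.Signatures)
			if sigCount > threshold {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// flatten merges the batched requests into parallel slices for a single aggregated check.
func flatten(batch []*AggregateVerificationData) (sigs, roots, pks [][]byte) {
	for _, v := range batch {
		sigs = append(sigs, v.Signatures...)
		roots = append(roots, v.SignRoots...)
		pks = append(pks, v.Pks...)
	}
	return sigs, roots, pks
}
```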

---------

Co-authored-by: shota.silagadze <shota.silagadze@taal.com>
shotasilagadze and shotasilagadzetaal authored Sep 14, 2024
1 parent 43f787f commit 614c46f
Showing 9 changed files with 208 additions and 288 deletions.
2 changes: 1 addition & 1 deletion cl/beacon/handler/pool.go
@@ -111,7 +111,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
},
}

if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil {
if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation in attestation service", "err", err)
failures = append(failures, poolingFailure{
Index: i,
4 changes: 2 additions & 2 deletions cl/phase1/network/gossip_manager.go
@@ -206,7 +206,7 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
SignedAggregateAndProof: &cltypes.SignedAggregateAndProof{},
}

if err := obj.SignedAggregateAndProof.DecodeSSZ(data.Data, int(version)); err != nil {
if err := obj.SignedAggregateAndProof.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}
return g.aggregateAndProofService.ProcessMessage(ctx, data.SubnetId, obj)
@@ -233,7 +233,7 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
Attestation: &solid.Attestation{},
}

if err := obj.Attestation.DecodeSSZ(data.Data, int(version)); err != nil {
if err := obj.Attestation.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}

158 changes: 15 additions & 143 deletions cl/phase1/network/services/aggregate_and_proof_service.go
@@ -28,7 +28,6 @@ import (

"github.com/erigontech/erigon-lib/log/v3"

sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon/cl/beacon/synced_data"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
@@ -41,35 +40,18 @@ import (
"github.com/erigontech/erigon/cl/utils"
)

const (
BatchSignatureVerificationThreshold = 100
)

type aggregateJob struct {
aggregate *cltypes.SignedAggregateAndProofData
creationTime time.Time
}

// each AggregateVerification request has sentinel.SentinelClient and *sentinel.GossipData
// to make sure that we can validate it separately and in case of failure we ban corresponding
// GossipData.Peer or simply run F and publish GossipData in case signature verification succeeds.
type AggregateVerificationData struct {
Signatures [][]byte
SignRoots [][]byte
Pks [][]byte
F func()
GossipData *sentinel.GossipData
}

type aggregateAndProofServiceImpl struct {
syncedDataManager *synced_data.SyncedDataManager
forkchoiceStore forkchoice.ForkChoiceStorage
beaconCfg *clparams.BeaconChainConfig
opPool pool.OperationsPool
test bool
sentinel sentinel.SentinelClient
verifyAndExecute chan *AggregateVerificationData
ctx context.Context
syncedDataManager *synced_data.SyncedDataManager
forkchoiceStore forkchoice.ForkChoiceStorage
beaconCfg *clparams.BeaconChainConfig
opPool pool.OperationsPool
test bool
batchSignatureVerifier *BatchSignatureVerifier

// set of aggregates that are scheduled for later processing
aggregatesScheduledForLaterExecution sync.Map
@@ -81,131 +63,21 @@ func NewAggregateAndProofService(
forkchoiceStore forkchoice.ForkChoiceStorage,
beaconCfg *clparams.BeaconChainConfig,
opPool pool.OperationsPool,
sentinel sentinel.SentinelClient,
test bool,
batchSignatureVerifier *BatchSignatureVerifier,
) AggregateAndProofService {
a := &aggregateAndProofServiceImpl{
ctx: ctx,
syncedDataManager: syncedDataManager,
forkchoiceStore: forkchoiceStore,
beaconCfg: beaconCfg,
opPool: opPool,
test: test,
sentinel: sentinel,
verifyAndExecute: make(chan *AggregateVerificationData, 128),
syncedDataManager: syncedDataManager,
forkchoiceStore: forkchoiceStore,
beaconCfg: beaconCfg,
opPool: opPool,
test: test,
batchSignatureVerifier: batchSignatureVerifier,
}
go a.loop(ctx)
go a.startBatchSignatureVerification()
return a
}

// When receiving SignedAggregateAndProof, we simply collect all the signature verification data
// and verify them together - running all the final functions afterwards
func (a *aggregateAndProofServiceImpl) startBatchSignatureVerification() {
batchCheckInterval := 3 * time.Second
ticker := time.NewTicker(batchCheckInterval)
aggregateVerificationData := make([]*AggregateVerificationData, 0, 128)
for {
select {
case verification := <-a.verifyAndExecute:
aggregateVerificationData = append(aggregateVerificationData, verification)
// *3 because each AggregateVerificationData contains 3 signatures
if len(aggregateVerificationData)*3 > BatchSignatureVerificationThreshold {
a.processSignatureVerification(aggregateVerificationData)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
ticker.Reset(batchCheckInterval)
}
case <-ticker.C:
if len(aggregateVerificationData) != 0 {
a.processSignatureVerification(aggregateVerificationData)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
ticker.Reset(batchCheckInterval)
}
}
}
}

// processSignatureVerification Runs signature verification for all the signatures altogether, if it
// succeeds we publish all accumulated gossip data. If verification fails, start verifying each AggregateVerificationData one by
// one, publish corresponding gossip data if verification succeeds, if not ban the corresponding peer that sent it.
func (a *aggregateAndProofServiceImpl) processSignatureVerification(aggregateVerificationData []*AggregateVerificationData) {
signatures, signRoots, pks, fns :=
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([]func(), 0, 64)

for _, v := range aggregateVerificationData {
signatures, signRoots, pks, fns =
append(signatures, v.Signatures...),
append(signRoots, v.SignRoots...),
append(pks, v.Pks...),
append(fns, v.F)
}
if err := a.runBatchVerification(signatures, signRoots, pks, fns); err != nil {
a.handleIncorrectSignatures(aggregateVerificationData)
log.Warn(err.Error())
return
}

// Everything went well, run corresponding Fs and send all the gossip data to the network
for _, v := range aggregateVerificationData {
v.F()
if v.GossipData != nil {
if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
log.Warn("failed publish gossip", "err", err)
}
}
}
}

// we could locate failing signature with binary search but for now let's choose simplicity over optimisation.
func (a *aggregateAndProofServiceImpl) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) {
for _, v := range aggregateVerificationData {
valid, err := bls.VerifyMultipleSignatures(
[][]byte{v.Signatures[0], v.Signatures[1], v.Signatures[2]},
[][]byte{v.SignRoots[0], v.SignRoots[1], v.SignRoots[2]},
[][]byte{v.Pks[0], v.Pks[1], v.Pks[2]})
if err != nil {
log.Warn("aggregate_and_proof_service signature verification failed with the error: " + err.Error())
if v.GossipData != nil && v.GossipData.Peer != nil {
a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
}
continue
}

if !valid {
log.Warn("aggregate_and_proof_service signature verification failed")
if v.GossipData != nil && v.GossipData.Peer != nil {
a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
}
continue
}

// run corresponding function and publish the gossip into the network
v.F()

if v.GossipData != nil {
if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
log.Warn("failed publish gossip", "err", err)
}
}
}
}

func (a *aggregateAndProofServiceImpl) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error {
valid, err := bls.VerifyMultipleSignatures(signatures, signRoots, pks)
if err != nil {
return errors.New("aggregate_and_proof_service signature verification failed with the error: " + err.Error())
}

if !valid {
return errors.New("aggregate_and_proof_service signature verification failed")
}

return nil
}

func (a *aggregateAndProofServiceImpl) ProcessMessage(
ctx context.Context,
subnet *uint64,
@@ -312,11 +184,11 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
aggregateVerificationData.GossipData = aggregateAndProof.GossipData

// push the signatures to verify asynchronously and run final functions after that.
a.verifyAndExecute <- aggregateVerificationData
a.batchSignatureVerifier.AddVerification(aggregateVerificationData)

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
// in startBatchSignatureVerification function. After validating signatures, if they are valid we will publish the
// in BatchVerification function. After validating signatures, if they are valid we will publish the
// gossip ourselves or ban the peer which sent that particular invalid signature.
return ErrIgnore
}
@@ -69,7 +69,9 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced
forkchoiceMock := mock_services.NewForkChoiceStorageMock(t)
p := pool.OperationsPool{}
p.AttestationsPool = pool.NewOperationPool[libcommon.Bytes96, *solid.Attestation](100, "test")
blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, p, nil, true)
batchSignatureVerifier := NewBatchSignatureVerifier(context.TODO(), nil)
go batchSignatureVerifier.Start()
blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, p, true, batchSignatureVerifier)
return blockService, syncedDataManager, forkchoiceMock
}
