Skip to content

Commit

Permalink
caplin: improvement in signature verification flow (#12000)
Browse files Browse the repository at this point in the history
current proposed block
https://holesky.beaconcha.in/slot/2543846#attestations

- Need to immediately process signature verification for those from http
call (post attestation / post aggregation and proof)
- Separately process signatures from the attestation service and the
aggregateAndProof service.
  • Loading branch information
domiwei authored Sep 17, 2024
1 parent 85e2ac3 commit fa268dc
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 27 deletions.
2 changes: 2 additions & 0 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
Name: gossip.TopicNamePrefixBeaconAttestation,
SubnetId: &subnet,
},
ImmediateProcess: true, // we want to process attestation immediately
}

if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
Expand Down Expand Up @@ -298,6 +299,7 @@ func (a *ApiHandler) PostEthV1ValidatorAggregatesAndProof(w http.ResponseWriter,
if err := a.aggregateAndProofsService.ProcessMessage(r.Context(), nil, &cltypes.SignedAggregateAndProofData{
SignedAggregateAndProof: v,
GossipData: gossipData,
ImmediateProcess: true, // we want to process aggregate and proof immediately
}); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process bls-change", "err", err)
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
Expand Down
1 change: 1 addition & 0 deletions cl/cltypes/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (a *AggregateAndProof) HashSSZ() ([32]byte, error) {
type SignedAggregateAndProofData struct {
SignedAggregateAndProof *SignedAggregateAndProof
GossipData *sentinel.GossipData
ImmediateProcess bool
}

type SignedAggregateAndProof struct {
Expand Down
5 changes: 3 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
case gossip.IsTopicBeaconAttestation(data.Name):
obj := &services.AttestationWithGossipData{
GossipData: copyOfSentinelData(data),
Attestation: &solid.Attestation{},
GossipData: copyOfSentinelData(data),
Attestation: &solid.Attestation{},
ImmediateProcess: false,
}

if err := obj.Attestation.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion cl/phase1/network/services/aggregate_and_proof_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,12 @@ func (a *aggregateAndProofServiceImpl) ProcessMessage(
// for this specific request, collect data for potential peer banning or gossip publishing
aggregateVerificationData.GossipData = aggregateAndProof.GossipData

if aggregateAndProof.ImmediateProcess {
return a.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData)
}

// push the signatures to verify asynchronously and run final functions after that.
a.batchSignatureVerifier.AddVerification(aggregateVerificationData)
a.batchSignatureVerifier.AsyncVerifyAggregateProof(aggregateVerificationData)

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
Expand Down
8 changes: 7 additions & 1 deletion cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type attestationService struct {
type AttestationWithGossipData struct {
Attestation *solid.Attestation
GossipData *sentinel.GossipData
// ImmediateProcess indicates whether the attestation should be processed immediately or able to be scheduled for later processing.
ImmediateProcess bool
}

func NewAttestationService(
Expand Down Expand Up @@ -239,8 +241,12 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
},
}

if att.ImmediateProcess {
return s.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData)
}

// push the signatures to verify asynchronously and run final functions after that.
s.batchSignatureVerifier.AddVerification(aggregateVerificationData)
s.batchSignatureVerifier.AsyncVerifyAttestation(aggregateVerificationData)

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
Expand Down
66 changes: 43 additions & 23 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

const (
BatchSignatureVerificationThreshold = 300
batchSignatureVerificationThreshold = 300
reservedSize = 512
)

var (
Expand All @@ -20,9 +21,10 @@ var (
)

type BatchSignatureVerifier struct {
sentinel sentinel.SentinelClient
verifyAndExecute chan *AggregateVerificationData
ctx context.Context
sentinel sentinel.SentinelClient
attVerifyAndExecute chan *AggregateVerificationData
aggregateProofVerify chan *AggregateVerificationData
ctx context.Context
}

// each AggregateVerification request has sentinel.SentinelClient and *sentinel.GossipData
Expand All @@ -38,54 +40,70 @@ type AggregateVerificationData struct {

func NewBatchSignatureVerifier(ctx context.Context, sentinel sentinel.SentinelClient) *BatchSignatureVerifier {
return &BatchSignatureVerifier{
ctx: ctx,
sentinel: sentinel,
verifyAndExecute: make(chan *AggregateVerificationData, 128),
ctx: ctx,
sentinel: sentinel,
attVerifyAndExecute: make(chan *AggregateVerificationData, 1024),
aggregateProofVerify: make(chan *AggregateVerificationData, 1024),
}
}

// AddVerification schedules new verification
func (b *BatchSignatureVerifier) AddVerification(aggregateVerificationData *AggregateVerificationData) {
b.verifyAndExecute <- aggregateVerificationData
// AsyncVerifyAttestation schedules new verification
func (b *BatchSignatureVerifier) AsyncVerifyAttestation(data *AggregateVerificationData) {
b.attVerifyAndExecute <- data
}

func (b *BatchSignatureVerifier) AsyncVerifyAggregateProof(data *AggregateVerificationData) {
b.aggregateProofVerify <- data
}

func (b *BatchSignatureVerifier) ImmediateVerification(data *AggregateVerificationData) error {
return b.processSignatureVerification([]*AggregateVerificationData{data})
}

func (b *BatchSignatureVerifier) Start() {
// separate goroutines for each type of verification
go b.start(b.attVerifyAndExecute)
go b.start(b.aggregateProofVerify)
}

// When receiving AggregateVerificationData, we simply collect all the signature verification data
// and verify them together - running all the final functions afterwards
func (b *BatchSignatureVerifier) Start() {
func (b *BatchSignatureVerifier) start(incoming chan *AggregateVerificationData) {
ticker := time.NewTicker(batchCheckInterval)
defer ticker.Stop()
aggregateVerificationData := make([]*AggregateVerificationData, 0, 128)
aggregateVerificationData := make([]*AggregateVerificationData, 0, reservedSize)
for {
select {
case <-b.ctx.Done():
return
case verification := <-b.verifyAndExecute:
case verification := <-incoming:
aggregateVerificationData = append(aggregateVerificationData, verification)
if len(aggregateVerificationData) > BatchSignatureVerificationThreshold {
if len(aggregateVerificationData) >= batchSignatureVerificationThreshold {
b.processSignatureVerification(aggregateVerificationData)
ticker.Reset(batchCheckInterval)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
// clear the slice
aggregateVerificationData = make([]*AggregateVerificationData, 0, reservedSize)
}
case <-ticker.C:
if len(aggregateVerificationData) == 0 {
continue
}
b.processSignatureVerification(aggregateVerificationData)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)

// clear the slice
aggregateVerificationData = make([]*AggregateVerificationData, 0, reservedSize)
}
}
}

// processSignatureVerification Runs signature verification for all the signatures altogether, if it
// succeeds we publish all accumulated gossip data. If verification fails, start verifying each AggregateVerificationData one by
// one, publish corresponding gossip data if verification succeeds, if not ban the corresponding peer that sent it.
func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificationData []*AggregateVerificationData) {
func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificationData []*AggregateVerificationData) error {
signatures, signRoots, pks, fns :=
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([]func(), 0, 64)
make([][]byte, 0, reservedSize),
make([][]byte, 0, reservedSize),
make([][]byte, 0, reservedSize),
make([]func(), 0, reservedSize)

for _, v := range aggregateVerificationData {
signatures, signRoots, pks, fns =
Expand All @@ -97,7 +115,7 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat
if err := b.runBatchVerification(signatures, signRoots, pks, fns); err != nil {
b.handleIncorrectSignatures(aggregateVerificationData)
log.Warn(err.Error())
return
return err
}

// Everything went well, run corresponding Fs and send all the gossip data to the network
Expand All @@ -106,9 +124,11 @@ func (b *BatchSignatureVerifier) processSignatureVerification(aggregateVerificat
if b.sentinel != nil && v.GossipData != nil {
if _, err := b.sentinel.PublishGossip(b.ctx, v.GossipData); err != nil {
log.Warn("failed publish gossip", "err", err)
return err
}
}
}
return nil
}

// we could locate failing signature with binary search but for now let's choose simplicity over optimisation.
Expand Down

0 comments on commit fa268dc

Please sign in to comment.