From 6de5daaf62b023681137158246d9a29360e73f04 Mon Sep 17 00:00:00 2001 From: Hristiyan Mitov <67628947+hMitov@users.noreply.github.com> Date: Mon, 3 Feb 2025 11:33:35 +0200 Subject: [PATCH] 738 handle grandpa messages in new class (#739) # Description - Introduced new GrandpaMessageHandler class, where all the messages, received from GrandpaEngine will be passed. - It will handle the grandpa messages. - Extracted logic, related to grandpa messages from WarpSyncState to GrandpaMessageHandler. - Fixed tests. Fixes #738 > --------- Co-authored-by: Hristiyan Mitov Co-authored-by: Georgi Grigorov <72125420+Grigorov-Georgi@users.noreply.github.com> Co-authored-by: osrib --- .../grandpa/state/GrandpaSetState.java | 145 -------- .../protocol/grandpa/GrandpaController.java | 2 +- .../protocol/grandpa/GrandpaEngine.java | 24 +- .../grandpa/GrandpaMessageHandler.java | 316 ++++++++++++++++++ .../limechain/rpc/config/CommonConfig.java | 5 +- .../sync/warpsync/WarpSyncState.java | 142 +------- .../action/VerifyJustificationAction.java | 11 +- .../protocol/grandpa/GrandpaEngineTest.java | 21 +- 8 files changed, 354 insertions(+), 312 deletions(-) create mode 100644 src/main/java/com/limechain/network/protocol/grandpa/GrandpaMessageHandler.java diff --git a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java index d006da162..69c2ad538 100644 --- a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java +++ b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java @@ -3,25 +3,14 @@ import com.limechain.ServiceConsensusState; import com.limechain.chain.lightsyncstate.Authority; import com.limechain.chain.lightsyncstate.LightSyncState; -import com.limechain.exception.grandpa.GrandpaGenericException; import com.limechain.grandpa.round.GrandpaRound; import com.limechain.grandpa.round.RoundCache; import com.limechain.grandpa.vote.SignedVote; -import com.limechain.grandpa.vote.SubRound; import com.limechain.grandpa.vote.Vote; import com.limechain.network.PeerMessageCoordinator; -import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; -import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage; -import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; -import com.limechain.network.protocol.grandpa.messages.vote.FullVote; -import com.limechain.network.protocol.grandpa.messages.vote.FullVoteScaleWriter; -import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage; -import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.runtime.Runtime; -import com.limechain.runtime.hostapi.dto.Key; -import com.limechain.runtime.hostapi.dto.VerifySignature; import com.limechain.state.AbstractState; import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; @@ -32,10 +21,7 @@ import com.limechain.sync.warpsync.dto.AuthoritySetChange; import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange; import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange; -import com.limechain.utils.Ed25519Utils; -import com.limechain.utils.scale.ScaleUtils; import io.emeraldpay.polkaj.types.Hash256; -import io.libp2p.core.PeerId; import io.libp2p.core.crypto.PubKey; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -51,8 +37,6 @@ import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; -import java.util.Set; -import java.util.function.Supplier; import java.util.logging.Level; /** @@ -68,7 +52,6 @@ public class GrandpaSetState extends AbstractState implements ServiceConsensusState { private static final BigInteger THRESHOLD_DENOMINATOR = BigInteger.valueOf(3); - private static final BigInteger CATCH_UP_THRESHOLD = BigInteger.TWO; private List authorities; private BigInteger disabledAuthority; @@ -253,62 +236,6 @@ public void handleGrandpaConsensusMessage(GrandpaConsensusMessage consensusMessa log.fine(String.format("Updated grandpa set config: %s", consensusMessage.getFormat().toString())); } - public void handleVoteMessage(VoteMessage voteMessage) { - BigInteger voteMessageSetId = voteMessage.getSetId(); - BigInteger voteMessageRoundNumber = voteMessage.getRound(); - SignedMessage signedMessage = voteMessage.getMessage(); - - SignedVote signedVote = new SignedVote(); - signedVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber())); - signedVote.setSignature(signedMessage.getSignature()); - signedVote.setAuthorityPublicKey(signedMessage.getAuthorityPublicKey()); - - if (!isValidMessageSignature(voteMessage)) { - log.warning( - String.format("Vote message signature is invalid for round %s, set %s, block hash %s, block number %s", - voteMessageSetId, voteMessageSetId, signedMessage.getBlockHash(), signedMessage.getBlockNumber())); - return; - } - - GrandpaRound round = roundCache.getRound(voteMessageSetId, voteMessageRoundNumber); - if (round == null) { - round = new GrandpaRound(); - round.setRoundNumber(voteMessageRoundNumber); - roundCache.addRound(voteMessageSetId, round); - } - - SubRound subround = signedMessage.getStage(); - switch (subround) { - case PRE_VOTE -> round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote); - case PRE_COMMIT -> round.getPreCommits().put(signedMessage.getAuthorityPublicKey(), signedVote); - case PRIMARY_PROPOSAL -> { - round.setPrimaryVote(signedVote); - round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote); - } - default -> throw new GrandpaGenericException("Unknown subround: " + subround); - } - } - - private boolean isValidMessageSignature(VoteMessage voteMessage) { - SignedMessage signedMessage = voteMessage.getMessage(); - - FullVote fullVote = new FullVote(); - fullVote.setRound(voteMessage.getRound()); - fullVote.setSetId(voteMessage.getSetId()); - fullVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber())); - fullVote.setStage(signedMessage.getStage()); - - byte[] encodedFullVote = ScaleUtils.Encode.encode(FullVoteScaleWriter.getInstance(), fullVote); - - VerifySignature verifySignature = new VerifySignature( - signedMessage.getSignature().getBytes(), - encodedFullVote, - signedMessage.getAuthorityPublicKey().getBytes(), - Key.ED25519); - - return Ed25519Utils.verifySignature(verifySignature); - } - private void updateAuthorityStatus() { Optional> keyPair = authorities.stream() .map(a -> keyStore.getKeyPair(KeyType.GRANDPA, a.getPublicKey())) @@ -318,76 +245,4 @@ private void updateAuthorityStatus() { keyPair.ifPresentOrElse(AbstractState::setAuthorityStatus, AbstractState::clearAuthorityStatus); } - - public void initiateAndSendCatchUpRequest(NeighbourMessage neighbourMessage, PeerId peerId) { - // If peer has the same voter set id - if (neighbourMessage.getSetId().equals(setId)) { - - // Check if needed to catch-up peer - if (neighbourMessage.getRound().compareTo( - fetchLatestRound().getRoundNumber().add(CATCH_UP_THRESHOLD)) >= 0) { - log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead."); - - CatchUpReqMessage catchUpReqMessage = CatchUpReqMessage.builder() - .round(neighbourMessage.getRound()) - .setId(neighbourMessage.getSetId()).build(); - - messageCoordinator.sendCatchUpRequestToPeer(peerId, catchUpReqMessage); - } - } - } - - public void initiateAndSendCatchUpResponse(PeerId peerId, - CatchUpReqMessage catchUpReqMessage, - Supplier> peerIds) { - - if (!peerIds.get().contains(peerId)) { - throw new GrandpaGenericException("Requesting catching up from a non-peer."); - } - - if (!catchUpReqMessage.getSetId().equals(setId)) { - throw new GrandpaGenericException("Catch up message has a different setId."); - } - - if (catchUpReqMessage.getRound().compareTo(fetchLatestRound().getRoundNumber()) > 0) { - throw new GrandpaGenericException("Catching up on a round in the future."); - } - - GrandpaRound selectedGrandpaRound = selectRound(catchUpReqMessage.getRound(), catchUpReqMessage.getSetId()) - .orElseThrow(() -> new GrandpaGenericException("Target round was no found.")); - - SignedVote[] preCommits = selectedGrandpaRound.getPreCommits().values().toArray(SignedVote[]::new); - SignedVote[] preVotes = selectedGrandpaRound.getPreVotes().values().toArray(SignedVote[]::new); - - BlockHeader finalizedBlockHeader = selectedGrandpaRound.getFinalizedBlock(); - - CatchUpResMessage catchUpResMessage = CatchUpResMessage.builder() - .round(selectedGrandpaRound.getRoundNumber()) - .setId(setId) - .preCommits(preCommits) - .preVotes(preVotes) - .blockHash(finalizedBlockHeader.getHash()) - .blockNumber(finalizedBlockHeader.getBlockNumber()) - .build(); - - messageCoordinator.sendCatchUpResponseToPeer(peerId, catchUpResMessage); - } - - private Optional selectRound(BigInteger peerRoundNumber, BigInteger peerSetId) { - GrandpaRound round = roundCache.getLatestRound(setId); - - while (round != null) { - // Round found - // Check voter set - if (round.getRoundNumber().equals(peerRoundNumber)) { - if (round.getCommitMessagesArchive().getFirst().getSetId().equals(peerSetId)) { - break; - } - } - // Go to the previous round - round = round.getPrevious(); - } - - return Optional.ofNullable(round); - } } \ No newline at end of file diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java index 422ede4c1..0919df059 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java @@ -52,6 +52,6 @@ public void sendCatchUpResponse(byte[] encodedCatchUpResMessage) { * Sends a vote message over the controller stream. */ public void sendVoteMessage(byte[] encodedVoteMessage) { - engine.writeCommitMessage(stream, encodedVoteMessage); + engine.writeVoteMessage(stream, encodedVoteMessage); } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java index 9d750e054..46c1850dc 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java @@ -2,7 +2,6 @@ import com.limechain.exception.scale.ScaleEncodingException; import com.limechain.grandpa.GrandpaService; -import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import com.limechain.network.protocol.grandpa.messages.GrandpaMessageType; @@ -41,19 +40,14 @@ @AllArgsConstructor(access = AccessLevel.PROTECTED) public class GrandpaEngine { private static final int HANDSHAKE_LENGTH = 1; - - private final WarpSyncState warpSyncState; - protected ConnectionManager connectionManager; protected BlockAnnounceHandshakeBuilder handshakeBuilder; - protected GrandpaSetState grandpaSetState; + protected GrandpaMessageHandler grandpaMessageHandler; public GrandpaEngine() { - warpSyncState = AppBean.getBean(WarpSyncState.class); - connectionManager = ConnectionManager.getInstance(); handshakeBuilder = new BlockAnnounceHandshakeBuilder(); - grandpaSetState = AppBean.getBean(GrandpaSetState.class); + grandpaMessageHandler = AppBean.getBean(GrandpaMessageHandler.class); } /** @@ -147,25 +141,25 @@ private void handleNeighbourMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); NeighbourMessage neighbourMessage = reader.read(NeighbourMessageScaleReader.getInstance()); log.log(Level.FINE, "Received neighbour message from Peer " + peerId + "\n" + neighbourMessage); - new Thread(() -> warpSyncState.syncNeighbourMessage(neighbourMessage, peerId)).start(); + new Thread(() -> grandpaMessageHandler.handleNeighbourMessage(neighbourMessage, peerId)).start(); if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) { - grandpaSetState.initiateAndSendCatchUpRequest(neighbourMessage, peerId); + grandpaMessageHandler.initiateAndSendCatchUpRequest(neighbourMessage, peerId); } } private void handleVoteMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); VoteMessage voteMessage = reader.read(VoteMessageScaleReader.getInstance()); - grandpaSetState.handleVoteMessage(voteMessage); - //todo: handle vote message (authoring node responsibility?) log.log(Level.INFO, "Received vote message from Peer " + peerId + "\n" + voteMessage); + //Maybe we need to add possible roundNumber check + grandpaMessageHandler.handleVoteMessage(voteMessage); } private void handleCommitMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); CommitMessage commitMessage = reader.read(CommitMessageScaleReader.getInstance()); - warpSyncState.syncCommit(commitMessage, peerId); + grandpaMessageHandler.handleCommitMessage(commitMessage, peerId); } private void handleCatchupRequestMessage(byte[] message, PeerId peerId) { @@ -174,7 +168,7 @@ private void handleCatchupRequestMessage(byte[] message, PeerId peerId) { log.log(Level.INFO, "Received catch up request message from Peer " + peerId + "\n" + catchUpReqMessage); if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) { - grandpaSetState.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds); + grandpaMessageHandler.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds); } } @@ -254,7 +248,7 @@ public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage) /** * Send our GRANDPA vote message from {@link GrandpaService} on a given responder stream. * - * @param stream responder stream to write the message to + * @param stream responder stream to write the message to * @param encodedVoteMessage scale encoded VoteMessage object */ public void writeVoteMessage(Stream stream, byte[] encodedVoteMessage) { diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaMessageHandler.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaMessageHandler.java new file mode 100644 index 000000000..1afcdff32 --- /dev/null +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaMessageHandler.java @@ -0,0 +1,316 @@ +package com.limechain.network.protocol.grandpa; + +import com.limechain.exception.grandpa.GrandpaGenericException; +import com.limechain.grandpa.round.GrandpaRound; +import com.limechain.grandpa.round.RoundCache; +import com.limechain.grandpa.state.GrandpaSetState; +import com.limechain.grandpa.vote.SignedVote; +import com.limechain.grandpa.vote.SubRound; +import com.limechain.grandpa.vote.Vote; +import com.limechain.network.PeerMessageCoordinator; +import com.limechain.network.PeerRequester; +import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; +import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; +import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; +import com.limechain.network.protocol.grandpa.messages.vote.FullVote; +import com.limechain.network.protocol.grandpa.messages.vote.FullVoteScaleWriter; +import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage; +import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage; +import com.limechain.network.protocol.sync.BlockRequestField; +import com.limechain.network.protocol.sync.pb.SyncMessage; +import com.limechain.network.protocol.warp.DigestHelper; +import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.network.protocol.warp.dto.Justification; +import com.limechain.network.protocol.warp.scale.reader.BlockHeaderReader; +import com.limechain.network.protocol.warp.scale.reader.JustificationReader; +import com.limechain.runtime.hostapi.dto.Key; +import com.limechain.runtime.hostapi.dto.VerifySignature; +import com.limechain.state.AbstractState; +import com.limechain.state.StateManager; +import com.limechain.sync.JustificationVerifier; +import com.limechain.sync.state.SyncState; +import com.limechain.sync.warpsync.WarpSyncState; +import com.limechain.utils.Ed25519Utils; +import com.limechain.utils.scale.ScaleUtils; +import io.emeraldpay.polkaj.scale.ScaleCodecReader; +import io.libp2p.core.PeerId; +import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; +import org.springframework.stereotype.Component; + +import java.math.BigInteger; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.logging.Level; + +@Log +@RequiredArgsConstructor +@Component +public class GrandpaMessageHandler { + private static final BigInteger CATCH_UP_THRESHOLD = BigInteger.TWO; + + private final StateManager stateManager; + private final PeerMessageCoordinator messageCoordinator; + private final WarpSyncState warpSyncState; + private final PeerRequester requester; + + /** + * Handles a vote message, extracts signed vote, associates it with the correct round. + * Depending on the subround type, the vote is added to pre-votes, pre-commits, or marked as the primary proposal. + * + * @param voteMessage received vote message. + */ + public void handleVoteMessage(VoteMessage voteMessage) { + RoundCache roundCache = stateManager.getGrandpaSetState().getRoundCache(); + BigInteger voteMessageSetId = voteMessage.getSetId(); + BigInteger voteMessageRoundNumber = voteMessage.getRound(); + SignedMessage signedMessage = voteMessage.getMessage(); + + SignedVote signedVote = new SignedVote(); + signedVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber())); + signedVote.setSignature(signedMessage.getSignature()); + signedVote.setAuthorityPublicKey(signedMessage.getAuthorityPublicKey()); + + if (!isValidMessageSignature(voteMessage)) { + log.warning( + String.format("Vote message signature is invalid for round %s, set %s, block hash %s, block number %s", + voteMessageSetId, voteMessageSetId, signedMessage.getBlockHash(), signedMessage.getBlockNumber())); + return; + } + + GrandpaRound round = roundCache.getRound(voteMessageSetId, voteMessageRoundNumber); + if (round == null) { + round = new GrandpaRound(); + round.setRoundNumber(voteMessageRoundNumber); + roundCache.addRound(voteMessageSetId, round); + } + + SubRound subround = signedMessage.getStage(); + switch (subround) { + case PRE_VOTE -> round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote); + case PRE_COMMIT -> round.getPreCommits().put(signedMessage.getAuthorityPublicKey(), signedVote); + case PRIMARY_PROPOSAL -> { + round.setPrimaryVote(signedVote); + round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote); + } + default -> throw new GrandpaGenericException("Unknown subround: " + subround); + } + } + + private boolean isValidMessageSignature(VoteMessage voteMessage) { + SignedMessage signedMessage = voteMessage.getMessage(); + + FullVote fullVote = new FullVote(); + fullVote.setRound(voteMessage.getRound()); + fullVote.setSetId(voteMessage.getSetId()); + fullVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber())); + fullVote.setStage(signedMessage.getStage()); + + byte[] encodedFullVote = ScaleUtils.Encode.encode(FullVoteScaleWriter.getInstance(), fullVote); + + VerifySignature verifySignature = new VerifySignature( + signedMessage.getSignature().getBytes(), + encodedFullVote, + signedMessage.getAuthorityPublicKey().getBytes(), + Key.ED25519); + + return Ed25519Utils.verifySignature(verifySignature); + } + + /** + * Updates the Host's state with information from a commit message. + * Synchronized to avoid race condition between checking and updating latest block + * Scheduled runtime updates for synchronized blocks are executed. + * + * @param commitMessage received commit message + * @param peerId sender of the message + */ + public synchronized void handleCommitMessage(CommitMessage commitMessage, PeerId peerId) { + if (commitMessage.getVote().getBlockNumber().compareTo( + stateManager.getSyncState().getLastFinalizedBlockNumber()) <= 0) { + log.log(Level.FINE, String.format("Received commit message for finalized block %d from peer %s", + commitMessage.getVote().getBlockNumber(), peerId)); + return; + } + + log.log(Level.FINE, "Received commit message from peer " + peerId + + " for block #" + commitMessage.getVote().getBlockNumber() + + " with hash " + commitMessage.getVote().getBlockHash() + + " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber() + + " with " + commitMessage.getPreCommits().length + " voters"); + + boolean verified = JustificationVerifier.verify(Justification.fromCommitMessage(commitMessage)); + if (!verified) { + log.log(Level.WARNING, "Could not verify commit from peer: " + peerId); + return; + } + + GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); + Optional.ofNullable(grandpaSetState.getRoundCache() + .getRound(commitMessage.getSetId(), commitMessage.getRoundNumber())) + .ifPresent(round -> round.addCommitMessageToArchive(commitMessage)); + + if (warpSyncState.isWarpSyncFinished() && !AbstractState.isActiveAuthority()) { + updateSyncStateAndRuntime(commitMessage); + } + } + + private void updateSyncStateAndRuntime(CommitMessage commitMessage) { + SyncState syncState = stateManager.getSyncState(); + BigInteger lastFinalizedBlockNumber = syncState.getLastFinalizedBlockNumber(); + if (commitMessage.getVote().getBlockNumber().compareTo(lastFinalizedBlockNumber) < 1) { + return; + } + syncState.finalizedCommitMessage(commitMessage); + + new Thread(() -> warpSyncState.updateRuntime(lastFinalizedBlockNumber)).start(); + } + + /** + * Updates the Host's state with information from a neighbour message. + * Tries to update Host's set data (id and authorities) if neighbour has a greater set id than the Host. + * Synchronized to avoid race condition between checking and updating set id + * + * @param neighbourMessage received neighbour message + * @param peerId sender of message + */ + public void handleNeighbourMessage(NeighbourMessage neighbourMessage, PeerId peerId) { + messageCoordinator.sendNeighbourMessageToPeer(peerId); + if (warpSyncState.isWarpSyncFinished() && neighbourMessage.getSetId() + .compareTo(stateManager.getGrandpaSetState().getSetId()) > 0) { + BigInteger setChangeBlock = neighbourMessage.getLastFinalizedBlock().add(BigInteger.ONE); + + List response = requester.requestBlockData( + BlockRequestField.ALL, + setChangeBlock.intValueExact(), + 1 + ).join(); + + SyncMessage.BlockData block = response.getFirst(); + + if (block.getIsEmptyJustification()) { + log.log(Level.WARNING, "No justification for block " + setChangeBlock); + return; + } + + Justification justification = JustificationReader.getInstance().read( + new ScaleCodecReader(block.getJustification().toByteArray())); + + boolean verified = JustificationVerifier.verify(justification); + + if (verified) { + processNeighbourUpdates(block); + } + } + } + + private void processNeighbourUpdates(SyncMessage.BlockData block) { + BlockHeader header = BlockHeaderReader.getInstance().read(new ScaleCodecReader(block.getHeader().toByteArray())); + + stateManager.getSyncState().finalizeHeader(header); + + DigestHelper.getGrandpaConsensusMessage(header.getDigest()) + .ifPresent(cm -> stateManager.getGrandpaSetState() + .handleGrandpaConsensusMessage(cm, header.getBlockNumber())); + + // Executes scheduled or forced authority changes for the last finalized block. + boolean changeInAuthoritySet = stateManager.getGrandpaSetState().handleAuthoritySetChange( + stateManager.getSyncState().getLastFinalizedBlockNumber()); + + if (warpSyncState.isWarpSyncFinished() && changeInAuthoritySet) { + new Thread(messageCoordinator::sendMessagesToPeers).start(); + } + } + + /** + * Initiates and sends a catch-up request to a specific peer. + * + * @param neighbourMessage received neighbour message + * @param peerId peer to send the catch-up message to + */ + public void initiateAndSendCatchUpRequest(NeighbourMessage neighbourMessage, PeerId peerId) { + GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); + // If peer has the same voter set id + if (neighbourMessage.getSetId().equals(grandpaSetState.getSetId())) { + + // Check if needed to catch-up peer + if (neighbourMessage.getRound().compareTo( + grandpaSetState.fetchLatestRound().getRoundNumber().add(CATCH_UP_THRESHOLD)) >= 0) { + log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead."); + + CatchUpReqMessage catchUpReqMessage = CatchUpReqMessage.builder() + .round(neighbourMessage.getRound()) + .setId(neighbourMessage.getSetId()).build(); + + messageCoordinator.sendCatchUpRequestToPeer(peerId, catchUpReqMessage); + } + } + } + + /** + * Handles a catch-up request from a peer, initiating and sending corresponding catch-up response. + * + * @param peerId peer requesting catch-up message + * @param catchUpReqMessage received catch-up request message + * @param peerIds set of connected peer ids + */ + public void initiateAndSendCatchUpResponse(PeerId peerId, + CatchUpReqMessage catchUpReqMessage, + Supplier> peerIds) { + + GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); + if (!peerIds.get().contains(peerId)) { + throw new GrandpaGenericException("Requesting catching up from a non-peer."); + } + + if (!catchUpReqMessage.getSetId().equals(grandpaSetState.getSetId())) { + throw new GrandpaGenericException("Catch up message has a different setId."); + } + + if (catchUpReqMessage.getRound().compareTo(grandpaSetState.fetchLatestRound().getRoundNumber()) > 0) { + throw new GrandpaGenericException("Catching up on a round in the future."); + } + + GrandpaRound selectedGrandpaRound = selectRound(catchUpReqMessage.getRound(), catchUpReqMessage.getSetId()) + .orElseThrow(() -> new GrandpaGenericException("Target round was no found.")); + + SignedVote[] preCommits = selectedGrandpaRound.getPreCommits().values().toArray(SignedVote[]::new); + SignedVote[] preVotes = selectedGrandpaRound.getPreVotes().values().toArray(SignedVote[]::new); + + BlockHeader finalizedBlockHeader = selectedGrandpaRound.getFinalizedBlock(); + + CatchUpResMessage catchUpResMessage = CatchUpResMessage.builder() + .round(selectedGrandpaRound.getRoundNumber()) + .setId(grandpaSetState.getSetId()) + .preCommits(preCommits) + .preVotes(preVotes) + .blockHash(finalizedBlockHeader.getHash()) + .blockNumber(finalizedBlockHeader.getBlockNumber()) + .build(); + + messageCoordinator.sendCatchUpResponseToPeer(peerId, catchUpResMessage); + } + + private Optional selectRound(BigInteger peerRoundNumber, BigInteger peerSetId) { + GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); + + GrandpaRound round = grandpaSetState.getRoundCache().getLatestRound(grandpaSetState.getSetId()); + + while (round != null) { + // Round found + // Check voter set + if (round.getRoundNumber().equals(peerRoundNumber)) { + if (round.getCommitMessagesArchive().getFirst().getSetId().equals(peerSetId)) { + break; + } + } + // Go to the previous round + round = round.getPrevious(); + } + + return Optional.ofNullable(round); + } +} diff --git a/src/main/java/com/limechain/rpc/config/CommonConfig.java b/src/main/java/com/limechain/rpc/config/CommonConfig.java index a90327f17..ee56947ed 100644 --- a/src/main/java/com/limechain/rpc/config/CommonConfig.java +++ b/src/main/java/com/limechain/rpc/config/CommonConfig.java @@ -104,9 +104,8 @@ public NetworkService networkService(ChainService chainService, public WarpSyncState warpSyncState(StateManager stateManager, KVRepository repository, RuntimeBuilder runtimeBuilder, - PeerRequester requester, - PeerMessageCoordinator messageCoordinator) { - return new WarpSyncState(stateManager, repository, runtimeBuilder, requester, messageCoordinator); + PeerRequester requester) { + return new WarpSyncState(stateManager, repository, runtimeBuilder, requester); } @Bean diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index cae4f0cb1..e940f4047 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -2,28 +2,15 @@ import com.limechain.exception.global.RuntimeCodeException; import com.limechain.exception.trie.TrieDecoderException; -import com.limechain.grandpa.state.GrandpaSetState; -import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.PeerRequester; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; -import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; -import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; import com.limechain.network.protocol.lightclient.pb.LightClientMessage; -import com.limechain.network.protocol.sync.BlockRequestField; -import com.limechain.network.protocol.sync.pb.SyncMessage.BlockData; -import com.limechain.network.protocol.warp.DigestHelper; -import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.network.protocol.warp.dto.DigestType; -import com.limechain.network.protocol.warp.dto.Justification; -import com.limechain.network.protocol.warp.scale.reader.BlockHeaderReader; -import com.limechain.network.protocol.warp.scale.reader.JustificationReader; import com.limechain.runtime.Runtime; import com.limechain.runtime.RuntimeBuilder; -import com.limechain.state.AbstractState; import com.limechain.state.StateManager; import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; -import com.limechain.sync.JustificationVerifier; import com.limechain.sync.state.SyncState; import com.limechain.trie.decoded.Trie; import com.limechain.trie.decoded.TrieVerifier; @@ -31,7 +18,6 @@ import com.limechain.utils.StringUtils; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.emeraldpay.polkaj.types.Hash256; -import io.libp2p.core.PeerId; import lombok.Getter; import lombok.Setter; import lombok.extern.java.Log; @@ -39,7 +25,6 @@ import java.math.BigInteger; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.logging.Level; @@ -52,7 +37,6 @@ public class WarpSyncState { private final StateManager stateManager; private final PeerRequester requester; - private final PeerMessageCoordinator messageCoordinator; private final KVRepository db; public static final String CODE_KEY = StringUtils.toHex(":code"); @@ -60,7 +44,7 @@ public class WarpSyncState { @Getter private boolean warpSyncFragmentsFinished; @Getter - private boolean warpSyncFinished; + private boolean warpSyncFinished = false; @Getter private Runtime runtime; @@ -74,30 +58,26 @@ public class WarpSyncState { public WarpSyncState(StateManager stateManager, KVRepository db, RuntimeBuilder runtimeBuilder, - PeerRequester requester, - PeerMessageCoordinator messageCoordinator) { + PeerRequester requester) { this(stateManager, db, runtimeBuilder, new HashSet<>(), - requester, - messageCoordinator + requester ); } public WarpSyncState(StateManager stateManager, KVRepository db, RuntimeBuilder runtimeBuilder, Set scheduledRuntimeUpdateBlocks, - PeerRequester requester, - PeerMessageCoordinator messageCoordinator) { + PeerRequester requester) { this.stateManager = stateManager; this.db = db; this.runtimeBuilder = runtimeBuilder; this.scheduledRuntimeUpdateBlocks = scheduledRuntimeUpdateBlocks; this.requester = requester; - this.messageCoordinator = messageCoordinator; } /** @@ -115,59 +95,10 @@ public void syncBlockAnnounce(BlockAnnounceMessage blockAnnounceMessage) { } } - /** - * Updates the Host's state with information from a commit message. - * Synchronized to avoid race condition between checking and updating latest block - * Scheduled runtime updates for synchronized blocks are executed. - * - * @param commitMessage received commit message - * @param peerId sender of the message - */ - public synchronized void syncCommit(CommitMessage commitMessage, PeerId peerId) { - if (commitMessage.getVote().getBlockNumber().compareTo( - stateManager.getSyncState().getLastFinalizedBlockNumber()) <= 0) { - log.log(Level.FINE, String.format("Received commit message for finalized block %d from peer %s", - commitMessage.getVote().getBlockNumber(), peerId)); - return; - } - - log.log(Level.FINE, "Received commit message from peer " + peerId - + " for block #" + commitMessage.getVote().getBlockNumber() - + " with hash " + commitMessage.getVote().getBlockHash() - + " with setId " + commitMessage.getSetId() + " and round " + commitMessage.getRoundNumber() - + " with " + commitMessage.getPreCommits().length + " voters"); - - boolean verified = JustificationVerifier.verify(Justification.fromCommitMessage(commitMessage)); - - if (!verified) { - log.log(Level.WARNING, "Could not verify commit from peer: " + peerId); - return; - } - - GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); - grandpaSetState.getRoundCache() - .getRound(commitMessage.getSetId(), commitMessage.getRoundNumber()) - .addCommitMessageToArchive(commitMessage); - - if (warpSyncFinished && !AbstractState.isActiveAuthority()) { - updateState(commitMessage); - } - } - - private void updateState(CommitMessage commitMessage) { - SyncState syncState = stateManager.getSyncState(); - BigInteger lastFinalizedBlockNumber = syncState.getLastFinalizedBlockNumber(); - if (commitMessage.getVote().getBlockNumber().compareTo(lastFinalizedBlockNumber) < 1) { + public void updateRuntime(BigInteger blockNumber) { + if (!scheduledRuntimeUpdateBlocks.contains(blockNumber)) { return; } - syncState.finalizedCommitMessage(commitMessage); - - if (warpSyncFinished && scheduledRuntimeUpdateBlocks.contains(lastFinalizedBlockNumber)) { - new Thread(this::updateRuntime).start(); - } - } - - private void updateRuntime() { updateRuntimeCode(); buildRuntime(); BigInteger lastFinalizedBlockNumber = stateManager.getSyncState().getLastFinalizedBlockNumber(); @@ -260,65 +191,4 @@ public void loadSavedRuntimeCode() { this.runtimeCode = (byte[]) db.find(DBConstants.RUNTIME_CODE) .orElseThrow(() -> new RuntimeCodeException("No available runtime code")); } - - /** - * Updates the Host's state with information from a neighbour message. - * Tries to update Host's set data (id and authorities) if neighbour has a greater set id than the Host. - * Synchronized to avoid race condition between checking and updating set id - * - * @param neighbourMessage received neighbour message - * @param peerId sender of message - */ - public void syncNeighbourMessage(NeighbourMessage neighbourMessage, PeerId peerId) { - messageCoordinator.sendNeighbourMessageToPeer(peerId); - if (warpSyncFinished && neighbourMessage.getSetId() - .compareTo(stateManager.getGrandpaSetState().getSetId()) > 0) { - updateSetData(neighbourMessage.getLastFinalizedBlock().add(BigInteger.ONE)); - } - } - - private void updateSetData(BigInteger setChangeBlock) { - - List response = requester.requestBlockData( - BlockRequestField.ALL, - setChangeBlock.intValueExact(), - 1 - ).join(); - - BlockData block = response.getFirst(); - - if (block.getIsEmptyJustification()) { - log.log(Level.WARNING, "No justification for block " + setChangeBlock); - return; - } - - Justification justification = JustificationReader.getInstance().read( - new ScaleCodecReader(block.getJustification().toByteArray())); - - boolean verified = JustificationVerifier.verify(justification); - - if (verified) { - BlockHeader header = BlockHeaderReader.getInstance().read(new ScaleCodecReader(block.getHeader().toByteArray())); - - stateManager.getSyncState().finalizeHeader(header); - - DigestHelper.getGrandpaConsensusMessage(header.getDigest()) - .ifPresent(cm -> stateManager.getGrandpaSetState() - .handleGrandpaConsensusMessage(cm, header.getBlockNumber())); - - handleScheduledEvents(); - } - } - - /** - * Executes scheduled or forced authority changes for the last finalized block. - */ - public void handleScheduledEvents() { - boolean updated = stateManager.getGrandpaSetState().handleAuthoritySetChange( - stateManager.getSyncState().getLastFinalizedBlockNumber()); - - if (warpSyncFinished && updated) { - new Thread(messageCoordinator::sendMessagesToPeers).start(); - } - } } \ No newline at end of file diff --git a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java index 5b0e18d6b..966ed4f71 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java @@ -1,6 +1,7 @@ package com.limechain.sync.warpsync.action; import com.limechain.exception.sync.JustificationVerificationException; +import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.protocol.warp.DigestHelper; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.network.protocol.warp.dto.WarpSyncFragment; @@ -22,10 +23,12 @@ public class VerifyJustificationAction implements WarpSyncAction { private final WarpSyncState warpSyncState; private final StateManager stateManager; private Exception error; + private final PeerMessageCoordinator messageCoordinator; public VerifyJustificationAction() { this.stateManager = AppBean.getBean(StateManager.class); this.warpSyncState = AppBean.getBean(WarpSyncState.class); + this.messageCoordinator = AppBean.getBean(PeerMessageCoordinator.class); } @Override @@ -48,7 +51,13 @@ public void next(WarpSyncMachine sync) { @Override public void handle(WarpSyncMachine sync) { try { - warpSyncState.handleScheduledEvents(); + // Executes scheduled or forced authority changes for the last finalized block. + boolean changeInAuthoritySet = stateManager.getGrandpaSetState().handleAuthoritySetChange( + stateManager.getSyncState().getLastFinalizedBlockNumber()); + + if (warpSyncState.isWarpSyncFinished() && changeInAuthoritySet) { + new Thread(messageCoordinator::sendMessagesToPeers).start(); + } WarpSyncFragment fragment = sync.getFragmentsQueue().poll(); log.log(Level.INFO, "Verifying justification..."); diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java index ff2c1aaf2..fefb21f65 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java @@ -19,7 +19,6 @@ import com.limechain.network.protocol.message.ProtocolMessageBuilder; import com.limechain.state.AbstractState; import com.limechain.sync.SyncMode; -import com.limechain.sync.warpsync.WarpSyncState; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.libp2p.core.PeerId; import io.libp2p.core.Stream; @@ -55,7 +54,7 @@ class GrandpaEngineTest { @Mock private ConnectionManager connectionManager; @Mock - private WarpSyncState warpSyncState; + private GrandpaMessageHandler grandpaMessageHandler; @Mock private GrandpaSetState grandpaSetState; @Mock @@ -77,7 +76,7 @@ void receiveRequestWithUnknownGrandpaTypeShouldLogAndIgnore() { grandpaEngine.receiveRequest(unknownTypeMessage, stream); verifyNoInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); } // INITIATOR STREAM @@ -91,7 +90,7 @@ void receiveNonHandshakeRequestOnInitiatorStreamShouldLogAndIgnore() { grandpaEngine.receiveRequest(message, stream); verifyNoInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); } @Test @@ -132,7 +131,7 @@ void receiveNonHandshakeRequestOnResponderStreamWhenNotConnectedShouldLogAndClos grandpaEngine.receiveRequest(message, stream); verifyNoMoreInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); verify(stream).close(); } @@ -148,7 +147,7 @@ void receiveHandshakeRequestOnResponderStreamWhenAlreadyConnectedShouldLogAndClo grandpaEngine.receiveRequest(message, stream); verifyNoMoreInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); verify(stream).close(); } } @@ -208,7 +207,7 @@ void receiveCommitMessageOnResponderStreamWhenShouldSyncCommit() { ) { grandpaEngine.receiveRequest(message, stream); - verify(warpSyncState).syncCommit(commitMessage, peerId); + verify(grandpaMessageHandler).handleCommitMessage(commitMessage, peerId); } } } @@ -227,7 +226,7 @@ void receiveNeighbourMessageOnResponderStreamWhenShouldSyncNeighbourMessage() { (mock, context) -> when(mock.read(any(NeighbourMessageScaleReader.class))).thenReturn(neighbourMessage)) ) { grandpaEngine.receiveRequest(message, stream); - verify(warpSyncState).syncNeighbourMessage(neighbourMessage, peerId); + verify(grandpaMessageHandler).handleNeighbourMessage(neighbourMessage, peerId); } } @@ -246,7 +245,7 @@ void receiveVoteMessageOnResponderStreamShouldDecodeLogAndIgnore() { grandpaEngine.receiveRequest(message, stream); verifyNoMoreInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); } } @@ -265,7 +264,7 @@ void receiveCatchUpRequestMessageOnResponderStreamShouldLogAndIgnore() { grandpaEngine.receiveRequest(message, stream); verifyNoMoreInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); } } @@ -284,7 +283,7 @@ void receiveCatchUpResponseMessageOnResponderStreamShouldLogAndIgnore() { grandpaEngine.receiveRequest(message, stream); verifyNoMoreInteractions(connectionManager); - verifyNoInteractions(warpSyncState); + verifyNoInteractions(grandpaMessageHandler); } }