Skip to content

Commit

Permalink
738 handle grandpa messages in new class (#739)
Browse files Browse the repository at this point in the history
# Description

- Introduced new GrandpaMessageHandler class, where all the messages,
received from GrandpaEngine will be passed.

- It will handle the grandpa messages.

- Extracted logic, related to grandpa messages from WarpSyncState to
GrandpaMessageHandler.

- Fixed tests.

Fixes #738 >

---------

Co-authored-by: Hristiyan Mitov <hristiyan.mitov@limechain.tech>
Co-authored-by: Georgi Grigorov <72125420+Grigorov-Georgi@users.noreply.github.com>
Co-authored-by: osrib <oleksandr.sribnyi@limechain.tech>
  • Loading branch information
4 people authored Feb 3, 2025
1 parent f561ce7 commit 6de5daa
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 312 deletions.
145 changes: 0 additions & 145 deletions src/main/java/com/limechain/grandpa/state/GrandpaSetState.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,14 @@
import com.limechain.ServiceConsensusState;
import com.limechain.chain.lightsyncstate.Authority;
import com.limechain.chain.lightsyncstate.LightSyncState;
import com.limechain.exception.grandpa.GrandpaGenericException;
import com.limechain.grandpa.round.GrandpaRound;
import com.limechain.grandpa.round.RoundCache;
import com.limechain.grandpa.vote.SignedVote;
import com.limechain.grandpa.vote.SubRound;
import com.limechain.grandpa.vote.Vote;
import com.limechain.network.PeerMessageCoordinator;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage;
import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage;
import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage;
import com.limechain.network.protocol.grandpa.messages.vote.FullVote;
import com.limechain.network.protocol.grandpa.messages.vote.FullVoteScaleWriter;
import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage;
import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.runtime.Runtime;
import com.limechain.runtime.hostapi.dto.Key;
import com.limechain.runtime.hostapi.dto.VerifySignature;
import com.limechain.state.AbstractState;
import com.limechain.storage.DBConstants;
import com.limechain.storage.KVRepository;
Expand All @@ -32,10 +21,7 @@
import com.limechain.sync.warpsync.dto.AuthoritySetChange;
import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange;
import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange;
import com.limechain.utils.Ed25519Utils;
import com.limechain.utils.scale.ScaleUtils;
import io.emeraldpay.polkaj.types.Hash256;
import io.libp2p.core.PeerId;
import io.libp2p.core.crypto.PubKey;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -51,8 +37,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;

/**
Expand All @@ -68,7 +52,6 @@
public class GrandpaSetState extends AbstractState implements ServiceConsensusState {

private static final BigInteger THRESHOLD_DENOMINATOR = BigInteger.valueOf(3);
private static final BigInteger CATCH_UP_THRESHOLD = BigInteger.TWO;

private List<Authority> authorities;
private BigInteger disabledAuthority;
Expand Down Expand Up @@ -253,62 +236,6 @@ public void handleGrandpaConsensusMessage(GrandpaConsensusMessage consensusMessa
log.fine(String.format("Updated grandpa set config: %s", consensusMessage.getFormat().toString()));
}

public void handleVoteMessage(VoteMessage voteMessage) {
BigInteger voteMessageSetId = voteMessage.getSetId();
BigInteger voteMessageRoundNumber = voteMessage.getRound();
SignedMessage signedMessage = voteMessage.getMessage();

SignedVote signedVote = new SignedVote();
signedVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber()));
signedVote.setSignature(signedMessage.getSignature());
signedVote.setAuthorityPublicKey(signedMessage.getAuthorityPublicKey());

if (!isValidMessageSignature(voteMessage)) {
log.warning(
String.format("Vote message signature is invalid for round %s, set %s, block hash %s, block number %s",
voteMessageSetId, voteMessageSetId, signedMessage.getBlockHash(), signedMessage.getBlockNumber()));
return;
}

GrandpaRound round = roundCache.getRound(voteMessageSetId, voteMessageRoundNumber);
if (round == null) {
round = new GrandpaRound();
round.setRoundNumber(voteMessageRoundNumber);
roundCache.addRound(voteMessageSetId, round);
}

SubRound subround = signedMessage.getStage();
switch (subround) {
case PRE_VOTE -> round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote);
case PRE_COMMIT -> round.getPreCommits().put(signedMessage.getAuthorityPublicKey(), signedVote);
case PRIMARY_PROPOSAL -> {
round.setPrimaryVote(signedVote);
round.getPreVotes().put(signedMessage.getAuthorityPublicKey(), signedVote);
}
default -> throw new GrandpaGenericException("Unknown subround: " + subround);
}
}

private boolean isValidMessageSignature(VoteMessage voteMessage) {
SignedMessage signedMessage = voteMessage.getMessage();

FullVote fullVote = new FullVote();
fullVote.setRound(voteMessage.getRound());
fullVote.setSetId(voteMessage.getSetId());
fullVote.setVote(new Vote(signedMessage.getBlockHash(), signedMessage.getBlockNumber()));
fullVote.setStage(signedMessage.getStage());

byte[] encodedFullVote = ScaleUtils.Encode.encode(FullVoteScaleWriter.getInstance(), fullVote);

VerifySignature verifySignature = new VerifySignature(
signedMessage.getSignature().getBytes(),
encodedFullVote,
signedMessage.getAuthorityPublicKey().getBytes(),
Key.ED25519);

return Ed25519Utils.verifySignature(verifySignature);
}

private void updateAuthorityStatus() {
Optional<Pair<byte[], byte[]>> keyPair = authorities.stream()
.map(a -> keyStore.getKeyPair(KeyType.GRANDPA, a.getPublicKey()))
Expand All @@ -318,76 +245,4 @@ private void updateAuthorityStatus() {

keyPair.ifPresentOrElse(AbstractState::setAuthorityStatus, AbstractState::clearAuthorityStatus);
}

public void initiateAndSendCatchUpRequest(NeighbourMessage neighbourMessage, PeerId peerId) {
// If peer has the same voter set id
if (neighbourMessage.getSetId().equals(setId)) {

// Check if needed to catch-up peer
if (neighbourMessage.getRound().compareTo(
fetchLatestRound().getRoundNumber().add(CATCH_UP_THRESHOLD)) >= 0) {
log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead.");

CatchUpReqMessage catchUpReqMessage = CatchUpReqMessage.builder()
.round(neighbourMessage.getRound())
.setId(neighbourMessage.getSetId()).build();

messageCoordinator.sendCatchUpRequestToPeer(peerId, catchUpReqMessage);
}
}
}

public void initiateAndSendCatchUpResponse(PeerId peerId,
CatchUpReqMessage catchUpReqMessage,
Supplier<Set<PeerId>> peerIds) {

if (!peerIds.get().contains(peerId)) {
throw new GrandpaGenericException("Requesting catching up from a non-peer.");
}

if (!catchUpReqMessage.getSetId().equals(setId)) {
throw new GrandpaGenericException("Catch up message has a different setId.");
}

if (catchUpReqMessage.getRound().compareTo(fetchLatestRound().getRoundNumber()) > 0) {
throw new GrandpaGenericException("Catching up on a round in the future.");
}

GrandpaRound selectedGrandpaRound = selectRound(catchUpReqMessage.getRound(), catchUpReqMessage.getSetId())
.orElseThrow(() -> new GrandpaGenericException("Target round was no found."));

SignedVote[] preCommits = selectedGrandpaRound.getPreCommits().values().toArray(SignedVote[]::new);
SignedVote[] preVotes = selectedGrandpaRound.getPreVotes().values().toArray(SignedVote[]::new);

BlockHeader finalizedBlockHeader = selectedGrandpaRound.getFinalizedBlock();

CatchUpResMessage catchUpResMessage = CatchUpResMessage.builder()
.round(selectedGrandpaRound.getRoundNumber())
.setId(setId)
.preCommits(preCommits)
.preVotes(preVotes)
.blockHash(finalizedBlockHeader.getHash())
.blockNumber(finalizedBlockHeader.getBlockNumber())
.build();

messageCoordinator.sendCatchUpResponseToPeer(peerId, catchUpResMessage);
}

private Optional<GrandpaRound> selectRound(BigInteger peerRoundNumber, BigInteger peerSetId) {
GrandpaRound round = roundCache.getLatestRound(setId);

while (round != null) {
// Round found
// Check voter set
if (round.getRoundNumber().equals(peerRoundNumber)) {
if (round.getCommitMessagesArchive().getFirst().getSetId().equals(peerSetId)) {
break;
}
}
// Go to the previous round
round = round.getPrevious();
}

return Optional.ofNullable(round);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ public void sendCatchUpResponse(byte[] encodedCatchUpResMessage) {
* Sends a vote message over the controller stream.
*/
public void sendVoteMessage(byte[] encodedVoteMessage) {
engine.writeCommitMessage(stream, encodedVoteMessage);
engine.writeVoteMessage(stream, encodedVoteMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.limechain.exception.scale.ScaleEncodingException;
import com.limechain.grandpa.GrandpaService;
import com.limechain.grandpa.state.GrandpaSetState;
import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder;
import com.limechain.network.protocol.grandpa.messages.GrandpaMessageType;
Expand Down Expand Up @@ -41,19 +40,14 @@
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class GrandpaEngine {
private static final int HANDSHAKE_LENGTH = 1;

private final WarpSyncState warpSyncState;

protected ConnectionManager connectionManager;
protected BlockAnnounceHandshakeBuilder handshakeBuilder;
protected GrandpaSetState grandpaSetState;
protected GrandpaMessageHandler grandpaMessageHandler;

public GrandpaEngine() {
warpSyncState = AppBean.getBean(WarpSyncState.class);

connectionManager = ConnectionManager.getInstance();
handshakeBuilder = new BlockAnnounceHandshakeBuilder();
grandpaSetState = AppBean.getBean(GrandpaSetState.class);
grandpaMessageHandler = AppBean.getBean(GrandpaMessageHandler.class);
}

/**
Expand Down Expand Up @@ -147,25 +141,25 @@ private void handleNeighbourMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
NeighbourMessage neighbourMessage = reader.read(NeighbourMessageScaleReader.getInstance());
log.log(Level.FINE, "Received neighbour message from Peer " + peerId + "\n" + neighbourMessage);
new Thread(() -> warpSyncState.syncNeighbourMessage(neighbourMessage, peerId)).start();
new Thread(() -> grandpaMessageHandler.handleNeighbourMessage(neighbourMessage, peerId)).start();

if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) {
grandpaSetState.initiateAndSendCatchUpRequest(neighbourMessage, peerId);
grandpaMessageHandler.initiateAndSendCatchUpRequest(neighbourMessage, peerId);
}
}

private void handleVoteMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
VoteMessage voteMessage = reader.read(VoteMessageScaleReader.getInstance());
grandpaSetState.handleVoteMessage(voteMessage);
//todo: handle vote message (authoring node responsibility?)
log.log(Level.INFO, "Received vote message from Peer " + peerId + "\n" + voteMessage);
//Maybe we need to add possible roundNumber check
grandpaMessageHandler.handleVoteMessage(voteMessage);
}

private void handleCommitMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
CommitMessage commitMessage = reader.read(CommitMessageScaleReader.getInstance());
warpSyncState.syncCommit(commitMessage, peerId);
grandpaMessageHandler.handleCommitMessage(commitMessage, peerId);
}

private void handleCatchupRequestMessage(byte[] message, PeerId peerId) {
Expand All @@ -174,7 +168,7 @@ private void handleCatchupRequestMessage(byte[] message, PeerId peerId) {
log.log(Level.INFO, "Received catch up request message from Peer " + peerId + "\n" + catchUpReqMessage);

if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) {
grandpaSetState.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds);
grandpaMessageHandler.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds);
}
}

Expand Down Expand Up @@ -254,7 +248,7 @@ public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage)
/**
* Send our GRANDPA vote message from {@link GrandpaService} on a given <b>responder</b> stream.
*
* @param stream <b>responder</b> stream to write the message to
* @param stream <b>responder</b> stream to write the message to
* @param encodedVoteMessage scale encoded VoteMessage object
*/
public void writeVoteMessage(Stream stream, byte[] encodedVoteMessage) {
Expand Down
Loading

0 comments on commit 6de5daa

Please sign in to comment.