From 9223e1111d52b4d4e450ceaca1fb2b5285c2b6bd Mon Sep 17 00:00:00 2001 From: Lisandro Date: Fri, 29 Sep 2017 11:52:38 -0300 Subject: [PATCH] reorder some classes and split some parts of SyncProcessor --- COPYING | 2 +- .../main/java/co/rsk/net/SyncProcessor.java | 173 +++++------ .../co/rsk/net/sync/DecidingSyncState.java | 40 +++ .../co/rsk/net/sync/DecidingSyncStatus.java | 50 ---- .../FindingConnectionPointSyncStatus.java | 11 - .../co/rsk/net/sync/PeersInformation.java | 34 +-- .../co/rsk/net/sync/SyncConfiguration.java | 47 ++- .../co/rsk/net/sync/SyncEventsHandler.java | 6 + .../SyncPeerProcessor.java} | 274 +++++++----------- .../java/co/rsk/net/sync/SyncPeerStatus.java | 62 ++++ .../main/java/co/rsk/net/sync/SyncState.java | 22 ++ .../java/co/rsk/net/sync/SyncStatesIds.java | 6 + .../main/java/co/rsk/net/sync/SyncStatus.java | 19 -- .../co/rsk/net/sync/SyncStatusHandler.java | 54 ---- .../java/co/rsk/net/sync/SyncStatusIds.java | 6 - .../co/rsk/net/sync/SyncStatusSetter.java | 5 - .../net/sync/SyncingWithPeerSyncState.java | 44 +++ .../co/rsk/net/NodeMessageHandlerTest.java | 1 - .../java/co/rsk/net/SyncPeerStatusTest.java | 7 +- .../java/co/rsk/net/SyncProcessorTest.java | 181 +++++------- .../ThreeAsyncNodeUsingSyncProcessorTest.java | 60 ++-- .../TwoAsyncNodeUsingSyncProcessorTest.java | 20 +- .../rsk/net/simples/SimpleMessageChannel.java | 13 +- .../rsk/net/simples/SimpleStatusHandler.java | 18 -- .../rsk/net/sync/DecidingSyncStateTest.java | 93 ++++++ .../rsk/net/sync/DecidingSyncStatusTest.java | 85 ------ .../FindingConnectionPointSyncStatusTest.java | 17 -- .../rsk/net/sync/SimpleSyncEventsHandler.java | 20 ++ .../sync/SyncingWithPeerSyncStateTest.java | 26 ++ .../java/co/rsk/net/utils/StatusUtils.java | 15 + 30 files changed, 714 insertions(+), 697 deletions(-) create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncState.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncStatus.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/FindingConnectionPointSyncStatus.java create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java rename rskj-core/src/main/java/co/rsk/net/{SyncPeerStatus.java => sync/SyncPeerProcessor.java} (65%) create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncPeerStatus.java create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncState.java create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncStatesIds.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncStatus.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncStatusHandler.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncStatusIds.java delete mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncStatusSetter.java create mode 100644 rskj-core/src/main/java/co/rsk/net/sync/SyncingWithPeerSyncState.java delete mode 100644 rskj-core/src/test/java/co/rsk/net/simples/SimpleStatusHandler.java create mode 100644 rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStateTest.java delete mode 100644 rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStatusTest.java delete mode 100644 rskj-core/src/test/java/co/rsk/net/sync/FindingConnectionPointSyncStatusTest.java create mode 100644 rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java create mode 100644 rskj-core/src/test/java/co/rsk/net/sync/SyncingWithPeerSyncStateTest.java create mode 100644 rskj-core/src/test/java/co/rsk/net/utils/StatusUtils.java diff --git a/COPYING b/COPYING index 632eb8f6505..3213004689d 100644 --- a/COPYING +++ b/COPYING @@ -301,7 +301,7 @@ or household purposes, or (2) anything designed or sold for incorporation into a dwelling. In determining whether a product is a consumer product, doubtful cases shall be resolved in favor of coverage. For a particular product received by a particular user, "normally used" refers to a -typical or common use of that class of product, regardless of the status +typical or common use of that class of product, regardless of the syncState of the particular user or of the way in which the particular user actually uses, or expects or is expected to use, the product. A product is a consumer product regardless of whether the product has substantial diff --git a/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java b/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java index cc5174868cc..7f74e6f3338 100644 --- a/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java +++ b/rskj-core/src/main/java/co/rsk/net/SyncProcessor.java @@ -2,9 +2,7 @@ import co.rsk.core.bc.BlockChainStatus; import co.rsk.net.messages.*; -import co.rsk.net.sync.PeersInformation; -import co.rsk.net.sync.SyncConfiguration; -import co.rsk.net.sync.SyncStatusHandler; +import co.rsk.net.sync.*; import co.rsk.validators.BlockDifficultyRule; import co.rsk.validators.BlockParentDependantValidationRule; import co.rsk.validators.BlockValidationRule; @@ -25,55 +23,56 @@ * Created by ajlopez on 29/08/2017. * This class' methods are executed one at a time because NodeMessageHandler is synchronized. */ -public class SyncProcessor { +public class SyncProcessor implements SyncEventsHandler { private static final Logger logger = LoggerFactory.getLogger("syncprocessor"); private static BlockValidationRule blockValidationRule = new ProofOfWorkRule(); private static BlockParentDependantValidationRule blockParentValidationRule = new BlockDifficultyRule(); + private final SyncConfiguration syncConfiguration; private long lastRequestId; private Blockchain blockchain; private BlockSyncService blockSyncService; - private PeersInformation peerStatuses = new PeersInformation(); + private PeersInformation peerStatuses; private Map pendingBodyResponses = new HashMap<>(); private Map senders = new HashMap<>(); - private SyncStatusHandler statusHandler; + private SyncState syncState; + private SyncPeerProcessor syncPeerProcessor; + private MessageChannel selectedPeer; public SyncProcessor(Blockchain blockchain, BlockSyncService blockSyncService, SyncConfiguration syncConfiguration) { + // TODO(mc) implement FollowBestChain this.blockchain = blockchain; this.blockSyncService = blockSyncService; - // TODO(mc) implement FollowBestChain - this.statusHandler = new SyncStatusHandler(peerStatuses, syncConfiguration); - this.statusHandler.onFinishedWaitingForPeers(this::findConnectionPointOfBestPeer); + this.syncConfiguration = syncConfiguration; + this.peerStatuses = new PeersInformation(syncConfiguration); + this.syncState = new DecidingSyncState(this.syncConfiguration, this, peerStatuses); + this.syncPeerProcessor = new SyncPeerProcessor(); } public void processStatus(MessageChannel sender, Status status) { - logger.trace("Receiving status from node {} block {} {}", sender.getPeerNodeID(), status.getBestBlockNumber(), Hex.toHexString(status.getBestBlockHash()).substring(0, 6), status.getBestBlockHash()); - - // save sender because we can be called back to findConnectionPointOfBestPeer - senders.put(sender.getPeerNodeID(), sender); - statusHandler.newPeerStatus(sender.getPeerNodeID(), status); + logger.trace("Receiving syncState from node {} block {} {}", sender.getPeerNodeID(), status.getBestBlockNumber(), Hex.toHexString(status.getBestBlockHash()).substring(0, 6), status.getBestBlockHash()); + this.peerStatuses.getOrRegisterPeer(sender).setStatus(status); + this.syncState.newPeerStatus(); } - public void sendSkeletonRequestTo(MessageChannel peer, long height) { logger.trace("Send skeleton request to node {} height {}", peer.getPeerNodeID(), height); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(peer); + syncState.messageSent(); peer.sendMessage(new SkeletonRequestMessage(++lastRequestId, height)); - peerStatus.registerExpectedResponse(lastRequestId, MessageType.SKELETON_RESPONSE_MESSAGE); + syncPeerProcessor.registerExpectedResponse(lastRequestId, MessageType.SKELETON_RESPONSE_MESSAGE); } public void processSkeletonResponse(MessageChannel sender, SkeletonResponseMessage message) { logger.trace("Process skeleton response from node {}", sender.getPeerNodeID()); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(sender); + this.getPeerStatusAndSaveSender(sender); - if (!peerStatus.isExpectedResponse(message.getId(), message.getMessageType())) + if (!syncPeerProcessor.isExpectedResponse(message.getId(), message.getMessageType())) return; - peerStatus.setSkeleton(message.getBlockIdentifiers()); - - this.sendNextBlockHeadersRequestTo(sender, peerStatus); + syncPeerProcessor.setSkeleton(message.getBlockIdentifiers()); + this.sendNextBlockHeadersRequestTo(sender); } @VisibleForTesting @@ -91,38 +90,22 @@ int getNoAdvancedPeers() { return this.peerStatuses.countIf(s -> chainStatus.hasLowerDifficulty(s.getStatus())); } - private void findConnectionPointOfBestPeer() { - Optional bestPeerIDOptional = this.peerStatuses.getBestPeerID(); - Optional bestPeerOptional = bestPeerIDOptional.map(senders::get); - - bestPeerOptional.ifPresent(bestPeer -> { - SyncPeerStatus peerStatus = getPeerStatusAndSaveSender(bestPeer); - Status status = peerStatus.getStatus(); - if (this.blockchain.getStatus().hasLowerDifficulty(status)) { - this.findConnectionPointOf(bestPeer, status); - } - else { - statusHandler.finishedDownloadingBlocks(); - } - }); - } - - private void sendNextBlockHeadersRequestTo(MessageChannel peer, SyncPeerStatus peerStatus) { - if (!peerStatus.hasSkeleton()) + private void sendNextBlockHeadersRequestTo(MessageChannel peer) { + if (!syncPeerProcessor.hasSkeleton()) return; - List skeleton = peerStatus.getSkeleton(); + List skeleton = syncPeerProcessor.getSkeleton(); if (skeleton.isEmpty()) return; - long connectionPoint = peerStatus.getConnectionPoint().orElseGet(() -> { + long connectionPoint = syncPeerProcessor.getConnectionPoint().orElseGet(() -> { logger.error("Sending BlockHeaders request to peer {} but the connection point is missing", peer.getPeerNodeID()); return 0L; }); - if (peerStatus.getLastRequestedLinkIndex().isPresent()) { - int index = peerStatus.getLastRequestedLinkIndex().get(); + if (syncPeerProcessor.getLastRequestedLinkIndex().isPresent()) { + int index = syncPeerProcessor.getLastRequestedLinkIndex().get(); byte[] hash = skeleton.get(index).getHash(); if (this.blockSyncService.getBlockFromStoreOrBlockchain(hash) == null) @@ -130,7 +113,7 @@ private void sendNextBlockHeadersRequestTo(MessageChannel peer, SyncPeerStatus p } // We use -1 so we start iterarting from the first element - int linkIndex = peerStatus.getLastRequestedLinkIndex().orElse(-1) + 1; + int linkIndex = syncPeerProcessor.getLastRequestedLinkIndex().orElse(-1) + 1; for (int k = linkIndex; k < skeleton.size(); k++) { byte[] hash = skeleton.get(k).getHash(); @@ -146,61 +129,60 @@ private void sendNextBlockHeadersRequestTo(MessageChannel peer, SyncPeerStatus p int count = (int)(height - previousKnownHeight); logger.trace("Send headers request to node {}", peer.getPeerNodeID()); + syncState.messageSent(); peer.sendMessage(new BlockHeadersRequestMessage(++lastRequestId, hash, count)); - peerStatus.registerExpectedResponse(lastRequestId, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); - peerStatus.setLastRequestedLinkIndex(k); + syncPeerProcessor.registerExpectedResponse(lastRequestId, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + syncPeerProcessor.setLastRequestedLinkIndex(k); return; } logger.trace("Finished syncing with node {}", peer.getPeerNodeID()); - peerStatus.stopSyncing(); - statusHandler.finishedDownloadingBlocks(); + this.stopSyncing(); } public void sendBlockHashRequestTo(MessageChannel peer, long height) { logger.trace("Send hash request to node {} height {}", peer.getPeerNodeID(), height); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(peer); + syncState.messageSent(); peer.sendMessage(new BlockHashRequestMessage(++lastRequestId, height)); - peerStatus.registerExpectedResponse(lastRequestId, MessageType.BLOCK_HASH_RESPONSE_MESSAGE); + syncPeerProcessor.registerExpectedResponse(lastRequestId, MessageType.BLOCK_HASH_RESPONSE_MESSAGE); } - public void findConnectionPointOf(MessageChannel peer, Status status) { - logger.trace("Find connection point with node {}", peer.getPeerNodeID()); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(peer); - - peerStatus.startFindConnectionPoint(status.getBestBlockNumber()); - this.sendBlockHashRequestTo(peer, peerStatus.getFindingHeight()); + private void findConnectionPointOf(NodeID nodeID, Status status) { + logger.trace("Find connection point with node {}", nodeID); + syncPeerProcessor.startFindConnectionPoint(status.getBestBlockNumber()); + MessageChannel channel = peerStatuses.getPeer(nodeID).getMessageChannel(); + this.sendBlockHashRequestTo(channel, syncPeerProcessor.getFindingHeight()); } public void processBlockHashResponse(MessageChannel sender, BlockHashResponseMessage message) { logger.trace("Process block hash response from node {} hash {}", sender.getPeerNodeID(), Hex.toHexString(message.getHash()).substring(0, 6)); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(sender); + this.getPeerStatusAndSaveSender(sender); - if (!peerStatus.isExpectedResponse(message.getId(), message.getMessageType())) + if (!syncPeerProcessor.isExpectedResponse(message.getId(), message.getMessageType())) return; Block block = this.blockchain.getBlockByHash(message.getHash()); if (block != null) - peerStatus.updateFound(); + syncPeerProcessor.updateFound(); else - peerStatus.updateNotFound(); + syncPeerProcessor.updateNotFound(); - Optional cp = peerStatus.getConnectionPoint(); + Optional cp = syncPeerProcessor.getConnectionPoint(); if (cp.isPresent()) { sendSkeletonRequestTo(sender, cp.get()); return; } - sendBlockHashRequestTo(sender, peerStatus.getFindingHeight()); + sendBlockHashRequestTo(sender, syncPeerProcessor.getFindingHeight()); } public void processBlockHeadersResponse(MessageChannel sender, BlockHeadersResponseMessage message) { logger.trace("Process block headers response from node {}", sender.getPeerNodeID()); SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(sender); - if (!peerStatus.isExpectedResponse(message.getId(), message.getMessageType())) + if (!syncPeerProcessor.isExpectedResponse(message.getId(), message.getMessageType())) return; // to validate: @@ -226,7 +208,7 @@ public void processBlockHeadersResponse(MessageChannel sender, BlockHeadersRespo logger.trace("Send body request to node {} hash {}", sender.getPeerNodeID(), Hex.toHexString(header.getHash()).substring(0, 6)); sender.sendMessage(new BodyRequestMessage(++lastRequestId, header.getHash())); - peerStatus.registerExpectedResponse(lastRequestId, MessageType.BODY_RESPONSE_MESSAGE); + syncPeerProcessor.registerExpectedResponse(lastRequestId, MessageType.BODY_RESPONSE_MESSAGE); pendingBodyResponses.put(lastRequestId, new PendingBodyResponse(sender.getPeerNodeID(), header)); parent = block; @@ -256,7 +238,7 @@ public void processBodyResponse(MessageChannel sender, BodyResponseMessage messa logger.trace("Process body response from node {}", sender.getPeerNodeID()); SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(sender); - if (!peerStatus.isExpectedResponse(message.getId(), message.getMessageType())) + if (!syncPeerProcessor.isExpectedResponse(message.getId(), message.getMessageType())) return; PendingBodyResponse expected = pendingBodyResponses.get(message.getId()); @@ -268,15 +250,15 @@ public void processBodyResponse(MessageChannel sender, BodyResponseMessage messa // TODO(mc): validate transactions and uncles are part of this block (with header) blockSyncService.processBlock(sender, Block.fromValidData(expected.header, message.getTransactions(), message.getUncles())); - - this.sendNextBlockHeadersRequestTo(sender, this.getPeerStatusAndSaveSender(sender)); + this.getPeerStatusAndSaveSender(sender); + this.sendNextBlockHeadersRequestTo(sender); } public void processBlockResponse(MessageChannel sender, BlockResponseMessage message) { logger.trace("Process block response from node {} block {} {}", sender.getPeerNodeID(), message.getBlock().getNumber(), message.getBlock().getShortHash()); - SyncPeerStatus peerStatus = this.getPeerStatusAndSaveSender(sender); + this.getPeerStatusAndSaveSender(sender); - if (!peerStatus.isExpectedResponse(message.getId(), message.getMessageType())) + if (!syncPeerProcessor.isExpectedResponse(message.getId(), message.getMessageType())) return; blockSyncService.processBlock(sender, message.getBlock()); @@ -290,7 +272,8 @@ public void processNewBlockHash(MessageChannel sender, NewBlockHashMessage messa return; sender.sendMessage(new BlockRequestMessage(++lastRequestId, hash)); - this.getPeerStatusAndSaveSender(sender).registerExpectedResponse(lastRequestId, MessageType.BLOCK_RESPONSE_MESSAGE); + this.getPeerStatusAndSaveSender(sender); + syncPeerProcessor.registerExpectedResponse(lastRequestId, MessageType.BLOCK_RESPONSE_MESSAGE); } public Set getKnownPeersNodeIDs() { @@ -301,15 +284,19 @@ public SyncPeerStatus getPeerStatus(NodeID nodeID) { return this.peerStatuses.getPeer(nodeID); } + @VisibleForTesting + public SyncPeerProcessor getSyncPeerProcessor() { + return syncPeerProcessor; + } + public SyncPeerStatus getPeerStatusAndSaveSender(MessageChannel sender) { - SyncPeerStatus peerStatus = peerStatuses.getOrRegisterPeer(sender.getPeerNodeID()); - senders.put(sender.getPeerNodeID(), sender); + SyncPeerStatus peerStatus = peerStatuses.getOrRegisterPeer(sender); return peerStatus; } @VisibleForTesting - public boolean peerIsSyncing(NodeID nodeID) { - return this.peerStatuses.isKnownPeer(nodeID) && getPeerStatus(nodeID).isSyncing(); + public boolean isPeerSyncing(NodeID nodeID) { + return syncState.isSyncing() && selectedPeer.getPeerNodeID() == nodeID; } @VisibleForTesting @@ -319,16 +306,38 @@ public void expectBodyResponseFor(long requestId, NodeID nodeID, BlockHeader hea @VisibleForTesting public Map getExpectedBodyResponses() { - Map map = new HashMap<>(); - for (NodeID peerID : peerStatuses.knownNodeIds()) { - SyncPeerStatus peer = peerStatuses.getPeer(peerID); - map.putAll(peer.getExpectedResponses()); - } - return map; + return this.syncPeerProcessor.getExpectedResponses(); + } + + @VisibleForTesting + public SyncState getSyncState() { + return syncState; + } + + public void setSyncState(SyncState syncState) { + this.syncState = syncState; } public void onTimePassed(Duration timePassed) { - this.statusHandler.tick(timePassed); + this.syncState.tick(timePassed); + } + + public void canStartSyncing() { + Optional bestPeerOptional = this.peerStatuses.getBestPeer(); + bestPeerOptional.ifPresent(bp -> { + Status status = getPeerStatus(bp.getPeerNodeID()).getStatus(); + if (this.blockchain.getStatus().hasLowerDifficulty(status)) { + setSyncState(new SyncingWithPeerSyncState(this.syncConfiguration, this)); + this.syncPeerProcessor = new SyncPeerProcessor(); + this.selectedPeer = bp; + this.findConnectionPointOf(bp.getPeerNodeID(), status); + } + }); + } + + public void stopSyncing() { + this.selectedPeer = null; + setSyncState(new DecidingSyncState(this.syncConfiguration, this, peerStatuses)); } private static class PendingBodyResponse { diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncState.java new file mode 100644 index 00000000000..2b0418c0685 --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncState.java @@ -0,0 +1,40 @@ +package co.rsk.net.sync; + +import javax.annotation.Nonnull; +import java.time.Duration; + +public class DecidingSyncState implements SyncState { + private Duration timeElapsed = Duration.ZERO; + private PeersInformation knownPeers; + private SyncConfiguration syncConfiguration; + private SyncEventsHandler syncEventsHandler; + + public DecidingSyncState(SyncConfiguration syncConfiguration, SyncEventsHandler syncEventsHandler, PeersInformation knownPeers) { + this.syncConfiguration = syncConfiguration; + this.syncEventsHandler = syncEventsHandler; + this.knownPeers = knownPeers; + } + + @Nonnull + @Override + public SyncStatesIds getId() { + return SyncStatesIds.DECIDING; + } + + @Override + public void newPeerStatus() { + if (knownPeers.count() >= syncConfiguration.getExpectedPeers()) { + syncEventsHandler.canStartSyncing(); + } + } + + @Override + public void tick(Duration duration) { + timeElapsed = timeElapsed.plus(duration); + if (knownPeers.count() > 0 && + timeElapsed.compareTo(syncConfiguration.getTimeoutWaitingPeers()) >= 0) { + + syncEventsHandler.canStartSyncing(); + } + } +} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncStatus.java b/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncStatus.java deleted file mode 100644 index 3481ec1a2b7..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncStatus.java +++ /dev/null @@ -1,50 +0,0 @@ -package co.rsk.net.sync; - -import co.rsk.net.NodeID; -import co.rsk.net.Status; - -import javax.annotation.Nonnull; -import java.time.Duration; -import java.util.HashSet; -import java.util.Set; - -public class DecidingSyncStatus implements SyncStatus { - private Duration timeElapsed = Duration.ZERO; - private Set knownPeers = new HashSet<>(); - private SyncConfiguration syncConfiguration; - - public DecidingSyncStatus(SyncConfiguration syncConfiguration) { - this.syncConfiguration = syncConfiguration; - } - - @Nonnull - @Override - public SyncStatusIds getId() { - return SyncStatusIds.DECIDING; - } - - @Override - public void newPeerStatus(SyncStatusSetter statusSetter, NodeID peerID, Status status, Set finishedWaitingForPeersCallbacks) { - if (knownPeers.contains(peerID)) { - return; - } - - knownPeers.add(peerID); - - if (knownPeers.size() == syncConfiguration.getMinimumPeers()) { - statusSetter.setStatus(new FindingConnectionPointSyncStatus()); - finishedWaitingForPeersCallbacks.forEach(Runnable::run); - } - } - - @Override - public void tick(SyncStatusSetter statusSetter, Duration duration, Set finishedWaitingForPeersCallbacks) { - timeElapsed = timeElapsed.plus(duration); - if (!knownPeers.isEmpty() && - timeElapsed.toMinutes() >= syncConfiguration.getTimeoutWaitingPeers()) { - - statusSetter.setStatus(new FindingConnectionPointSyncStatus()); - finishedWaitingForPeersCallbacks.forEach(Runnable::run); - } - } -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/FindingConnectionPointSyncStatus.java b/rskj-core/src/main/java/co/rsk/net/sync/FindingConnectionPointSyncStatus.java deleted file mode 100644 index ec58c64f784..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/FindingConnectionPointSyncStatus.java +++ /dev/null @@ -1,11 +0,0 @@ -package co.rsk.net.sync; - -import javax.annotation.Nonnull; - -public class FindingConnectionPointSyncStatus implements SyncStatus { - @Nonnull - @Override - public SyncStatusIds getId() { - return SyncStatusIds.FINDING_CONNECTION_POINT; - } -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java b/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java index 9da46f90497..9c03194fe6a 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java @@ -1,7 +1,7 @@ package co.rsk.net.sync; +import co.rsk.net.MessageChannel; import co.rsk.net.NodeID; -import co.rsk.net.SyncPeerStatus; import java.time.Duration; import java.util.HashMap; @@ -16,8 +16,12 @@ * things such as the underlying communication channel. */ public class PeersInformation { + private final SyncConfiguration syncCofiguration; private Map peerStatuses = new HashMap<>(); - private static final Duration peerTimeout = Duration.ofMinutes(10); + + public PeersInformation(SyncConfiguration syncConfiguration){ + this.syncCofiguration = syncConfiguration; + } public int count() { return peerStatuses.size(); @@ -30,13 +34,13 @@ public int countIf(Predicate predicate) { return Math.toIntExact(count); } - public SyncPeerStatus getOrRegisterPeer(NodeID nodeID) { - SyncPeerStatus peerStatus = this.peerStatuses.get(nodeID); + public SyncPeerStatus getOrRegisterPeer(MessageChannel messageChannel) { + SyncPeerStatus peerStatus = this.peerStatuses.get(messageChannel.getPeerNodeID()); - if (peerStatus != null && !peerStatus.isExpired(peerTimeout)) + if (peerStatus != null && !peerStatus.isExpired(syncCofiguration.getExpirationTimePeerStatus())) return peerStatus; - return this.registerPeer(nodeID); + return this.registerPeer(messageChannel); } public SyncPeerStatus getPeer(NodeID nodeID) { @@ -44,11 +48,12 @@ public SyncPeerStatus getPeer(NodeID nodeID) { return this.peerStatuses.get(nodeID); } - public Optional getBestPeerID() { + public Optional getBestPeer() { return peerStatuses.entrySet().stream() - .filter(e -> !e.getValue().isExpired(peerTimeout)) + .filter(e -> !e.getValue().isExpired(syncCofiguration.getExpirationTimePeerStatus())) .max(this::bestPeerComparator) - .map(Map.Entry::getKey); + .map(Map.Entry::getValue) + .map(SyncPeerStatus::getMessageChannel); } public Set knownNodeIds() { @@ -56,19 +61,14 @@ public Set knownNodeIds() { } private int bestPeerComparator(Map.Entry left, Map.Entry right) { - // TODO(mc) check expiration return Long.compare( left.getValue().getStatus().getBestBlockNumber(), right.getValue().getStatus().getBestBlockNumber()); } - public SyncPeerStatus registerPeer(NodeID nodeID) { - SyncPeerStatus peerStatus = new SyncPeerStatus(); - peerStatuses.put(nodeID, peerStatus); + public SyncPeerStatus registerPeer(MessageChannel messageChannel) { + SyncPeerStatus peerStatus = new SyncPeerStatus(messageChannel); + peerStatuses.put(messageChannel.getPeerNodeID(), peerStatus); return peerStatus; } - - public boolean isKnownPeer(NodeID peerID) { - return peerStatuses.containsKey(peerID); - } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java index 7805da7f046..5f1a92abc03 100644 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java @@ -3,32 +3,47 @@ import com.google.common.annotations.VisibleForTesting; import javax.annotation.concurrent.Immutable; +import java.time.Duration; @Immutable public final class SyncConfiguration { - public static final SyncConfiguration DEFAULT = new SyncConfiguration(5, 2); - @VisibleForTesting - public static final SyncConfiguration IMMEDIATE_FOR_TESTING = new SyncConfiguration(1, 2); + public static final SyncConfiguration DEFAULT = new SyncConfiguration(5, 2, 30, 10); - private final int minimumPeers; - private final int timeoutWaitingPeers; + @VisibleForTesting + public static final SyncConfiguration IMMEDIATE_FOR_TESTING = new SyncConfiguration(1, 2, 1, 10); - public SyncConfiguration(int minimumPeers, int timeoutWaitingPeers) { - this.minimumPeers = minimumPeers; - this.timeoutWaitingPeers = timeoutWaitingPeers; - } + private final int expectedPeers; + private final Duration timeoutWaitingPeers; + private final Duration timeoutWaitingRequest; + private final Duration expirationTimePeerStatus; /** - * @return Ideally, the minimum number of peers we would want to start finding a connection point. + * + * @param expectedPeers The expected number of peers we would want to start finding a connection point. + * @param timeoutWaitingPeers Timeout in minutes to start finding the connection point when we have at least one peer + * @param timeoutWaitingRequest Timeout in seconds to wait for syncing requests + * @param expirationTimePeerStatus Expiration time in minutes for peer status */ - public final int getMinimumPeers() { - return minimumPeers; + public SyncConfiguration(int expectedPeers, int timeoutWaitingPeers, int timeoutWaitingRequest, int expirationTimePeerStatus) { + this.expectedPeers = expectedPeers; + this.timeoutWaitingPeers = Duration.ofMinutes(timeoutWaitingPeers); + this.timeoutWaitingRequest = Duration.ofSeconds(timeoutWaitingRequest); + this.expirationTimePeerStatus = Duration.ofMinutes(expirationTimePeerStatus); } - /** - * @return Timeout in minutes to start finding the connection point when we have at least one peer. - */ - public final int getTimeoutWaitingPeers() { + public final int getExpectedPeers() { + return expectedPeers; + } + + public final Duration getTimeoutWaitingPeers() { return timeoutWaitingPeers; } + + public final Duration getTimeoutWaitingRequest() { + return timeoutWaitingRequest; + } + + public final Duration getExpirationTimePeerStatus() { + return expirationTimePeerStatus; + } } diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java new file mode 100644 index 00000000000..bf06347e869 --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncEventsHandler.java @@ -0,0 +1,6 @@ +package co.rsk.net.sync; + +public interface SyncEventsHandler { + void canStartSyncing(); + void stopSyncing(); +} diff --git a/rskj-core/src/main/java/co/rsk/net/SyncPeerStatus.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncPeerProcessor.java similarity index 65% rename from rskj-core/src/main/java/co/rsk/net/SyncPeerStatus.java rename to rskj-core/src/main/java/co/rsk/net/sync/SyncPeerProcessor.java index fec7221dd25..6ae0908fab2 100644 --- a/rskj-core/src/main/java/co/rsk/net/SyncPeerStatus.java +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncPeerProcessor.java @@ -1,166 +1,108 @@ -package co.rsk.net; - -import co.rsk.net.messages.MessageType; -import com.google.common.annotations.VisibleForTesting; -import org.ethereum.core.BlockIdentifier; - -import javax.annotation.Nonnull; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -/** - * Created by ajlopez on 29/08/2017. - */ -public class SyncPeerStatus { - // Peer status - private Status status; - - // Is syncinc - private boolean syncing; - - // Status used to find connection point - private long findingHeight; - private long findingInterval; - private boolean finding; - - // Connection point found or not - private Optional connectionPoint = Optional.empty(); - - // Block identifiers retrieved in skeleton - private Optional> skeleton = Optional.empty(); - private Optional lastRequestedLinkIndex; - - // Expected response - private Map expectedResponses = new HashMap<>(); - - private final Clock clock = Clock.systemUTC(); - private Instant lastActivity; - - public SyncPeerStatus() { - this.updateActivity(); - } - - private void updateActivity() { - this.lastActivity = clock.instant(); - } - - public void setStatus(Status status) { - this.updateActivity(); - this.status = status; - } - - public Status getStatus() { - return this.status; - } - - public boolean isSyncing() { return this.syncing; } - - public void startSyncing() { this.syncing = true; } - - public void stopSyncing() { this.syncing = false; } - - public void startFindConnectionPoint(long height) { - this.updateActivity(); - this.startSyncing(); - this.findingInterval = height / 2; - this.findingHeight = height - this.findingInterval; - this.connectionPoint = Optional.empty(); - this.finding = true; - } - - public void setConnectionPoint(long height) { - this.connectionPoint = Optional.of(height); - this.finding = false; - } - - public boolean isFindingConnectionPoint() { return this.finding; } - - public Optional getConnectionPoint() { - return this.connectionPoint; - } - - public long getFindingHeight() { return this.findingHeight; } - - public void updateFound() { - if (this.findingInterval == -1) { - this.setConnectionPoint(this.findingHeight); - return; - } - - this.findingInterval = Math.abs(this.findingInterval / 2); - - if (this.findingInterval <= 1) - this.findingInterval = 2; - - this.findingHeight += this.findingInterval; - } - - public void updateNotFound() { - if (this.findingInterval == 1) { - this.setConnectionPoint(this.findingHeight - 1); - return; - } - - this.findingInterval = -Math.abs(this.findingInterval / 2); - - if (this.findingInterval == 0) - this.findingInterval = -1; - - this.findingHeight += this.findingInterval; - } - - public boolean hasSkeleton() { - return this.skeleton.isPresent(); - } - - @Nonnull - public List getSkeleton() { - return this.skeleton.orElseThrow(IllegalStateException::new); - } - - public void setSkeleton(@Nonnull List skeleton) { - this.skeleton = Optional.of(skeleton); - this.lastRequestedLinkIndex = Optional.empty(); - } - - public Optional getLastRequestedLinkIndex() { return this.lastRequestedLinkIndex; } - - public void setLastRequestedLinkIndex(int index) { this.lastRequestedLinkIndex = Optional.of(index); } - - public void registerExpectedResponse(long responseId, MessageType type) { - this.expectedResponses.put(responseId, type); - } - - public boolean isExpectedResponse(long responseId, MessageType type) { - this.updateActivity(); - - if (!this.expectedResponses.containsKey(responseId) || this.expectedResponses.get(responseId) != type) - return false; - - this.expectedResponses.remove(responseId); - - return true; - } - - @VisibleForTesting - public Map getExpectedResponses() { - return this.expectedResponses; - } - - /** - * It returns true or false depending on the comparison of last activity time - * plus timeout and current time - * - * @param timeout time in milliseconds - * @return true if the time since last activity plus timeout is less than current time in milliseconds - */ - public boolean isExpired(Duration timeout) { - return clock.instant().isAfter(this.lastActivity.plus(timeout)); - } -} - +package co.rsk.net.sync; + +import co.rsk.net.messages.MessageType; +import com.google.common.annotations.VisibleForTesting; +import org.ethereum.core.BlockIdentifier; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class SyncPeerProcessor { + + // Status used to find connection point + private long findingHeight; + private long findingInterval; + + // Connection point found or not + private Optional connectionPoint = Optional.empty(); + + // Block identifiers retrieved in skeleton + private Optional> skeleton = Optional.empty(); + private Optional lastRequestedLinkIndex; + + // Expected response + private Map expectedResponses = new HashMap<>(); + + public void startFindConnectionPoint(long height) { + this.findingInterval = height / 2; + this.findingHeight = height - this.findingInterval; + this.connectionPoint = Optional.empty(); + } + + public void setConnectionPoint(long height) { + this.connectionPoint = Optional.of(height); + } + + public Optional getConnectionPoint() { + return this.connectionPoint; + } + + public long getFindingHeight() { return this.findingHeight; } + + public void updateFound() { + if (this.findingInterval == -1) { + this.setConnectionPoint(this.findingHeight); + return; + } + + this.findingInterval = Math.abs(this.findingInterval / 2); + + if (this.findingInterval <= 1) + this.findingInterval = 2; + + this.findingHeight += this.findingInterval; + } + + public void updateNotFound() { + if (this.findingInterval == 1) { + this.setConnectionPoint(this.findingHeight - 1); + return; + } + + this.findingInterval = -Math.abs(this.findingInterval / 2); + + if (this.findingInterval == 0) + this.findingInterval = -1; + + this.findingHeight += this.findingInterval; + } + + public boolean hasSkeleton() { + return this.skeleton.isPresent(); + } + + @Nonnull + public List getSkeleton() { + return this.skeleton.orElseThrow(IllegalStateException::new); + } + + public void setSkeleton(@Nonnull List skeleton) { + this.skeleton = Optional.of(skeleton); + this.lastRequestedLinkIndex = Optional.empty(); + } + + public Optional getLastRequestedLinkIndex() { return this.lastRequestedLinkIndex; } + + public void setLastRequestedLinkIndex(int index) { this.lastRequestedLinkIndex = Optional.of(index); } + + public void registerExpectedResponse(long responseId, MessageType type) { + this.expectedResponses.put(responseId, type); + } + + public boolean isExpectedResponse(long responseId, MessageType type) { + if (!this.expectedResponses.containsKey(responseId) || this.expectedResponses.get(responseId) != type) + return false; + + this.expectedResponses.remove(responseId); + + return true; + } + + @VisibleForTesting + public Map getExpectedResponses() { + return this.expectedResponses; + } +} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncPeerStatus.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncPeerStatus.java new file mode 100644 index 00000000000..ec2465d19b7 --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncPeerStatus.java @@ -0,0 +1,62 @@ +package co.rsk.net.sync; + +import co.rsk.net.MessageChannel; +import co.rsk.net.Status; +import co.rsk.net.messages.MessageType; +import com.google.common.annotations.VisibleForTesting; +import org.ethereum.core.BlockIdentifier; + +import javax.annotation.Nonnull; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Created by ajlopez on 29/08/2017. + */ +public class SyncPeerStatus { + // Peer status + private Status status; + private MessageChannel messageChannel; + + private final Clock clock = Clock.systemUTC(); + private Instant lastActivity; + + public SyncPeerStatus(MessageChannel messageChannel) { + this.messageChannel = messageChannel; + this.updateActivity(); + } + + private void updateActivity() { + this.lastActivity = clock.instant(); + } + + public void setStatus(Status status) { + this.status = status; + this.updateActivity(); + } + + public Status getStatus() { + return this.status; + } + + public MessageChannel getMessageChannel() { + return messageChannel; + } + + /** + * It returns true or false depending on the comparison of last activity time + * plus timeout and current time + * + * @param timeout time in milliseconds + * @return true if the time since last activity plus timeout is less than current time in milliseconds + */ + public boolean isExpired(Duration timeout) { + return clock.instant().isAfter(this.lastActivity.plus(timeout)); + } +} + diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java new file mode 100644 index 00000000000..95b579646cc --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncState.java @@ -0,0 +1,22 @@ +package co.rsk.net.sync; + +import javax.annotation.Nonnull; +import java.time.Duration; + +public interface SyncState { + @Nonnull + SyncStatesIds getId(); + + /** + * should only be called when a new peer arrives + */ + default void newPeerStatus() {} + + default void tick(Duration duration) {} + + default void messageSent() {} + + default boolean isSyncing(){ + return false; + } +} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatesIds.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatesIds.java new file mode 100644 index 00000000000..abe5f07641e --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatesIds.java @@ -0,0 +1,6 @@ +package co.rsk.net.sync; + +public enum SyncStatesIds { + DECIDING, + SYNC_WITH_PEER, +} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatus.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatus.java deleted file mode 100644 index c67b72deccc..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatus.java +++ /dev/null @@ -1,19 +0,0 @@ -package co.rsk.net.sync; - -import co.rsk.net.NodeID; -import co.rsk.net.Status; - -import javax.annotation.Nonnull; -import java.time.Duration; -import java.util.Set; - -public interface SyncStatus { - @Nonnull - SyncStatusIds getId(); - - default void newPeerStatus(SyncStatusSetter statusSetter, NodeID peerID, Status status, Set finishedWaitingForPeersCallbacks) { - } - - default void tick(SyncStatusSetter statusSetter, Duration duration, Set finishedWaitingForPeersCallbacks) { - } -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusHandler.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusHandler.java deleted file mode 100644 index 36215cc1c20..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusHandler.java +++ /dev/null @@ -1,54 +0,0 @@ -package co.rsk.net.sync; - -import co.rsk.net.NodeID; -import co.rsk.net.Status; - -import java.time.Duration; -import java.util.HashSet; -import java.util.Set; - -/** - * This only runs inside a SyncProcessor instance. - * Messages are synchronized by NodeMessageHandler, therefore this code effectively runs in one thread and we don't need syncing. - */ -public class SyncStatusHandler implements SyncStatusSetter { - private final PeersInformation peerStatuses; - private final SyncConfiguration syncConfiguration; - private Set finishedWaitingForPeersCallbacks = new HashSet<>(); - private SyncStatus status; - - public SyncStatusHandler(PeersInformation peerStatuses, SyncConfiguration syncConfiguration) { - this.peerStatuses = peerStatuses; - this.syncConfiguration = syncConfiguration; - status = new DecidingSyncStatus(this.syncConfiguration); - } - - public SyncStatusIds getStatus() { - return status.getId(); - } - - public void newPeerStatus(NodeID peerID, Status status) { - this.peerStatuses.getOrRegisterPeer(peerID).setStatus(status); - this.status.newPeerStatus(this, peerID, status, finishedWaitingForPeersCallbacks); - } - - public void tick(Duration duration) { - status.tick(this, duration, finishedWaitingForPeersCallbacks); - } - - /** - * This event is raised when we transition from Waiting For Peers to Finding Connection Point. - * @param callback a callback to be executed. - */ - public void onFinishedWaitingForPeers(Runnable callback) { - this.finishedWaitingForPeersCallbacks.add(callback); - } - - public void finishedDownloadingBlocks() { - setStatus(new DecidingSyncStatus(this.syncConfiguration)); - } - - public void setStatus(SyncStatus status) { - this.status = status; - } -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusIds.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusIds.java deleted file mode 100644 index 0e1500a44a7..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusIds.java +++ /dev/null @@ -1,6 +0,0 @@ -package co.rsk.net.sync; - -public enum SyncStatusIds { - DECIDING, - FINDING_CONNECTION_POINT, -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusSetter.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusSetter.java deleted file mode 100644 index ff90a5730d2..00000000000 --- a/rskj-core/src/main/java/co/rsk/net/sync/SyncStatusSetter.java +++ /dev/null @@ -1,5 +0,0 @@ -package co.rsk.net.sync; - -public interface SyncStatusSetter { - void setStatus(SyncStatus status); -} diff --git a/rskj-core/src/main/java/co/rsk/net/sync/SyncingWithPeerSyncState.java b/rskj-core/src/main/java/co/rsk/net/sync/SyncingWithPeerSyncState.java new file mode 100644 index 00000000000..08827ea1295 --- /dev/null +++ b/rskj-core/src/main/java/co/rsk/net/sync/SyncingWithPeerSyncState.java @@ -0,0 +1,44 @@ +package co.rsk.net.sync; + +import javax.annotation.Nonnull; +import java.time.Duration; + +public class SyncingWithPeerSyncState implements SyncState { + private Duration timeElapsed; + private SyncConfiguration syncConfiguration; + private SyncEventsHandler syncEventsHandler; + + public SyncingWithPeerSyncState(SyncConfiguration syncConfiguration, SyncEventsHandler syncEventsHandler) { + this.syncConfiguration = syncConfiguration; + this.syncEventsHandler = syncEventsHandler; + this.resetTimeElapsed(); + } + + private void resetTimeElapsed() { + timeElapsed = Duration.ZERO; + } + + @Nonnull + @Override + public SyncStatesIds getId() { + return SyncStatesIds.SYNC_WITH_PEER; + } + + @Override + public void tick(Duration duration) { + timeElapsed = timeElapsed.plus(duration); + if (timeElapsed.compareTo(syncConfiguration.getTimeoutWaitingRequest()) >= 0) { + syncEventsHandler.stopSyncing(); + } + } + + @Override + public void messageSent(){ + this.resetTimeElapsed(); + } + + @Override + public boolean isSyncing(){ + return true; + } +} diff --git a/rskj-core/src/test/java/co/rsk/net/NodeMessageHandlerTest.java b/rskj-core/src/test/java/co/rsk/net/NodeMessageHandlerTest.java index 60b805933db..5405f5a7a3c 100644 --- a/rskj-core/src/test/java/co/rsk/net/NodeMessageHandlerTest.java +++ b/rskj-core/src/test/java/co/rsk/net/NodeMessageHandlerTest.java @@ -261,7 +261,6 @@ public void processStatusMessageUsingSyncProcessor() throws UnknownHostException Assert.assertNotNull(sender.getGetBlockMessages()); Assert.assertTrue(sender.getGetBlockMessages().isEmpty()); Assert.assertNotNull(sender.getMessages()); - Assert.assertFalse(sender.getMessages().isEmpty()); Assert.assertEquals(1, sender.getMessages().size()); Message request = sender.getMessages().get(0); diff --git a/rskj-core/src/test/java/co/rsk/net/SyncPeerStatusTest.java b/rskj-core/src/test/java/co/rsk/net/SyncPeerStatusTest.java index 910ede58a38..9076faffb15 100644 --- a/rskj-core/src/test/java/co/rsk/net/SyncPeerStatusTest.java +++ b/rskj-core/src/test/java/co/rsk/net/SyncPeerStatusTest.java @@ -1,5 +1,6 @@ package co.rsk.net; +import co.rsk.net.sync.SyncPeerStatus; import org.junit.Assert; import org.junit.Test; @@ -12,14 +13,14 @@ public class SyncPeerStatusTest { @Test public void justCreatedIsNotExpired() { - SyncPeerStatus status = new SyncPeerStatus(); + SyncPeerStatus status = new SyncPeerStatus(null); Assert.assertFalse(status.isExpired(Duration.ofMillis(1000))); } @Test public void isExpiredAfterTimeout() throws InterruptedException { - SyncPeerStatus status = new SyncPeerStatus(); + SyncPeerStatus status = new SyncPeerStatus(null); TimeUnit.MILLISECONDS.sleep(1000); @@ -28,7 +29,7 @@ public void isExpiredAfterTimeout() throws InterruptedException { @Test public void isNotExpiredAfterShortTimeout() throws InterruptedException { - SyncPeerStatus status = new SyncPeerStatus(); + SyncPeerStatus status = new SyncPeerStatus(null); TimeUnit.MILLISECONDS.sleep(100); diff --git a/rskj-core/src/test/java/co/rsk/net/SyncProcessorTest.java b/rskj-core/src/test/java/co/rsk/net/SyncProcessorTest.java index c0fe45c03fc..534afdbdcfa 100644 --- a/rskj-core/src/test/java/co/rsk/net/SyncProcessorTest.java +++ b/rskj-core/src/test/java/co/rsk/net/SyncProcessorTest.java @@ -5,6 +5,9 @@ import co.rsk.net.messages.*; import co.rsk.net.simples.SimpleMessageChannel; import co.rsk.net.sync.SyncConfiguration; +import co.rsk.net.sync.SyncPeerProcessor; +import co.rsk.net.sync.SyncPeerStatus; +import co.rsk.net.utils.StatusUtils; import co.rsk.test.builders.BlockChainBuilder; import org.ethereum.core.*; import org.ethereum.crypto.ECKey; @@ -18,7 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * Created by ajlopez on 29/08/2017. @@ -156,9 +158,9 @@ public void syncWithAdvancedStatusAnd5Peers() { List senders = new ArrayList(); - int lessPeers = SyncConfiguration.DEFAULT.getMinimumPeers() - 1; + int lessPeers = SyncConfiguration.DEFAULT.getExpectedPeers() - 1; for (int i = 0; i < lessPeers; i++) { - senders.add(new SimpleMessageChannel(HashUtil.randomPeerId())); + senders.add(new SimpleMessageChannel()); } senders.forEach(s -> Assert.assertTrue(s.getMessages().isEmpty())); @@ -168,21 +170,28 @@ public void syncWithAdvancedStatusAnd5Peers() { Assert.assertEquals(lessPeers, processor.getNoAdvancedPeers()); Set ids = processor.getKnownPeersNodeIDs(); - Assert.assertTrue(ids.containsAll(senders.stream(). - map(s -> (s.getPeerNodeID())).collect(Collectors.toList()))); + senders.stream() + .map(SimpleMessageChannel::getPeerNodeID) + .forEach(peerId -> Assert.assertTrue(ids.contains(peerId))); - SimpleMessageChannel lastSender = new SimpleMessageChannel(HashUtil.randomPeerId()); + SimpleMessageChannel lastSender = new SimpleMessageChannel(); Assert.assertFalse(ids.contains(lastSender.getPeerNodeID())); processor.processStatus(lastSender, status); + // now test with all senders + senders.add(lastSender); Assert.assertTrue(ids.contains(lastSender.getPeerNodeID())); Assert.assertFalse(senders.stream().allMatch(s -> s.getMessages().isEmpty())); - Assert.assertEquals(1, senders.stream().mapToInt(s->s.getMessages().size()).sum()); + Assert.assertEquals(1, senders.stream() + .map(SimpleMessageChannel::getMessages) + .mapToInt(List::size) + .sum()); - Message message = senders.stream().filter(s -> !s.getMessages().isEmpty()). - collect(Collectors.toList()). - get(0).getMessages().get(0); + Message message = senders.stream().filter(s -> !s.getMessages().isEmpty()) + .findFirst() + .map(SimpleMessageChannel::getMessages) + .get().get(0); Assert.assertEquals(MessageType.BLOCK_HASH_REQUEST_MESSAGE, message.getMessageType()); @@ -211,8 +220,8 @@ public void processStatusWithPeerWithSameDifficulty() { Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); - Assert.assertFalse(processor.getPeerStatus(sender.getPeerNodeID()).isSyncing()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); + Assert.assertFalse(processor.isPeerSyncing(sender.getPeerNodeID())); } @Test @@ -238,7 +247,7 @@ public void sendSkeletonRequest() { Assert.assertNotEquals(0, request.getId()); Assert.assertEquals(0, request.getStartNumber()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -263,65 +272,17 @@ public void sendBlockHashRequest() { Assert.assertNotEquals(0, request.getId()); Assert.assertEquals(100, request.getHeight()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); - Assert.assertFalse(processor.getPeerStatus(sender.getPeerNodeID()).isSyncing()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); + Assert.assertFalse(processor.isPeerSyncing(sender.getPeerNodeID())); } - @Test - public void findConnectionPointSendingFirstMessage() { - Blockchain blockchain = BlockChainBuilder.ofSize(0); - SimpleMessageChannel sender = new SimpleMessageChannel(new byte[] { 0x01 }); - - SyncProcessor processor = new SyncProcessor(blockchain, null, SyncConfiguration.IMMEDIATE_FOR_TESTING); - - processor.findConnectionPointOf(sender, new Status(100, null)); - - Assert.assertFalse(sender.getMessages().isEmpty()); - Assert.assertEquals(1, sender.getMessages().size()); - - Message message = sender.getMessages().get(0); - - Assert.assertNotNull(message); - Assert.assertEquals(MessageType.BLOCK_HASH_REQUEST_MESSAGE, message.getMessageType()); - - BlockHashRequestMessage request = (BlockHashRequestMessage)message; - - Assert.assertNotEquals(0, request.getId()); - Assert.assertEquals(50, request.getHeight()); - - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).isSyncing()); - } - - @Test + @Test(expected = Exception.class) public void processBlockHashResponseWithUnknownHash() { Blockchain blockchain = BlockChainBuilder.ofSize(0); SimpleMessageChannel sender = new SimpleMessageChannel(new byte[] { 0x01 }); SyncProcessor processor = new SyncProcessor(blockchain, null, SyncConfiguration.IMMEDIATE_FOR_TESTING); - - processor.findConnectionPointOf(sender, new Status(100, null)); - - long requestId = ((BlockHashRequestMessage)sender.getMessages().get(0)).getId(); - - BlockHashResponseMessage response = new BlockHashResponseMessage(requestId, HashUtil.randomHash()); - - processor.processBlockHashResponse(sender, response); - - Assert.assertEquals(2, sender.getMessages().size()); - - Message message2 = sender.getMessages().get(1); - - Assert.assertNotNull(message2); - Assert.assertEquals(MessageType.BLOCK_HASH_REQUEST_MESSAGE, message2.getMessageType()); - - BlockHashRequestMessage request = (BlockHashRequestMessage)message2; - - Assert.assertNotEquals(0, request.getId()); - Assert.assertEquals(25, request.getHeight()); - - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).isSyncing()); + processor.processStatus(sender, new Status(100, null)); } @Test @@ -334,11 +295,12 @@ public void processBlockHeadersResponseWithEmptyList() { List headers = new ArrayList<>(); BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(98, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(98, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(98, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -353,11 +315,12 @@ public void processBlockHeadersResponseRejectsNonSolicitedMessages() { headers.add(block.getHeader()); BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(100, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(100, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(100, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -373,7 +336,8 @@ public void processBlockHeadersResponseWithOneHeader() { headers.add(block.getHeader()); BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(99, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertEquals(1, sender.getMessages().size()); @@ -387,7 +351,7 @@ public void processBlockHeadersResponseWithOneHeader() { Assert.assertNotEquals(0, request.getId()); Assert.assertArrayEquals(block.getHash(), request.getBlockHash()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -407,7 +371,8 @@ public void processBlockHeadersResponseWithManyHeaders() { BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(99, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertEquals(10, sender.getMessages().size()); @@ -421,7 +386,7 @@ public void processBlockHeadersResponseWithManyHeaders() { Assert.assertArrayEquals(otherBlockchain.getBlockByNumber(k + 1).getHash(), request.getBlockHash()); } - Assert.assertEquals(10, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(10, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -439,10 +404,11 @@ public void processBlockHeadersResponseWithManyHeadersMissingFirstParent() { BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(99, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(99, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertEquals(0, sender.getMessages().size()); - Assert.assertEquals(0, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(0, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -457,10 +423,11 @@ public void processBlockHeadersResponseWithOneExistingHeader() { headers.add(block.getHeader()); BlockHeadersResponseMessage response = new BlockHeadersResponseMessage(97, headers); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(97, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(97, MessageType.BLOCK_HEADERS_RESPONSE_MESSAGE); processor.processBlockHeadersResponse(sender, response); Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -472,10 +439,11 @@ public void processBodyResponseRejectsNonSolicitedMessages() { BodyResponseMessage response = new BodyResponseMessage(100, null, null); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(100, MessageType.BODY_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(100, MessageType.BODY_RESPONSE_MESSAGE); processor.processBodyResponse(sender, response); Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -498,7 +466,8 @@ public void processBodyResponseAddsToBlockchain() { List uncles = blockchain.getBestBlock().getUncleList(); BodyResponseMessage response = new BodyResponseMessage(96, transactions, uncles); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); processor.expectBodyResponseFor(96, sender.getPeerNodeID(), block.getHeader()); processor.processBodyResponse(sender, response); @@ -506,7 +475,7 @@ public void processBodyResponseAddsToBlockchain() { Assert.assertEquals(11, blockchain.getBestBlock().getNumber()); Assert.assertArrayEquals(block.getHash(), blockchain.getBestBlockHash()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -529,13 +498,14 @@ public void processBodyResponseAddsToBlockchainAndRequestHeaders() { List uncles = blockchain.getBestBlock().getUncleList(); BodyResponseMessage response = new BodyResponseMessage(96, transactions, uncles); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); processor.expectBodyResponseFor(96, sender.getPeerNodeID(), block.getHeader()); List blockIdentifiers = new ArrayList<>(); blockIdentifiers.add(new BlockIdentifier(block.getHash(), block.getNumber())); blockIdentifiers.add(new BlockIdentifier(HashUtil.randomHash(), block.getNumber() + 192)); - processor.getPeerStatus(sender.getPeerNodeID()).setSkeleton(blockIdentifiers); + processor.getSyncPeerProcessor().setSkeleton(blockIdentifiers); processor.processBodyResponse(sender, response); @@ -554,7 +524,7 @@ public void processBodyResponseAddsToBlockchainAndRequestHeaders() { Assert.assertEquals(11, blockchain.getBestBlock().getNumber()); Assert.assertArrayEquals(block.getHash(), blockchain.getBestBlockHash()); - Assert.assertFalse(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertFalse(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -600,7 +570,8 @@ public void processBodyResponseWithTransactionAddsToBlockchain() { List uncles = block.getUncleList(); BodyResponseMessage response = new BodyResponseMessage(96, transactions, uncles); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(96, MessageType.BODY_RESPONSE_MESSAGE); processor.expectBodyResponseFor(96, sender.getPeerNodeID(), block.getHeader()); processor.processBodyResponse(sender, response); @@ -608,7 +579,7 @@ public void processBodyResponseWithTransactionAddsToBlockchain() { Assert.assertEquals(1, blockchain.getBestBlock().getNumber()); Assert.assertArrayEquals(block.getHash(), blockchain.getBestBlockHash()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -629,12 +600,13 @@ public void processBlockResponseAddsToBlockchain() { SyncProcessor processor = new SyncProcessor(blockchain, blockSyncService, SyncConfiguration.IMMEDIATE_FOR_TESTING); BlockResponseMessage response = new BlockResponseMessage(96, block); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(96, MessageType.BLOCK_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(96, MessageType.BLOCK_RESPONSE_MESSAGE); processor.processBlockResponse(sender, response); Assert.assertEquals(11, blockchain.getBestBlock().getNumber()); Assert.assertArrayEquals(block.getHash(), blockchain.getBestBlockHash()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -646,7 +618,7 @@ public void findConnectionPointBlockchainWithGenesisVsBlockchainWith100Blocks() SyncProcessor processor = new SyncProcessor(blockchain, null, SyncConfiguration.IMMEDIATE_FOR_TESTING); - processor.findConnectionPointOf(sender, new Status(100, null)); + processor.processStatus(sender, StatusUtils.fromBlockchain(advancedBlockchain)); long []expectedHeights = new long[] { 50, 25, 13, 7, 4, 3, 2, 1, 0 }; @@ -672,7 +644,7 @@ public void findConnectionPointBlockchainWithGenesisVsBlockchainWith100Blocks() SkeletonRequestMessage request = (SkeletonRequestMessage)message; Assert.assertEquals(0, request.getStartNumber()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -684,7 +656,8 @@ public void findConnectionPointBlockchainWith30BlocksVsBlockchainWith100Blocks() SyncProcessor processor = new SyncProcessor(blockchain, null, SyncConfiguration.IMMEDIATE_FOR_TESTING); - processor.findConnectionPointOf(sender, new Status(100, null)); + Status status = StatusUtils.fromBlockchain(advancedBlockchain); + processor.processStatus(sender, status); long []expectedHeights = new long[] { 50, 25, 25 + 12, 25 + 12 - 6, 25 + 12 - 6 - 3, 25 + 12 - 6 - 3 + 2, 25 + 12 - 6 - 3 + 2 + 2, 25 + 12 - 6 - 3 + 2 + 2 - 1, 25 + 12 - 6 - 3 + 2 + 2 - 1 - 1 }; @@ -710,7 +683,7 @@ public void findConnectionPointBlockchainWith30BlocksVsBlockchainWith100Blocks() SkeletonRequestMessage request = (SkeletonRequestMessage)message; Assert.assertEquals(30, request.getStartNumber()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -729,8 +702,9 @@ public void processSkeletonResponseWithTenBlockIdentifiers() { for (int k = 0; k < 10; k++) bids.add(new BlockIdentifier(HashUtil.randomHash(), (k + 1) * 10)); - processor.getPeerStatusAndSaveSender(sender).setConnectionPoint(0); - processor.getPeerStatus(sender.getPeerNodeID()).registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().setConnectionPoint(0); + processor.getSyncPeerProcessor().registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); processor.processSkeletonResponse(sender, new SkeletonResponseMessage(1, bids)); Assert.assertFalse(sender.getMessages().isEmpty()); @@ -746,11 +720,12 @@ public void processSkeletonResponseWithTenBlockIdentifiers() { Assert.assertEquals(10, request.getCount()); SyncPeerStatus peerStatus = processor.getPeerStatus(sender.getPeerNodeID()); + SyncPeerProcessor syncPeerProcessor = processor.getSyncPeerProcessor(); Assert.assertNotNull(peerStatus); - Assert.assertTrue(peerStatus.hasSkeleton()); - Assert.assertEquals(10, peerStatus.getSkeleton().size()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertTrue(syncPeerProcessor.hasSkeleton()); + Assert.assertEquals(10, syncPeerProcessor.getSkeleton().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } @Test @@ -763,11 +738,12 @@ public void processSkeletonResponseWithoutBlockIdentifiers() { List bids = new ArrayList<>(); - processor.getPeerStatusAndSaveSender(sender).registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); processor.processSkeletonResponse(sender, new SkeletonResponseMessage(1, bids)); Assert.assertTrue(sender.getMessages().isEmpty()); - Assert.assertTrue(processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().isEmpty()); + Assert.assertTrue(processor.getSyncPeerProcessor().getExpectedResponses().isEmpty()); } @Test @@ -780,7 +756,8 @@ public void processSkeletonResponseWithConnectionPoint() { SimpleMessageChannel sender = new SimpleMessageChannel(new byte[] { 0x01 }); SyncProcessor processor = new SyncProcessor(blockchain, blockSyncService, SyncConfiguration.IMMEDIATE_FOR_TESTING); - processor.getPeerStatusAndSaveSender(sender).setConnectionPoint(25); + processor.getPeerStatusAndSaveSender(sender); + processor.getSyncPeerProcessor().setConnectionPoint(25); List bids = new ArrayList<>(); @@ -798,7 +775,7 @@ public void processSkeletonResponseWithConnectionPoint() { bids.add(new BlockIdentifier(hash, number)); } - processor.getPeerStatus(sender.getPeerNodeID()).registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); + processor.getSyncPeerProcessor().registerExpectedResponse(1, MessageType.SKELETON_RESPONSE_MESSAGE); processor.processSkeletonResponse(sender, new SkeletonResponseMessage(1, bids)); Assert.assertFalse(sender.getMessages().isEmpty()); @@ -812,7 +789,7 @@ public void processSkeletonResponseWithConnectionPoint() { Assert.assertEquals(5, request.getCount()); Assert.assertArrayEquals(bids.get(2).getHash(), request.getHash()); - Assert.assertEquals(1, processor.getPeerStatus(sender.getPeerNodeID()).getExpectedResponses().size()); + Assert.assertEquals(1, processor.getSyncPeerProcessor().getExpectedResponses().size()); } private static Transaction createTransaction(Account sender, Account receiver, BigInteger value, BigInteger nonce) { diff --git a/rskj-core/src/test/java/co/rsk/net/ThreeAsyncNodeUsingSyncProcessorTest.java b/rskj-core/src/test/java/co/rsk/net/ThreeAsyncNodeUsingSyncProcessorTest.java index c1f5b7ff4ab..1ec718762e1 100644 --- a/rskj-core/src/test/java/co/rsk/net/ThreeAsyncNodeUsingSyncProcessorTest.java +++ b/rskj-core/src/test/java/co/rsk/net/ThreeAsyncNodeUsingSyncProcessorTest.java @@ -70,12 +70,12 @@ public void synchronizeNewNodesInAChain() throws InterruptedException { node2.joinWithTimeout(); node3.joinWithTimeout(); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node2.getNodeID())); } @Test @@ -122,12 +122,12 @@ public void synchronizeNewNodeWithBestChain() throws InterruptedException { node2.joinWithTimeout(); node3.joinWithTimeout(); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node2.getNodeID())); } @Test @@ -177,12 +177,12 @@ public void synchronizeNewNodeWithTwoPeers() throws InterruptedException { node2.joinWithTimeout(); node3.joinWithTimeout(); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node2.getNodeID())); } @Test @@ -225,12 +225,12 @@ public void dontSynchronizeNodeWithShorterChain() throws InterruptedException { node2.joinWithTimeout(); node3.joinWithTimeout(); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node2.getNodeID())); } @Test @@ -277,11 +277,11 @@ public void dontSynchronizeNodeWithShorterChainAndThenSynchronizeWithNewPeer() t node2.joinWithTimeout(); node3.joinWithTimeout(); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node3.getNodeID())); - Assert.assertFalse(node3.getSyncProcessor().peerIsSyncing(node1.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node3.getNodeID())); + Assert.assertFalse(node3.getSyncProcessor().isPeerSyncing(node1.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node2.getNodeID())); } } diff --git a/rskj-core/src/test/java/co/rsk/net/TwoAsyncNodeUsingSyncProcessorTest.java b/rskj-core/src/test/java/co/rsk/net/TwoAsyncNodeUsingSyncProcessorTest.java index 3b9f31c5b8b..b857228e47b 100644 --- a/rskj-core/src/test/java/co/rsk/net/TwoAsyncNodeUsingSyncProcessorTest.java +++ b/rskj-core/src/test/java/co/rsk/net/TwoAsyncNodeUsingSyncProcessorTest.java @@ -61,8 +61,8 @@ public void buildBlockchainAndSynchronize() throws InterruptedException { Assert.assertTrue(node1.getExpectedResponses().isEmpty()); Assert.assertTrue(node2.getExpectedResponses().isEmpty()); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); } @Test @@ -86,8 +86,8 @@ public void buildBlockchainAndSynchronize400Blocks() throws InterruptedException Assert.assertTrue(node1.getExpectedResponses().isEmpty()); Assert.assertTrue(node2.getExpectedResponses().isEmpty()); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); } @Test @@ -113,8 +113,8 @@ public void buildBlockchainWithUnclesAndSynchronize() throws InterruptedExceptio Assert.assertTrue(node1.getExpectedResponses().isEmpty()); Assert.assertTrue(node2.getExpectedResponses().isEmpty()); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); } @Test @@ -160,8 +160,8 @@ public void buildBlockchainPartialAndSynchronize() throws InterruptedException { Assert.assertTrue(node1.getExpectedResponses().isEmpty()); Assert.assertTrue(node2.getExpectedResponses().isEmpty()); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); } @Test @@ -185,7 +185,7 @@ public void sendNewBlock() throws InterruptedException { Assert.assertEquals(1, node2.getBestBlock().getNumber()); Assert.assertArrayEquals(node1.getBestBlock().getHash(), node2.getBestBlock().getHash()); - Assert.assertFalse(node1.getSyncProcessor().peerIsSyncing(node2.getNodeID())); - Assert.assertFalse(node2.getSyncProcessor().peerIsSyncing(node1.getNodeID())); + Assert.assertFalse(node1.getSyncProcessor().isPeerSyncing(node2.getNodeID())); + Assert.assertFalse(node2.getSyncProcessor().isPeerSyncing(node1.getNodeID())); } } diff --git a/rskj-core/src/test/java/co/rsk/net/simples/SimpleMessageChannel.java b/rskj-core/src/test/java/co/rsk/net/simples/SimpleMessageChannel.java index 7c4affbfbae..6b1a01c336f 100644 --- a/rskj-core/src/test/java/co/rsk/net/simples/SimpleMessageChannel.java +++ b/rskj-core/src/test/java/co/rsk/net/simples/SimpleMessageChannel.java @@ -24,6 +24,7 @@ import co.rsk.net.messages.Message; import co.rsk.net.messages.MessageType; import org.ethereum.db.ByteArrayWrapper; +import org.junit.Assert; import java.net.InetAddress; import java.net.UnknownHostException; @@ -41,14 +42,18 @@ public class SimpleMessageChannel implements MessageChannel { private NodeID nodeID; private InetAddress address; - public SimpleMessageChannel() throws UnknownHostException { + public SimpleMessageChannel() { byte[] bytes = new byte[32]; random.nextBytes(bytes); this.nodeID = new NodeID(bytes); - byte[] addressBytes = new byte[4]; - random.nextBytes(bytes); - this.address = InetAddress.getByAddress(addressBytes); + try { + byte[] addressBytes = new byte[4]; + random.nextBytes(bytes); + this.address = InetAddress.getByAddress(addressBytes); + } catch (UnknownHostException e) { + Assert.fail("SimpleMessageChannel creation failed"); + } } public SimpleMessageChannel(byte[] nodeID) { diff --git a/rskj-core/src/test/java/co/rsk/net/simples/SimpleStatusHandler.java b/rskj-core/src/test/java/co/rsk/net/simples/SimpleStatusHandler.java deleted file mode 100644 index 96f6cd30312..00000000000 --- a/rskj-core/src/test/java/co/rsk/net/simples/SimpleStatusHandler.java +++ /dev/null @@ -1,18 +0,0 @@ -package co.rsk.net.simples; - -import co.rsk.net.sync.SyncStatus; -import co.rsk.net.sync.SyncStatusSetter; - -public class SimpleStatusHandler implements SyncStatusSetter { - - private SyncStatus status; - - @Override - public void setStatus(SyncStatus status) { - this.status = status; - } - - public SyncStatus getStatus() { - return this.status; - } -} diff --git a/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStateTest.java new file mode 100644 index 00000000000..29be8f089b3 --- /dev/null +++ b/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStateTest.java @@ -0,0 +1,93 @@ +package co.rsk.net.sync; + +import co.rsk.net.NodeMessageHandlerUtil; +import co.rsk.net.SyncProcessor; +import co.rsk.net.simples.SimpleMessageChannel; +import co.rsk.net.simples.SimpleNode; +import co.rsk.test.builders.BlockChainBuilder; +import org.ethereum.core.Blockchain; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class DecidingSyncStateTest { + + @Test + public void switchesToDecidingWith5Peers() { + SyncConfiguration syncConfiguration = SyncConfiguration.DEFAULT; + SimpleSyncEventsHandler syncEventsHandler = new SimpleSyncEventsHandler(); + PeersInformation knownPeers = new PeersInformation(syncConfiguration); + SyncState syncState = new DecidingSyncState(syncConfiguration, syncEventsHandler, knownPeers); + + for (int i = 0; i < 5; i++) { + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + knownPeers.registerPeer(new SimpleMessageChannel()); + syncState.newPeerStatus(); + } + + Assert.assertTrue(syncEventsHandler.canStartSyncingWasCalled()); + } + + @Test + public void switchesToDecidingWith5NonRepeatedPeers() { + SyncConfiguration syncConfiguration = SyncConfiguration.DEFAULT; + SimpleSyncEventsHandler syncEventsHandler = new SimpleSyncEventsHandler(); + PeersInformation knownPeers = new PeersInformation(syncConfiguration); + SyncState syncState = new DecidingSyncState(syncConfiguration, syncEventsHandler, knownPeers); + + SimpleMessageChannel peerToRepeat = new SimpleMessageChannel(); + for (int i = 0; i < 10; i++) { + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + knownPeers.registerPeer(peerToRepeat); + syncState.newPeerStatus(); + } + + for (int i = 0; i < 4; i++) { + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + knownPeers.registerPeer(new SimpleMessageChannel()); + syncState.newPeerStatus(); + } + + Assert.assertTrue(syncEventsHandler.canStartSyncingWasCalled()); + } + + @Test + public void doesntSwitchWithNoPeersAfter2Minutes() { + SyncConfiguration syncConfiguration = SyncConfiguration.DEFAULT; + SimpleSyncEventsHandler syncEventsHandler = new SimpleSyncEventsHandler(); + PeersInformation knownPeers = new PeersInformation(syncConfiguration); + SyncState syncState = new DecidingSyncState(syncConfiguration, syncEventsHandler, knownPeers); + + syncState.tick(Duration.ofMinutes(2)); + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + } + + @Test + public void switchesToDecidingWith1PeerAfter2Minutes() { + SyncConfiguration syncConfiguration = SyncConfiguration.DEFAULT; + SimpleSyncEventsHandler syncEventsHandler = new SimpleSyncEventsHandler(); + PeersInformation knownPeers = new PeersInformation(syncConfiguration); + SyncState syncState = new DecidingSyncState(syncConfiguration, syncEventsHandler, knownPeers); + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + + knownPeers.registerPeer(new SimpleMessageChannel()); + syncState.newPeerStatus(); + syncState.tick(Duration.ofMinutes(2)); + Assert.assertTrue(syncEventsHandler.canStartSyncingWasCalled()); + } + + @Test + public void doesntSwitchToDecidingWith1PeerAfter119Seconds() { + SyncConfiguration syncConfiguration = SyncConfiguration.DEFAULT; + SimpleSyncEventsHandler syncEventsHandler = new SimpleSyncEventsHandler(); + PeersInformation knownPeers = new PeersInformation(syncConfiguration); + SyncState syncState = new DecidingSyncState(syncConfiguration, syncEventsHandler, knownPeers); + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + + knownPeers.registerPeer(new SimpleMessageChannel()); + syncState.newPeerStatus(); + syncState.tick(Duration.ofSeconds(119)); + Assert.assertFalse(syncEventsHandler.canStartSyncingWasCalled()); + } +} diff --git a/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStatusTest.java b/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStatusTest.java deleted file mode 100644 index 7836200f6e1..00000000000 --- a/rskj-core/src/test/java/co/rsk/net/sync/DecidingSyncStatusTest.java +++ /dev/null @@ -1,85 +0,0 @@ -package co.rsk.net.sync; - -import co.rsk.net.simples.SimpleNode; -import co.rsk.net.simples.SimpleStatusHandler; -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; -import java.util.Collections; - -public class DecidingSyncStatusTest { - - @Test - public void switchesToDecidingWith5Peers() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new DecidingSyncStatus(SyncConfiguration.DEFAULT)); - for (int i = 0; i < 5; i++) { - SyncStatus status = statusHandler.getStatus(); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - SimpleNode peer = SimpleNode.createNode(); - status.newPeerStatus( - statusHandler, - peer.getNodeID(), - peer.getFullStatus(), - Collections.emptySet()); - } - - Assert.assertEquals(SyncStatusIds.FINDING_CONNECTION_POINT, statusHandler.getStatus().getId()); - } - - @Test - public void switchesToDecidingWith5NonRepeatedPeers() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new DecidingSyncStatus(SyncConfiguration.DEFAULT)); - SimpleNode peerToRepeat = SimpleNode.createNode(); - for (int i = 0; i < 10; i++) { - SyncStatus status = statusHandler.getStatus(); - Assert.assertEquals(SyncStatusIds.DECIDING, status.getId()); - status.newPeerStatus(statusHandler, peerToRepeat.getNodeID(), peerToRepeat.getFullStatus(), Collections.emptySet()); - } - - for (int i = 0; i < 4; i++) { - SyncStatus status = statusHandler.getStatus(); - Assert.assertEquals(SyncStatusIds.DECIDING, status.getId()); - SimpleNode peer = SimpleNode.createNode(); - status.newPeerStatus(statusHandler, peer.getNodeID(), peer.getFullStatus(), Collections.emptySet()); - } - - Assert.assertEquals(SyncStatusIds.FINDING_CONNECTION_POINT, statusHandler.getStatus().getId()); - } - - @Test - public void doesntSwitchWithNoPeersAfter2Minutes() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new DecidingSyncStatus(SyncConfiguration.DEFAULT)); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - - statusHandler.getStatus().tick(statusHandler, Duration.ofMinutes(2), Collections.emptySet()); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - } - - @Test - public void switchesToDecidingWith1PeerAfter2Minutes() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new DecidingSyncStatus(SyncConfiguration.DEFAULT)); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - - SimpleNode peer = SimpleNode.createNode(); - statusHandler.getStatus().newPeerStatus(statusHandler, peer.getNodeID(), peer.getFullStatus(), Collections.emptySet()); - statusHandler.getStatus().tick(statusHandler, Duration.ofMinutes(2), Collections.emptySet()); - Assert.assertEquals(SyncStatusIds.FINDING_CONNECTION_POINT, statusHandler.getStatus().getId()); - } - - @Test - public void doesntSwitchToDecidingWith1PeerAfter119Seconds() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new DecidingSyncStatus(SyncConfiguration.DEFAULT)); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - - SimpleNode peer = SimpleNode.createNode(); - statusHandler.getStatus().newPeerStatus(statusHandler, peer.getNodeID(), peer.getFullStatus(), Collections.emptySet()); - statusHandler.getStatus().tick(statusHandler, Duration.ofSeconds(119), Collections.emptySet()); - Assert.assertEquals(SyncStatusIds.DECIDING, statusHandler.getStatus().getId()); - } -} diff --git a/rskj-core/src/test/java/co/rsk/net/sync/FindingConnectionPointSyncStatusTest.java b/rskj-core/src/test/java/co/rsk/net/sync/FindingConnectionPointSyncStatusTest.java deleted file mode 100644 index 28463981342..00000000000 --- a/rskj-core/src/test/java/co/rsk/net/sync/FindingConnectionPointSyncStatusTest.java +++ /dev/null @@ -1,17 +0,0 @@ -package co.rsk.net.sync; - -import co.rsk.net.simples.SimpleStatusHandler; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Collections; - -public class FindingConnectionPointSyncStatusTest { - @Test - public void itIgnoresNewPeerInformation() { - SimpleStatusHandler statusHandler = new SimpleStatusHandler(); - statusHandler.setStatus(new FindingConnectionPointSyncStatus()); - statusHandler.getStatus().newPeerStatus(statusHandler, null, null, Collections.emptySet()); - Assert.assertEquals(SyncStatusIds.FINDING_CONNECTION_POINT, statusHandler.getStatus().getId()); - } -} diff --git a/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java b/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java new file mode 100644 index 00000000000..4c15519e43a --- /dev/null +++ b/rskj-core/src/test/java/co/rsk/net/sync/SimpleSyncEventsHandler.java @@ -0,0 +1,20 @@ +package co.rsk.net.sync; + + +public class SimpleSyncEventsHandler implements SyncEventsHandler { + private boolean canStartSyncingWasCalled_; + + @Override + public void canStartSyncing() { + this.canStartSyncingWasCalled_ = true; + } + + @Override + public void stopSyncing() { + + } + + public boolean canStartSyncingWasCalled() { + return canStartSyncingWasCalled_; + } +} \ No newline at end of file diff --git a/rskj-core/src/test/java/co/rsk/net/sync/SyncingWithPeerSyncStateTest.java b/rskj-core/src/test/java/co/rsk/net/sync/SyncingWithPeerSyncStateTest.java new file mode 100644 index 00000000000..cadb684b5c1 --- /dev/null +++ b/rskj-core/src/test/java/co/rsk/net/sync/SyncingWithPeerSyncStateTest.java @@ -0,0 +1,26 @@ +package co.rsk.net.sync; + +import co.rsk.net.SyncProcessor; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class SyncingWithPeerSyncStateTest { + @Test + public void itIgnoresNewPeerInformation() { + SyncProcessor syncProcessor = new SyncProcessor(null,null, SyncConfiguration.IMMEDIATE_FOR_TESTING); + syncProcessor.setSyncState(new SyncingWithPeerSyncState(SyncConfiguration.IMMEDIATE_FOR_TESTING, syncProcessor)); + syncProcessor.getSyncState().newPeerStatus(); + Assert.assertEquals(SyncStatesIds.SYNC_WITH_PEER, syncProcessor.getSyncState().getId()); + } + + @Test + public void itTimeoutsWhenWaitingForRequest() { + SyncProcessor syncProcessor = new SyncProcessor(null,null, SyncConfiguration.IMMEDIATE_FOR_TESTING); + syncProcessor.setSyncState(new SyncingWithPeerSyncState(SyncConfiguration.IMMEDIATE_FOR_TESTING, syncProcessor)); + syncProcessor.getSyncState().tick(Duration.ofMinutes(1)); + Assert.assertEquals(SyncStatesIds.DECIDING, syncProcessor.getSyncState().getId()); + } + +} diff --git a/rskj-core/src/test/java/co/rsk/net/utils/StatusUtils.java b/rskj-core/src/test/java/co/rsk/net/utils/StatusUtils.java new file mode 100644 index 00000000000..7173fd81e17 --- /dev/null +++ b/rskj-core/src/test/java/co/rsk/net/utils/StatusUtils.java @@ -0,0 +1,15 @@ +package co.rsk.net.utils; + +import co.rsk.net.Status; +import org.ethereum.core.Blockchain; + +public class StatusUtils { + public static Status fromBlockchain(Blockchain blockchain) { + return new Status( + blockchain.getBestBlock().getNumber(), + blockchain.getBestBlockHash(), + blockchain.getBestBlock().getParentHash(), + blockchain.getTotalDifficulty() + ); + } +}