Skip to content

Commit

Permalink
reorder some classes and split some parts of SyncProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
lsebrie committed Sep 29, 2017
1 parent 29c7c87 commit 9223e11
Show file tree
Hide file tree
Showing 30 changed files with 714 additions and 697 deletions.
2 changes: 1 addition & 1 deletion COPYING
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ or household purposes, or (2) anything designed or sold for incorporation
into a dwelling. In determining whether a product is a consumer product,
doubtful cases shall be resolved in favor of coverage. For a particular
product received by a particular user, "normally used" refers to a
typical or common use of that class of product, regardless of the status
typical or common use of that class of product, regardless of the syncState
of the particular user or of the way in which the particular user
actually uses, or expects or is expected to use, the product. A product
is a consumer product regardless of whether the product has substantial
Expand Down
173 changes: 91 additions & 82 deletions rskj-core/src/main/java/co/rsk/net/SyncProcessor.java

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package co.rsk.net.sync;

import javax.annotation.Nonnull;
import java.time.Duration;

public class DecidingSyncState implements SyncState {
private Duration timeElapsed = Duration.ZERO;
private PeersInformation knownPeers;
private SyncConfiguration syncConfiguration;
private SyncEventsHandler syncEventsHandler;

public DecidingSyncState(SyncConfiguration syncConfiguration, SyncEventsHandler syncEventsHandler, PeersInformation knownPeers) {
this.syncConfiguration = syncConfiguration;
this.syncEventsHandler = syncEventsHandler;
this.knownPeers = knownPeers;
}

@Nonnull
@Override
public SyncStatesIds getId() {
return SyncStatesIds.DECIDING;
}

@Override
public void newPeerStatus() {
if (knownPeers.count() >= syncConfiguration.getExpectedPeers()) {
syncEventsHandler.canStartSyncing();
}
}

@Override
public void tick(Duration duration) {
timeElapsed = timeElapsed.plus(duration);
if (knownPeers.count() > 0 &&
timeElapsed.compareTo(syncConfiguration.getTimeoutWaitingPeers()) >= 0) {

syncEventsHandler.canStartSyncing();
}
}
}
50 changes: 0 additions & 50 deletions rskj-core/src/main/java/co/rsk/net/sync/DecidingSyncStatus.java

This file was deleted.

This file was deleted.

34 changes: 17 additions & 17 deletions rskj-core/src/main/java/co/rsk/net/sync/PeersInformation.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package co.rsk.net.sync;

import co.rsk.net.MessageChannel;
import co.rsk.net.NodeID;
import co.rsk.net.SyncPeerStatus;

import java.time.Duration;
import java.util.HashMap;
Expand All @@ -16,8 +16,12 @@
* things such as the underlying communication channel.
*/
public class PeersInformation {
private final SyncConfiguration syncCofiguration;
private Map<NodeID, SyncPeerStatus> peerStatuses = new HashMap<>();
private static final Duration peerTimeout = Duration.ofMinutes(10);

public PeersInformation(SyncConfiguration syncConfiguration){
this.syncCofiguration = syncConfiguration;
}

public int count() {
return peerStatuses.size();
Expand All @@ -30,45 +34,41 @@ public int countIf(Predicate<SyncPeerStatus> predicate) {
return Math.toIntExact(count);
}

public SyncPeerStatus getOrRegisterPeer(NodeID nodeID) {
SyncPeerStatus peerStatus = this.peerStatuses.get(nodeID);
public SyncPeerStatus getOrRegisterPeer(MessageChannel messageChannel) {
SyncPeerStatus peerStatus = this.peerStatuses.get(messageChannel.getPeerNodeID());

if (peerStatus != null && !peerStatus.isExpired(peerTimeout))
if (peerStatus != null && !peerStatus.isExpired(syncCofiguration.getExpirationTimePeerStatus()))
return peerStatus;

return this.registerPeer(nodeID);
return this.registerPeer(messageChannel);
}

public SyncPeerStatus getPeer(NodeID nodeID) {
// TODO(mc) check expiration
return this.peerStatuses.get(nodeID);
}

public Optional<NodeID> getBestPeerID() {
public Optional<MessageChannel> getBestPeer() {
return peerStatuses.entrySet().stream()
.filter(e -> !e.getValue().isExpired(peerTimeout))
.filter(e -> !e.getValue().isExpired(syncCofiguration.getExpirationTimePeerStatus()))
.max(this::bestPeerComparator)
.map(Map.Entry::getKey);
.map(Map.Entry::getValue)
.map(SyncPeerStatus::getMessageChannel);
}

public Set<NodeID> knownNodeIds() {
return peerStatuses.keySet();
}

private int bestPeerComparator(Map.Entry<NodeID, SyncPeerStatus> left, Map.Entry<NodeID, SyncPeerStatus> right) {
// TODO(mc) check expiration
return Long.compare(
left.getValue().getStatus().getBestBlockNumber(),
right.getValue().getStatus().getBestBlockNumber());
}

public SyncPeerStatus registerPeer(NodeID nodeID) {
SyncPeerStatus peerStatus = new SyncPeerStatus();
peerStatuses.put(nodeID, peerStatus);
public SyncPeerStatus registerPeer(MessageChannel messageChannel) {
SyncPeerStatus peerStatus = new SyncPeerStatus(messageChannel);
peerStatuses.put(messageChannel.getPeerNodeID(), peerStatus);
return peerStatus;
}

public boolean isKnownPeer(NodeID peerID) {
return peerStatuses.containsKey(peerID);
}
}
47 changes: 31 additions & 16 deletions rskj-core/src/main/java/co/rsk/net/sync/SyncConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,47 @@
import com.google.common.annotations.VisibleForTesting;

import javax.annotation.concurrent.Immutable;
import java.time.Duration;

@Immutable
public final class SyncConfiguration {
public static final SyncConfiguration DEFAULT = new SyncConfiguration(5, 2);
@VisibleForTesting
public static final SyncConfiguration IMMEDIATE_FOR_TESTING = new SyncConfiguration(1, 2);
public static final SyncConfiguration DEFAULT = new SyncConfiguration(5, 2, 30, 10);

private final int minimumPeers;
private final int timeoutWaitingPeers;
@VisibleForTesting
public static final SyncConfiguration IMMEDIATE_FOR_TESTING = new SyncConfiguration(1, 2, 1, 10);

public SyncConfiguration(int minimumPeers, int timeoutWaitingPeers) {
this.minimumPeers = minimumPeers;
this.timeoutWaitingPeers = timeoutWaitingPeers;
}
private final int expectedPeers;
private final Duration timeoutWaitingPeers;
private final Duration timeoutWaitingRequest;
private final Duration expirationTimePeerStatus;

/**
* @return Ideally, the minimum number of peers we would want to start finding a connection point.
*
* @param expectedPeers The expected number of peers we would want to start finding a connection point.
* @param timeoutWaitingPeers Timeout in minutes to start finding the connection point when we have at least one peer
* @param timeoutWaitingRequest Timeout in seconds to wait for syncing requests
* @param expirationTimePeerStatus Expiration time in minutes for peer status
*/
public final int getMinimumPeers() {
return minimumPeers;
public SyncConfiguration(int expectedPeers, int timeoutWaitingPeers, int timeoutWaitingRequest, int expirationTimePeerStatus) {
this.expectedPeers = expectedPeers;
this.timeoutWaitingPeers = Duration.ofMinutes(timeoutWaitingPeers);
this.timeoutWaitingRequest = Duration.ofSeconds(timeoutWaitingRequest);
this.expirationTimePeerStatus = Duration.ofMinutes(expirationTimePeerStatus);
}

/**
* @return Timeout in minutes to start finding the connection point when we have at least one peer.
*/
public final int getTimeoutWaitingPeers() {
public final int getExpectedPeers() {
return expectedPeers;
}

public final Duration getTimeoutWaitingPeers() {
return timeoutWaitingPeers;
}

public final Duration getTimeoutWaitingRequest() {
return timeoutWaitingRequest;
}

public final Duration getExpirationTimePeerStatus() {
return expirationTimePeerStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package co.rsk.net.sync;

public interface SyncEventsHandler {
void canStartSyncing();
void stopSyncing();
}
Loading

0 comments on commit 9223e11

Please sign in to comment.