Skip to content

Commit da7cd47

Browse files
committed
Include responders missing hashes from our historical data store
1 parent ddfa14a commit da7cd47

File tree

4 files changed

+44
-23
lines changed

4 files changed

+44
-23
lines changed

p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343

4444
import java.util.HashMap;
4545
import java.util.Map;
46+
import java.util.Optional;
4647
import java.util.Random;
4748
import java.util.Set;
4849
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,7 +74,7 @@ class RequestDataHandler implements MessageListener {
7374
///////////////////////////////////////////////////////////////////////////////////////////
7475

7576
public interface Listener {
76-
void onComplete(boolean wasTruncated);
77+
void onComplete(boolean wasTruncated, String responderVersion);
7778

7879
@SuppressWarnings("UnusedParameters")
7980
void onFault(String errorMessage, @SuppressWarnings("SameParameterValue") @Nullable Connection connection);
@@ -87,6 +88,7 @@ public interface Listener {
8788
private final NetworkNode networkNode;
8889
private final P2PDataStorage dataStorage;
8990
private final PeerManager peerManager;
91+
private final Optional<String> responderVersion;
9092
private final Listener listener;
9193
private Timer timeoutTimer;
9294
private final int nonce = new Random().nextInt();
@@ -100,10 +102,12 @@ public interface Listener {
100102
RequestDataHandler(NetworkNode networkNode,
101103
P2PDataStorage dataStorage,
102104
PeerManager peerManager,
105+
Optional<String> responderVersion,
103106
Listener listener) {
104107
this.networkNode = networkNode;
105108
this.dataStorage = dataStorage;
106109
this.peerManager = peerManager;
110+
this.responderVersion = responderVersion;
107111
this.listener = listener;
108112
}
109113

@@ -124,7 +128,7 @@ void requestData(NodeAddress nodeAddress, boolean isPreliminaryDataRequest) {
124128
if (isPreliminaryDataRequest)
125129
getDataRequest = dataStorage.buildPreliminaryGetDataRequest(nonce);
126130
else
127-
getDataRequest = dataStorage.buildGetUpdatedDataRequest(networkNode.getNodeAddress(), nonce);
131+
getDataRequest = dataStorage.buildGetUpdatedDataRequest(networkNode.getNodeAddress(), nonce, responderVersion);
128132

129133
if (timeoutTimer == null) {
130134
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
@@ -201,7 +205,7 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
201205
connection.getPeersNodeAddressOptional().get());
202206

203207
cleanup();
204-
listener.onComplete(getDataResponse.isWasTruncated());
208+
listener.onComplete(getDataResponse.isWasTruncated(), getDataResponse.getVersion());
205209
} else {
206210
log.warn("Nonce not matching. That can happen rarely if we get a response after a canceled " +
207211
"handshake (timeout causes connection close but peer might have sent a msg before " +

p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataManager.java

+20-10
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void requestPreliminaryData() {
177177
nodeAddresses.remove(nodeAddress);
178178
// We clone list to avoid mutable change during iterations
179179
List<NodeAddress> remainingNodeAddresses = new ArrayList<>(nodeAddresses);
180-
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), (i * 200 + 1), TimeUnit.MILLISECONDS);
180+
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses, Optional.empty()), (i * 200 + 1), TimeUnit.MILLISECONDS);
181181
}
182182

183183
isPreliminaryDataRequest = true;
@@ -195,7 +195,7 @@ public void requestUpdateData() {
195195
// We use the node we have already connected to to request again
196196
nodeAddressOfPreliminaryDataRequest.ifPresent(candidate -> {
197197
nodeAddresses.remove(candidate);
198-
requestData(candidate, nodeAddresses);
198+
requestData(candidate, nodeAddresses, Optional.empty());
199199

200200
ArrayList<NodeAddress> finalNodeAddresses = new ArrayList<>(nodeAddresses);
201201
int numRequests = 0;
@@ -205,7 +205,7 @@ public void requestUpdateData() {
205205

206206
// It might be that we have a prelim. request open for the same seed, if so we skip to the next.
207207
if (!handlerMap.containsKey(nodeAddress)) {
208-
UserThread.runAfter(() -> requestData(nodeAddress, nodeAddresses), (i * 200 + 1), TimeUnit.MILLISECONDS);
208+
UserThread.runAfter(() -> requestData(nodeAddress, nodeAddresses, Optional.empty()), (i * 200 + 1), TimeUnit.MILLISECONDS);
209209
numRequests++;
210210
}
211211
}
@@ -333,13 +333,15 @@ public void onFault(String errorMessage, @Nullable Connection connection) {
333333
// RequestData
334334
///////////////////////////////////////////////////////////////////////////////////////////
335335

336-
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses) {
336+
private void requestData(NodeAddress nodeAddress, List<NodeAddress> remainingNodeAddresses,
337+
Optional<String> responderVersion) {
337338
if (!stopped) {
338339
if (!handlerMap.containsKey(nodeAddress)) {
339340
RequestDataHandler requestDataHandler = new RequestDataHandler(networkNode, dataStorage, peerManager,
341+
responderVersion,
340342
new RequestDataHandler.Listener() {
341343
@Override
342-
public void onComplete(boolean wasTruncated) {
344+
public void onComplete(boolean wasTruncated, String responderVersion) {
343345
log.trace("RequestDataHandshake of outbound connection complete. nodeAddress={}",
344346
nodeAddress);
345347
stopRetryTimer();
@@ -363,12 +365,20 @@ public void onComplete(boolean wasTruncated) {
363365
}
364366

365367
if (wasTruncated) {
366-
if (numRepeatedRequests < MAX_REPEATED_REQUESTS) {
368+
boolean isResponderOlderVersion = Version.isNewVersion(Version.VERSION, responderVersion);
369+
if (isResponderOlderVersion) {
370+
log.info("The responder's version is older than hours. " +
371+
"We need to include the hashes from our historical store (since {}) " +
372+
"to the next request.", responderVersion);
373+
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses, Optional.of(responderVersion)), 2);
374+
}
375+
376+
else if (numRepeatedRequests < MAX_REPEATED_REQUESTS) {
367377
// If we had allDataReceived already set to true but get a response with truncated flag,
368378
// we still repeat the request to that node for higher redundancy. Otherwise, one seed node
369379
// providing incomplete data would stop others to fill the gaps.
370380
log.info("DataResponse did not contain all data, so we repeat request until we got all data");
371-
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses), 2);
381+
UserThread.runAfter(() -> requestData(nodeAddress, remainingNodeAddresses, Optional.of(responderVersion)), 2);
372382
} else if (!allDataReceived) {
373383
allDataReceived = true;
374384
log.warn("\n#################################################################\n" +
@@ -398,7 +408,7 @@ public void onFault(String errorMessage, @Nullable Connection connection) {
398408
"We will try requestDataFromPeers again.");
399409
NodeAddress nextCandidate = remainingNodeAddresses.get(0);
400410
remainingNodeAddresses.remove(nextCandidate);
401-
requestData(nextCandidate, remainingNodeAddresses);
411+
requestData(nextCandidate, remainingNodeAddresses, Optional.empty());
402412
} else if (handlerMap.isEmpty()) {
403413
// If not other connection attempts are in the handlerMap we assume that no seed
404414
// nodes are available.
@@ -455,7 +465,7 @@ private void requestFromNonSeedNodePeers() {
455465
if (!list.isEmpty()) {
456466
NodeAddress nextCandidate = list.get(0);
457467
list.remove(nextCandidate);
458-
requestData(nextCandidate, list);
468+
requestData(nextCandidate, list, Optional.empty());
459469
}
460470
}
461471

@@ -482,7 +492,7 @@ private void restart() {
482492
if (!list.isEmpty()) {
483493
NodeAddress nextCandidate = list.get(0);
484494
list.remove(nextCandidate);
485-
requestData(nextCandidate, list);
495+
requestData(nextCandidate, list, Optional.empty());
486496
}
487497
},
488498
RETRY_DELAY_SEC);

p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import java.util.HashSet;
105105
import java.util.List;
106106
import java.util.Map;
107+
import java.util.Optional;
107108
import java.util.Set;
108109
import java.util.concurrent.ConcurrentHashMap;
109110
import java.util.concurrent.CopyOnWriteArraySet;
@@ -271,25 +272,26 @@ public void addProtectedMailboxStorageEntryToMap(ProtectedStorageEntry protected
271272
* Returns a PreliminaryGetDataRequest that can be sent to a peer node to request missing Payload data.
272273
*/
273274
public PreliminaryGetDataRequest buildPreliminaryGetDataRequest(int nonce) {
274-
return new PreliminaryGetDataRequest(nonce, getKnownPayloadHashes());
275+
return new PreliminaryGetDataRequest(nonce, getKnownPayloadHashes(Optional.empty()));
275276
}
276277

277278
/**
278279
* Returns a GetUpdatedDataRequest that can be sent to a peer node to request missing Payload data.
279280
*/
280-
public GetUpdatedDataRequest buildGetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce) {
281-
return new GetUpdatedDataRequest(senderNodeAddress, nonce, getKnownPayloadHashes());
281+
public GetUpdatedDataRequest buildGetUpdatedDataRequest(NodeAddress senderNodeAddress, int nonce,
282+
Optional<String> responderVersion) {
283+
return new GetUpdatedDataRequest(senderNodeAddress, nonce, getKnownPayloadHashes(responderVersion));
282284
}
283285

284286
/**
285287
* Returns the set of known payload hashes. This is used in the GetData path to request missing data from peer nodes
286288
*/
287-
private Set<byte[]> getKnownPayloadHashes() {
289+
private Set<byte[]> getKnownPayloadHashes(Optional<String> responderVersion) {
288290
// We collect the keys of the PersistableNetworkPayload items so we exclude them in our request.
289291
// PersistedStoragePayload items don't get removed, so we don't have an issue with the case that
290292
// an object gets removed in between PreliminaryGetDataRequest and the GetUpdatedDataRequest and we would
291293
// miss that event if we do not load the full set or use some delta handling.
292-
Map<ByteArray, PersistableNetworkPayload> mapForDataRequest = getMapForDataRequest();
294+
Map<ByteArray, PersistableNetworkPayload> mapForDataRequest = getMapForDataRequest(responderVersion);
293295
Set<byte[]> excludedKeys = getKeysAsByteSet(mapForDataRequest);
294296
Set<byte[]> excludedKeysFromProtectedStorageEntryMap = getKeysAsByteSet(map);
295297
excludedKeys.addAll(excludedKeysFromProtectedStorageEntryMap);
@@ -374,7 +376,7 @@ public GetDataResponse buildGetDataResponse(
374376
// Utils for collecting the exclude hashes
375377
///////////////////////////////////////////////////////////////////////////////////////////
376378

377-
private Map<ByteArray, PersistableNetworkPayload> getMapForDataRequest() {
379+
private Map<ByteArray, PersistableNetworkPayload> getMapForDataRequest(Optional<String> responderVersion) {
378380
Map<ByteArray, PersistableNetworkPayload> map = new HashMap<>();
379381
appendOnlyDataStoreService.getServices()
380382
.forEach(service -> {
@@ -383,7 +385,11 @@ private Map<ByteArray, PersistableNetworkPayload> getMapForDataRequest() {
383385
var historicalDataStoreService = (HistoricalDataStoreService<? extends PersistableNetworkPayloadStore>) service;
384386
// As we add the version to our request we only use the live data. Eventually missing data will be
385387
// derived from the version.
386-
serviceMap = historicalDataStoreService.getMapOfLiveData();
388+
if (responderVersion.isEmpty()) {
389+
serviceMap = historicalDataStoreService.getMapOfLiveData();
390+
} else {
391+
serviceMap = historicalDataStoreService.getMapSinceVersion(responderVersion.get(), Collections.emptySet(), new AtomicBoolean());
392+
}
387393
} else {
388394
serviceMap = service.getMap();
389395
}
@@ -547,7 +553,7 @@ static private <T extends NetworkPayload> Set<T> filterKnownHashes(
547553
}
548554

549555
public Collection<PersistableNetworkPayload> getPersistableNetworkPayloadCollection() {
550-
return getMapForDataRequest().values();
556+
return getMapForDataRequest(Optional.empty()).values();
551557
}
552558

553559
private Set<byte[]> getKeysAsByteSet(Map<ByteArray, ? extends PersistablePayload> map) {

p2p/src/test/java/bisq/network/p2p/storage/P2PDataStorageRequestDataTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.security.KeyPair;
3434
import java.security.NoSuchAlgorithmException;
3535

36+
import java.util.Optional;
3637
import java.util.Set;
3738

3839
import org.junit.jupiter.api.BeforeEach;
@@ -105,7 +106,7 @@ public void buildPreliminaryGetDataRequest_EmptyP2PDataStore() {
105106
@Test
106107
public void buildGetUpdatedDataRequest_EmptyP2PDataStore() {
107108
GetUpdatedDataRequest getDataRequest =
108-
this.testState.mockedStorage.buildGetUpdatedDataRequest(this.localNodeAddress, 1);
109+
this.testState.mockedStorage.buildGetUpdatedDataRequest(this.localNodeAddress, 1, Optional.empty());
109110

110111
assertEquals(getDataRequest.getNonce(), 1);
111112
assertEquals(getDataRequest.getSenderNodeAddress(), this.localNodeAddress);
@@ -156,7 +157,7 @@ public void requestData_FilledP2PDataStore_GetUpdatedDataRequest() throws NoSuch
156157
this.testState.mockedStorage.addProtectedStorageEntry(toAdd4, this.localNodeAddress, null);
157158

158159
GetUpdatedDataRequest getDataRequest =
159-
this.testState.mockedStorage.buildGetUpdatedDataRequest(this.localNodeAddress, 1);
160+
this.testState.mockedStorage.buildGetUpdatedDataRequest(this.localNodeAddress, 1, Optional.empty());
160161

161162
assertEquals(getDataRequest.getNonce(), 1);
162163
assertEquals(getDataRequest.getSenderNodeAddress(), this.localNodeAddress);

0 commit comments

Comments
 (0)