Skip to content

Commit

Permalink
Correct API documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
zapek committed Nov 26, 2023
1 parent 776c650 commit 887b358
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions app/src/main/java/io/xeres/app/xrs/service/gxs/GxsRsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,20 @@ protected enum VerificationStatus
protected abstract List<M> onPendingMessageListRequest(PeerConnection recipient, GxsId groupId, Instant since);

/**
* Called when a peer sends the list of new messages that might interest us, within a group.
* Called when the peer wants specific messages to be transferred to him, within a group.
*
* @param groupId the group ID
* @param messageIds the ids of new messages
* @return the subset of those messages that we actually want
* @param messageIds the ids of messages that the peer wants
* @return the messages that we have available within the requested set
*/
protected abstract List<M> onMessageListRequest(GxsId groupId, Set<MessageId> messageIds);

/**
* Called when the peer wants specific messages to be transferred to him, within a group.
* Called when a peer sends the list of new messages that might interest us, within a group.
*
* @param groupId the group ID
* @param messageIds the ids of messages that the peer wants
* @return the messages that we have available within the requested set
* @param messageIds the ids of new messages
* @return the subset of those messages that we actually want
*/
protected abstract List<MessageId> onMessageListResponse(GxsId groupId, Set<MessageId> messageIds);

Expand Down Expand Up @@ -340,13 +340,13 @@ protected void sendSyncNotification(PeerConnection peerConnection)

private void handleGxsSyncGroupRequestItem(PeerConnection peerConnection, GxsSyncGroupRequestItem item)
{
log.debug("Got {} from {}", item, peerConnection);
log.debug("{} sent {}", peerConnection, item);

var transactionId = getTransactionId(peerConnection);
var since = Instant.ofEpochSecond(item.getLastUpdated());
if (areGxsUpdatesAvailableForPeer(since))
{
log.debug("Updates available for peer, sending group ids...");
log.debug("Group updates available, sending ids...");
List<GxsSyncGroupItem> items = new ArrayList<>();

onAvailableGroupListRequest(peerConnection, since).forEach(gxsGroupItem -> {
Expand Down Expand Up @@ -374,7 +374,7 @@ private void handleGxsSyncGroupRequestItem(PeerConnection peerConnection, GxsSyn
}
else
{
log.debug("No update available for {}", peerConnection);
log.debug("No group updates available");
}

// XXX: check if the peer is subscribed, encrypt or not the group, etc... it's rsgxsnetservice.cc/handleRecvSyncGroup we might not need that for gxsid transferts
Expand All @@ -384,14 +384,14 @@ private void handleGxsSyncGroupRequestItem(PeerConnection peerConnection, GxsSyn

private void handleGxsSyncMessageRequestItem(PeerConnection peerConnection, GxsSyncMessageRequestItem item)
{
log.debug("Got {} from {}", item, peerConnection);
log.debug("{} sent {}", peerConnection, item);

var transactionId = getTransactionId(peerConnection);
var lastUpdated = Instant.ofEpochSecond(item.getLastUpdated());
var since = Instant.ofEpochSecond(item.getCreateSince());
if (areMessageUpdatesAvailableForPeer(item.getGroupId(), lastUpdated, since))
{
log.debug("Messages available for {}, sending...", peerConnection);
log.debug("New messages available, sending ids...");
List<GxsSyncMessageItem> items = new ArrayList<>();

onPendingMessageListRequest(peerConnection, item.getGroupId(), since).forEach(gxsMessageItem -> {
Expand All @@ -415,7 +415,7 @@ private void handleGxsSyncMessageRequestItem(PeerConnection peerConnection, GxsS
}
else
{
log.debug("No messages available for {}", peerConnection);
log.debug("No new messages");
}

// XXX: maybe some more to do, check rsgxsnetservice.cc/handleRecvSyncMsg
Expand Down Expand Up @@ -443,7 +443,6 @@ protected int getTransactionId(PeerConnection peerConnection)
private boolean areGxsUpdatesAvailableForPeer(Instant lastPeerUpdate)
{
var lastServiceUpdate = gxsUpdateService.getLastServiceGroupsUpdate(getServiceType());
log.debug("Local update: {} <-> peer update: {}", lastServiceUpdate, lastPeerUpdate);
// XXX: there should be a way to detect if the peer is sending a lastPeerUpdate several times (means the transaction isn't complete yet)
return lastPeerUpdate.isBefore(lastServiceUpdate);
}
Expand Down Expand Up @@ -487,15 +486,15 @@ public void processItems(PeerConnection peerConnection, Transaction<?> transacti
@SuppressWarnings("unchecked")
var gxsIdsMap = ((List<GxsSyncGroupItem>) transaction.getItems()).stream()
.collect(toMap(GxsSyncGroupItem::getGroupId, gxsSyncGroupItem -> Instant.ofEpochSecond(gxsSyncGroupItem.getPublishTimestamp())));
log.debug("{} has the following gxsIds (new or updates) for us (total: {}): {} ...", peerConnection, gxsIdsMap.keySet().size(), gxsIdsMap.keySet().stream().limit(10).toList());
log.debug("{} has the following group ids (new or updates) for us (total: {}): {} ...", peerConnection, gxsIdsMap.keySet().size(), gxsIdsMap.keySet().stream().limit(10).toList());
requestGxsGroups(peerConnection, onAvailableGroupListResponse(gxsIdsMap));
}
else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_GROUP_LIST_REQUEST))
{
@SuppressWarnings("unchecked")
var gxsIds = ((List<GxsSyncGroupItem>) transaction.getItems()).stream()
.map(GxsSyncGroupItem::getGroupId).collect(toSet());
log.debug("{} wants the following gxs ids (total: {}): {} ...", peerConnection, gxsIds.size(), gxsIds.stream().limit(10).toList());
log.debug("{} wants the following group ids (total: {}): {} ...", peerConnection, gxsIds.size(), gxsIds.stream().limit(10).toList());
sendGxsGroups(peerConnection, onGroupListRequest(gxsIds));
}
else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_GROUPS))
Expand All @@ -508,6 +507,7 @@ else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_GROUPS
verifyAndStoreGroups(peerConnection, gxsGroupItems);
if (!gxsGroupItems.isEmpty())
{
log.debug("{} sent groups", peerConnection);
gxsUpdateService.setLastPeerGroupsUpdate(peerConnection.getLocation(), transaction.getUpdated(), getServiceType());
gxsUpdateService.setLastServiceGroupsUpdateNow(getServiceType());
peerConnectionManager.doForAllPeersExceptSender(this::sendSyncNotification, peerConnection, this);
Expand All @@ -521,7 +521,7 @@ else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_MESSAG
@SuppressWarnings("unchecked")
var messageIds = ((List<GxsSyncMessageItem>) transaction.getItems()).stream()
.map(GxsSyncMessageItem::getMessageId).collect(toSet());
log.debug("{} has the following messageIds for groupId {} (new) for us (total: {}): {} ...", peerConnection, groupId, messageIds.size(), messageIds.stream().limit(10).toList());
log.debug("{} has the following message ids for group {} (new) for us (total: {}): {} ...", peerConnection, groupId, messageIds.size(), messageIds.stream().limit(10).toList());
requestGxsMessages(peerConnection, groupId, onMessageListResponse(groupId, messageIds));
}
else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_MESSAGE_LIST_REQUEST))
Expand All @@ -532,7 +532,7 @@ else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_MESSAG
@SuppressWarnings("unchecked")
var messageIds = ((List<GxsSyncMessageItem>) transaction.getItems()).stream()
.map(GxsSyncMessageItem::getMessageId).collect(toSet());
log.debug("{} wants the following messageIds for groupId {} (total: {}): {} ...", peerConnection, groupId, messageIds.size(), messageIds.stream().limit(10).toList());
log.debug("{} wants the following message ids for group {} (total: {}): {} ...", peerConnection, groupId, messageIds.size(), messageIds.stream().limit(10).toList());
sendGxsMessages(peerConnection, onMessageListRequest(groupId, messageIds));
}
else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_MESSAGES))
Expand All @@ -545,6 +545,7 @@ else if (transaction.getTransactionFlags().contains(TransactionFlags.TYPE_MESSAG
verifyAndStoreMessages(peerConnection, gxsMessageItems);
if (!gxsMessageItems.isEmpty())
{
log.debug("{} sent messages", peerConnection);
gxsUpdateService.setLastPeerMessageUpdate(peerConnection.getLocation(), gxsMessageItems.get(0).getGxsId(), transaction.getUpdated(), getServiceType());
//setLastServiceGroupsUpdateNow(getServiceType()); XXX: should that be done? I'd say no but RS has some comment in the source about it
peerConnectionManager.doForAllPeersExceptSender(this::sendSyncNotification, peerConnection, this);
Expand Down

0 comments on commit 887b358

Please sign in to comment.