Skip to content

Commit

Permalink
fix: Now RStream task expiration will be handled correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
QwQ-dev committed Jan 9, 2025
1 parent a6b898e commit 6c40b0e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,33 +112,49 @@ public ScheduledTask<?> start() {
continue;
}

if (value.size() > 1) {
// Greater than 2 because, in addition to the data, there is also an expiration time
if (value.size() > 2) {
Log.error("RStream message is not a pair! StreamMessageId: " + streamMessageId);
continue;
}

Map.Entry<Object, Object> entry = value.entrySet().iterator().next();
String left = entry.getKey().toString();
String right = entry.getValue().toString();
Pair<String, String> pair = Pair.of(left, right);
long expirationTime =
Long.parseLong(value.getOrDefault("expiration-time", 0).toString());

// Get all registed accepter
for (RStreamAccepterInterface accepter : accepters) {
String actionName = accepter.getActionName();
if (expirationTime == 0 || System.currentTimeMillis() > expirationTime) {
rStream.remove(streamMessageId);
continue;
}

for (Map.Entry<Object, Object> entry : value.entrySet()) {
Object key = entry.getKey();
String left = key.toString();

// Filter action name
if (actionName != null && !actionName.equals(left)) {
if (left.equals("expiration-time")) {
continue;
}

// New thread async accept
ScheduledTask<?> schedule =
schedule(() -> accepter.accept(rStream, streamMessageId, legacyPlayerDataService, pair.getRight()));
String right = entry.getValue().toString();
Pair<String, String> pair = Pair.of(left, right);

// Get all registed accepter
for (RStreamAccepterInterface accepter : accepters) {
String actionName = accepter.getActionName();

// Filter action name
if (actionName != null && !actionName.equals(left)) {
continue;
}

// New thread async accept
ScheduledTask<?> schedule =
schedule(() -> accepter.accept(rStream, streamMessageId, legacyPlayerDataService, pair.getRight()));

if (accepter.isRecodeLimit()) {
schedule.getFuture().whenComplete(
(result, throwable) -> acceptedId.add(streamMessageId)
);
if (accepter.isRecodeLimit()) {
schedule.getFuture().whenComplete(
(result, throwable) -> acceptedId.add(streamMessageId)
);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.redisson.api.RedissonClient;
import org.redisson.api.stream.StreamAddArgs;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -38,10 +39,19 @@ public ScheduledTask<?> start() {
RKeyUtil.getTempRMapCacheKey(legacyPlayerDataService)
);

Duration expirationTime = rStreamTask.getExpirationTime();

mapCache.put(
rStreamTask.getActionName(), rStreamTask.getData(),
rStreamTask.getExpirationTime().toMillis(), TimeUnit.MILLISECONDS
expirationTime.toMillis(), TimeUnit.MILLISECONDS
);

// Set expiration time for the cache
mapCache.put(
"expiration-time", String.valueOf(System.currentTimeMillis() + expirationTime.toMillis()),
0, TimeUnit.MILLISECONDS
);

rStream.add(StreamAddArgs.entries(mapCache));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessag
Log.error("Error while syncing player data (L1ToL2PlayerDataSyncByNameRStreamAccepter)", throwable);
return;
}
rStream.remove(streamMessageId);
ack(rStream, streamMessageId);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessag
Log.error("Error while syncing player data (L1ToL2PlayerDataSyncByUuidRStreamAccepter)", throwable);
return;
}
rStream.remove(streamMessageId);
ack(rStream, streamMessageId);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessag
*/
if (player != null && player.isOnline()) {
legacyPlayerDataService.getLegacyPlayerData(player.getUniqueId()).addData(dataMap);
rStream.remove(streamMessageId);
ack(rStream, streamMessageId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessag
*/
if (player != null && player.isOnline()) {
legacyPlayerDataService.getLegacyPlayerData(player.getUniqueId()).addData(dataMap);
rStream.remove(streamMessageId);
ack(rStream, streamMessageId);
}
}
}

0 comments on commit 6c40b0e

Please sign in to comment.