From 48efd0e040a6b8901f75e79ffa41738f3a7b1851 Mon Sep 17 00:00:00 2001 From: QwQ-dev Date: Tue, 7 Jan 2025 11:12:21 +0800 Subject: [PATCH] fix: Each LegacyPlayerDataService has its own RStream communication, which will not contain data from other LegacyPlayerDataService. --- .../player/listener/PlayerListener.java | 5 ++- .../adapter/PairTypeAdapter.java | 2 +- .../task/redis/RStreamAccepterInterface.java | 34 ++++++++----------- .../task/redis/RStreamAccepterTask.java | 13 +++++-- .../PlayerDataSyncNameRedisStreamAccept.java | 29 +++------------- .../PlayerDataSyncUUIDRedisStreamAccept.java | 28 +++------------ .../PlayerDataUpdateRedisStreamAccept.java | 10 ++---- 7 files changed, 40 insertions(+), 81 deletions(-) rename player/src/main/java/net/legacy/library/player/{service => serialize}/adapter/PairTypeAdapter.java (96%) diff --git a/player/src/main/java/net/legacy/library/player/listener/PlayerListener.java b/player/src/main/java/net/legacy/library/player/listener/PlayerListener.java index ffdb5f1..07c4357 100644 --- a/player/src/main/java/net/legacy/library/player/listener/PlayerListener.java +++ b/player/src/main/java/net/legacy/library/player/listener/PlayerListener.java @@ -4,7 +4,6 @@ import io.fairyproject.container.InjectableComponent; import io.papermc.paper.event.player.AsyncChatEvent; import net.legacy.library.cache.model.LockSettings; -import net.legacy.library.commons.util.GsonUtil; import net.legacy.library.player.model.LegacyPlayerData; import net.legacy.library.player.service.LegacyPlayerDataService; import net.legacy.library.player.task.PlayerQuitDataSaveTask; @@ -33,8 +32,8 @@ public void on(AsyncChatEvent event) { LegacyPlayerDataService legacyPlayerDataService1 = legacyPlayerDataService.get(); LegacyPlayerData legacyPlayerData = legacyPlayerDataService1.getLegacyPlayerData(event.getPlayer().getUniqueId()); - Pair pair = Pair.of("player-data-sync-name", GsonUtil.getGson().toJson(Pair.of("player-data-service", "PsycheQwQ"))); - Pair pair2 = Pair.of("player-data-sync-name", GsonUtil.getGson().toJson(Pair.of("player-data-service", "PsycheQwQ2"))); + Pair pair = Pair.of("player-data-sync-name", "PsycheQwQ"); + Pair pair2 = Pair.of("player-data-sync-name", "PsycheQwQ2"); legacyPlayerDataService1.redisStreamPubTask(pair, Duration.ofSeconds(60)); legacyPlayerDataService1.redisStreamPubTask(pair2, Duration.ofSeconds(65)); diff --git a/player/src/main/java/net/legacy/library/player/service/adapter/PairTypeAdapter.java b/player/src/main/java/net/legacy/library/player/serialize/adapter/PairTypeAdapter.java similarity index 96% rename from player/src/main/java/net/legacy/library/player/service/adapter/PairTypeAdapter.java rename to player/src/main/java/net/legacy/library/player/serialize/adapter/PairTypeAdapter.java index d80478b..2a1577f 100644 --- a/player/src/main/java/net/legacy/library/player/service/adapter/PairTypeAdapter.java +++ b/player/src/main/java/net/legacy/library/player/serialize/adapter/PairTypeAdapter.java @@ -1,4 +1,4 @@ -package net.legacy.library.player.service.adapter; +package net.legacy.library.player.serialize.adapter; import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; diff --git a/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterInterface.java b/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterInterface.java index 0089f80..abee016 100644 --- a/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterInterface.java +++ b/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterInterface.java @@ -1,5 +1,6 @@ package net.legacy.library.player.task.redis; +import net.legacy.library.player.service.LegacyPlayerDataService; import org.apache.commons.lang3.tuple.Pair; import org.redisson.api.RStream; import org.redisson.api.StreamMessageId; @@ -20,25 +21,17 @@ public interface RStreamAccepterInterface { *

This serves as a unique identifier or categorization for the specific type of task * being processed. For example, it could represent operations like "player-data-sync-name". * - * @return A {@link String} representing the action name of the task. - */ - String getActionName(); - - /** - * Get the target legacy player data service name that this task applies to. - * - *

This corresponds to the specific player data service instance this task - * targets. It helps in routing or contextualizing tasks in multiservice or distributed environments. + *

If we don't care, we can return {@code null} * - * @return A {@link String} representing the target service name, If {@code null} is returned, it means there is no limit + * @return A {@link String} representing the action name of the task */ - String getTargetLegacyPlayerDataServiceName(); + String getActionName(); /** * Determine whether to limit task processing to prevent duplicates within a single server or connection. * *

If this method returns {@code true}, the task will be processed only once per - * connection on each server. After the {@link #accept(RStream, StreamMessageId, Pair)} method is executed, + * connection on each server. After the {@link #accept(RStream, StreamMessageId, LegacyPlayerDataService, Pair)} method is executed, * the task will not be executed again by the same instance unless explicitly deleted. * *

However, if another server or connection processes the task, it can still @@ -46,11 +39,11 @@ public interface RStreamAccepterInterface { * and deleted, or until it expires. * *

If this method returns {@code false}, the task can be processed repeatedly, - * regardless of whether the {@link #accept(RStream, StreamMessageId, Pair)} method runs on the + * regardless of whether the {@link #accept(RStream, StreamMessageId, LegacyPlayerDataService, Pair)} method runs on the * same connection or instance. * - * @return {@code true} if task records are limited to a single handling per connection, - * {@code false} otherwise. + * @return {@code true} if task records are limited to a single handling per connection + * {@code false} otherwise */ boolean isRecodeLimit(); @@ -62,7 +55,7 @@ public interface RStreamAccepterInterface { *

    *
  • Determine if the task is valid and can be processed.
  • *
  • If the task is successfully processed, it can be explicitly deleted using - * methods provided by {@link RStream} (e.g. {@code rStream.remove()}).
  • + * methods provided by {@link RStream} (e.g. {@link RStream#remove(StreamMessageId...)}). *
  • If the processing fails, the task will remain in the rStream and be available * for handling by other connections or servers.
  • *
@@ -82,9 +75,10 @@ public interface RStreamAccepterInterface { *

Note: This method is not exclusive, meaning multiple connections or servers * may attempt to process the same task concurrently unless additional controls are in place. * - * @param rStream The {@link RStream} object representing the Redis stream containing the task. - * @param streamMessageId The {@link StreamMessageId} object representing the unique ID of the task. - * @param data The data contained in the task, represented as a {@link Pair} of {@link String} objects. + * @param rStream The {@link RStream} object representing the Redis stream containing the task + * @param legacyPlayerDataService The {@link LegacyPlayerDataService} object representing the service for handling player data + * @param streamMessageId The {@link StreamMessageId} object representing the unique ID of the task + * @param data The data contained in the task, represented as a {@link Pair} of {@link String} objects */ - void accept(RStream rStream, StreamMessageId streamMessageId, Pair data); + void accept(RStream rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair data); } \ No newline at end of file diff --git a/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterTask.java b/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterTask.java index 62a996b..52704b2 100644 --- a/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterTask.java +++ b/player/src/main/java/net/legacy/library/player/task/redis/RStreamAccepterTask.java @@ -74,6 +74,11 @@ public ScheduledTask start() { Runnable runnable = () -> { RedisCacheServiceInterface redisCacheService = legacyPlayerDataService.getL2Cache(); RedissonClient redissonClient = redisCacheService.getCache(); + + /* + * Each LegacyPlayerDataService has its own RStream communication + * which will not contain data from other LegacyPlayerDataService + */ RStream rStream = redissonClient.getStream(RKeyUtil.getRStreamNameKey(legacyPlayerDataService)); StreamReadArgs args = StreamReadArgs.greaterThan(StreamMessageId.ALL); @@ -107,14 +112,16 @@ public ScheduledTask start() { // Get all registed accepter for (RStreamAccepterInterface accepter : accepters) { - if (!accepter.getActionName().equals(left) || - !accepter.getTargetLegacyPlayerDataServiceName().equals(legacyPlayerDataService.getName())) { + String actionName = accepter.getActionName(); + + // Filter action name + if (actionName != null && !actionName.equals(left)) { continue; } // New thread async accept ScheduledTask schedule = - schedule(() -> accepter.accept(rStream, streamMessageId, pair)); + schedule(() -> accepter.accept(rStream, streamMessageId, legacyPlayerDataService, pair)); if (accepter.isRecodeLimit()) { schedule.getFuture().whenComplete( diff --git a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncNameRedisStreamAccept.java b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncNameRedisStreamAccept.java index 594f670..a3ef94c 100644 --- a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncNameRedisStreamAccept.java +++ b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncNameRedisStreamAccept.java @@ -1,8 +1,6 @@ package net.legacy.library.player.task.redis.impl; -import com.google.common.reflect.TypeToken; import io.fairyproject.log.Log; -import net.legacy.library.commons.util.GsonUtil; import net.legacy.library.player.annotation.RStreamAccepterRegister; import net.legacy.library.player.service.LegacyPlayerDataService; import net.legacy.library.player.task.redis.L1ToL2DataSyncTask; @@ -13,8 +11,6 @@ import org.redisson.api.RStream; import org.redisson.api.StreamMessageId; -import java.util.Optional; - /** * @author qwq-dev * @since 2025-01-04 20:59 @@ -26,41 +22,26 @@ public String getActionName() { return "player-data-sync-name"; } - public String getTargetLegacyPlayerDataServiceName() { - return "player-data-service"; - } - @Override public boolean isRecodeLimit() { return true; } @Override - public void accept(RStream rStream, StreamMessageId streamMessageId, Pair data) { - Object value = data.getValue(); - - Pair pair = GsonUtil.getGson().fromJson( - value.toString(), new TypeToken>() { - }.getType() - ); - - String first = pair.getKey(); - String second = pair.getValue(); + public void accept(RStream rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair data) { + // Scound must be player name + String second = data.getValue(); // Very slow, but it's async so it's fine OfflinePlayer offlinePlayer = Bukkit.getOfflinePlayer(second); - Optional legacyPlayerDataService = - LegacyPlayerDataService.getLegacyPlayerDataService(first); - - legacyPlayerDataService.ifPresent(service -> L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), service).start().getFuture().whenComplete((aVoid, throwable) -> { + L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), legacyPlayerDataService).start().getFuture().whenComplete((aVoid, throwable) -> { if (throwable != null) { Log.error("Error while syncing player data", throwable); return; } - rStream.remove(streamMessageId); - })); + }); } } \ No newline at end of file diff --git a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncUUIDRedisStreamAccept.java b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncUUIDRedisStreamAccept.java index 0291c59..0727e2b 100644 --- a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncUUIDRedisStreamAccept.java +++ b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataSyncUUIDRedisStreamAccept.java @@ -1,8 +1,6 @@ package net.legacy.library.player.task.redis.impl; -import com.google.common.reflect.TypeToken; import io.fairyproject.log.Log; -import net.legacy.library.commons.util.GsonUtil; import net.legacy.library.player.annotation.RStreamAccepterRegister; import net.legacy.library.player.service.LegacyPlayerDataService; import net.legacy.library.player.task.redis.L1ToL2DataSyncTask; @@ -11,7 +9,6 @@ import org.redisson.api.RStream; import org.redisson.api.StreamMessageId; -import java.util.Optional; import java.util.UUID; /** @@ -25,37 +22,22 @@ public String getActionName() { return "player-data-sync-uuid"; } - public String getTargetLegacyPlayerDataServiceName() { - return "player-data-service"; - } - @Override public boolean isRecodeLimit() { return true; } @Override - public void accept(RStream rStream, StreamMessageId streamMessageId, Pair data) { - Object value = data.getValue(); - - Pair pair = GsonUtil.getGson().fromJson( - value.toString(), new TypeToken>() { - }.getType() - ); + public void accept(RStream rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair data) { + // Scound must be player uuid + String second = data.getValue(); - String first = pair.getKey(); - String second = pair.getValue(); - - Optional legacyPlayerDataService = - LegacyPlayerDataService.getLegacyPlayerDataService(first); - - legacyPlayerDataService.ifPresent(service -> L1ToL2DataSyncTask.of(UUID.fromString(second), service).start().getFuture().whenComplete((aVoid, throwable) -> { + L1ToL2DataSyncTask.of(UUID.fromString(second), legacyPlayerDataService).start().getFuture().whenComplete((aVoid, throwable) -> { if (throwable != null) { Log.error("Error while syncing player data", throwable); return; } - rStream.remove(streamMessageId); - })); + }); } } \ No newline at end of file diff --git a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataUpdateRedisStreamAccept.java b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataUpdateRedisStreamAccept.java index 6d7fdb1..55303bf 100644 --- a/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataUpdateRedisStreamAccept.java +++ b/player/src/main/java/net/legacy/library/player/task/redis/impl/PlayerDataUpdateRedisStreamAccept.java @@ -1,6 +1,7 @@ package net.legacy.library.player.task.redis.impl; import net.legacy.library.player.annotation.RStreamAccepterRegister; +import net.legacy.library.player.service.LegacyPlayerDataService; import net.legacy.library.player.task.redis.RStreamAccepterInterface; import org.apache.commons.lang3.tuple.Pair; import org.redisson.api.RStream; @@ -17,18 +18,13 @@ public String getActionName() { return "player-data-update-name"; } - @Override - public String getTargetLegacyPlayerDataServiceName() { - return null; - } - @Override public boolean isRecodeLimit() { return true; } @Override - public void accept(RStream rStream, StreamMessageId streamMessageId, Pair data) { - // TODO + public void accept(RStream rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair data) { + } } \ No newline at end of file