Skip to content

Commit

Permalink
fix: Each LegacyPlayerDataService has its own RStream communication, …
Browse files Browse the repository at this point in the history
…which will not contain data from other LegacyPlayerDataService.
  • Loading branch information
QwQ-dev committed Jan 7, 2025
1 parent 20a8e4f commit 48efd0e
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.fairyproject.container.InjectableComponent;
import io.papermc.paper.event.player.AsyncChatEvent;
import net.legacy.library.cache.model.LockSettings;
import net.legacy.library.commons.util.GsonUtil;
import net.legacy.library.player.model.LegacyPlayerData;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.PlayerQuitDataSaveTask;
Expand Down Expand Up @@ -33,8 +32,8 @@ public void on(AsyncChatEvent event) {
LegacyPlayerDataService legacyPlayerDataService1 = legacyPlayerDataService.get();
LegacyPlayerData legacyPlayerData = legacyPlayerDataService1.getLegacyPlayerData(event.getPlayer().getUniqueId());

Pair<String, String> pair = Pair.of("player-data-sync-name", GsonUtil.getGson().toJson(Pair.of("player-data-service", "PsycheQwQ")));
Pair<String, String> pair2 = Pair.of("player-data-sync-name", GsonUtil.getGson().toJson(Pair.of("player-data-service", "PsycheQwQ2")));
Pair<String, String> pair = Pair.of("player-data-sync-name", "PsycheQwQ");
Pair<String, String> pair2 = Pair.of("player-data-sync-name", "PsycheQwQ2");

legacyPlayerDataService1.redisStreamPubTask(pair, Duration.ofSeconds(60));
legacyPlayerDataService1.redisStreamPubTask(pair2, Duration.ofSeconds(65));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package net.legacy.library.player.service.adapter;
package net.legacy.library.player.serialize.adapter;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.legacy.library.player.task.redis;

import net.legacy.library.player.service.LegacyPlayerDataService;
import org.apache.commons.lang3.tuple.Pair;
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;
Expand All @@ -20,37 +21,29 @@ public interface RStreamAccepterInterface {
* <p>This serves as a unique identifier or categorization for the specific type of task
* being processed. For example, it could represent operations like "player-data-sync-name".
*
* @return A {@link String} representing the action name of the task.
*/
String getActionName();

/**
* Get the target legacy player data service name that this task applies to.
*
* <p>This corresponds to the specific player data service instance this task
* targets. It helps in routing or contextualizing tasks in multiservice or distributed environments.
* <p>If we don't care, we can return {@code null}
*
* @return A {@link String} representing the target service name, If {@code null} is returned, it means there is no limit
* @return A {@link String} representing the action name of the task
*/
String getTargetLegacyPlayerDataServiceName();
String getActionName();

/**
* Determine whether to limit task processing to prevent duplicates within a single server or connection.
*
* <p>If this method returns {@code true}, the task will be processed only once per
* connection on each server. After the {@link #accept(RStream, StreamMessageId, Pair)} method is executed,
* connection on each server. After the {@link #accept(RStream, StreamMessageId, LegacyPlayerDataService, Pair)} method is executed,
* the task will not be executed again by the same instance unless explicitly deleted.
*
* <p>However, if another server or connection processes the task, it can still
* be handled there. A task remains in the rStream until it is correctly processed
* and deleted, or until it expires.
*
* <p>If this method returns {@code false}, the task can be processed repeatedly,
* regardless of whether the {@link #accept(RStream, StreamMessageId, Pair)} method runs on the
* regardless of whether the {@link #accept(RStream, StreamMessageId, LegacyPlayerDataService, Pair)} method runs on the
* same connection or instance.
*
* @return {@code true} if task records are limited to a single handling per connection,
* {@code false} otherwise.
* @return {@code true} if task records are limited to a single handling per connection
* {@code false} otherwise
*/
boolean isRecodeLimit();

Expand All @@ -62,7 +55,7 @@ public interface RStreamAccepterInterface {
* <ul>
* <li>Determine if the task is valid and can be processed.</li>
* <li>If the task is successfully processed, it can be explicitly deleted using
* methods provided by {@link RStream} (e.g. {@code rStream.remove()}).</li>
* methods provided by {@link RStream} (e.g. {@link RStream#remove(StreamMessageId...)}).</li>
* <li>If the processing fails, the task will remain in the rStream and be available
* for handling by other connections or servers.</li>
* </ul>
Expand All @@ -82,9 +75,10 @@ public interface RStreamAccepterInterface {
* <p><b>Note:</b> This method is not exclusive, meaning multiple connections or servers
* may attempt to process the same task concurrently unless additional controls are in place.
*
* @param rStream The {@link RStream} object representing the Redis stream containing the task.
* @param streamMessageId The {@link StreamMessageId} object representing the unique ID of the task.
* @param data The data contained in the task, represented as a {@link Pair} of {@link String} objects.
* @param rStream The {@link RStream} object representing the Redis stream containing the task
* @param legacyPlayerDataService The {@link LegacyPlayerDataService} object representing the service for handling player data
* @param streamMessageId The {@link StreamMessageId} object representing the unique ID of the task
* @param data The data contained in the task, represented as a {@link Pair} of {@link String} objects
*/
void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, Pair<String, String> data);
void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair<String, String> data);
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public ScheduledTask<?> start() {
Runnable runnable = () -> {
RedisCacheServiceInterface redisCacheService = legacyPlayerDataService.getL2Cache();
RedissonClient redissonClient = redisCacheService.getCache();

/*
* Each LegacyPlayerDataService has its own RStream communication
* which will not contain data from other LegacyPlayerDataService
*/
RStream<Object, Object> rStream = redissonClient.getStream(RKeyUtil.getRStreamNameKey(legacyPlayerDataService));

StreamReadArgs args = StreamReadArgs.greaterThan(StreamMessageId.ALL);
Expand Down Expand Up @@ -107,14 +112,16 @@ public ScheduledTask<?> start() {

// Get all registed accepter
for (RStreamAccepterInterface accepter : accepters) {
if (!accepter.getActionName().equals(left) ||
!accepter.getTargetLegacyPlayerDataServiceName().equals(legacyPlayerDataService.getName())) {
String actionName = accepter.getActionName();

// Filter action name
if (actionName != null && !actionName.equals(left)) {
continue;
}

// New thread async accept
ScheduledTask<?> schedule =
schedule(() -> accepter.accept(rStream, streamMessageId, pair));
schedule(() -> accepter.accept(rStream, streamMessageId, legacyPlayerDataService, pair));

if (accepter.isRecodeLimit()) {
schedule.getFuture().whenComplete(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package net.legacy.library.player.task.redis.impl;

import com.google.common.reflect.TypeToken;
import io.fairyproject.log.Log;
import net.legacy.library.commons.util.GsonUtil;
import net.legacy.library.player.annotation.RStreamAccepterRegister;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.redis.L1ToL2DataSyncTask;
Expand All @@ -13,8 +11,6 @@
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;

import java.util.Optional;

/**
* @author qwq-dev
* @since 2025-01-04 20:59
Expand All @@ -26,41 +22,26 @@ public String getActionName() {
return "player-data-sync-name";
}

public String getTargetLegacyPlayerDataServiceName() {
return "player-data-service";
}

@Override
public boolean isRecodeLimit() {
return true;
}

@Override
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, Pair<String, String> data) {
Object value = data.getValue();

Pair<String, String> pair = GsonUtil.getGson().fromJson(
value.toString(), new TypeToken<Pair<String, String>>() {
}.getType()
);

String first = pair.getKey();
String second = pair.getValue();
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair<String, String> data) {
// Scound must be player name
String second = data.getValue();

// Very slow, but it's async so it's fine
OfflinePlayer offlinePlayer =
Bukkit.getOfflinePlayer(second);

Optional<LegacyPlayerDataService> legacyPlayerDataService =
LegacyPlayerDataService.getLegacyPlayerDataService(first);

legacyPlayerDataService.ifPresent(service -> L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), service).start().getFuture().whenComplete((aVoid, throwable) -> {
L1ToL2DataSyncTask.of(offlinePlayer.getUniqueId(), legacyPlayerDataService).start().getFuture().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
Log.error("Error while syncing player data", throwable);
return;
}

rStream.remove(streamMessageId);
}));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package net.legacy.library.player.task.redis.impl;

import com.google.common.reflect.TypeToken;
import io.fairyproject.log.Log;
import net.legacy.library.commons.util.GsonUtil;
import net.legacy.library.player.annotation.RStreamAccepterRegister;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.redis.L1ToL2DataSyncTask;
Expand All @@ -11,7 +9,6 @@
import org.redisson.api.RStream;
import org.redisson.api.StreamMessageId;

import java.util.Optional;
import java.util.UUID;

/**
Expand All @@ -25,37 +22,22 @@ public String getActionName() {
return "player-data-sync-uuid";
}

public String getTargetLegacyPlayerDataServiceName() {
return "player-data-service";
}

@Override
public boolean isRecodeLimit() {
return true;
}

@Override
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, Pair<String, String> data) {
Object value = data.getValue();

Pair<String, String> pair = GsonUtil.getGson().fromJson(
value.toString(), new TypeToken<Pair<String, String>>() {
}.getType()
);
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair<String, String> data) {
// Scound must be player uuid
String second = data.getValue();

String first = pair.getKey();
String second = pair.getValue();

Optional<LegacyPlayerDataService> legacyPlayerDataService =
LegacyPlayerDataService.getLegacyPlayerDataService(first);

legacyPlayerDataService.ifPresent(service -> L1ToL2DataSyncTask.of(UUID.fromString(second), service).start().getFuture().whenComplete((aVoid, throwable) -> {
L1ToL2DataSyncTask.of(UUID.fromString(second), legacyPlayerDataService).start().getFuture().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
Log.error("Error while syncing player data", throwable);
return;
}

rStream.remove(streamMessageId);
}));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.legacy.library.player.task.redis.impl;

import net.legacy.library.player.annotation.RStreamAccepterRegister;
import net.legacy.library.player.service.LegacyPlayerDataService;
import net.legacy.library.player.task.redis.RStreamAccepterInterface;
import org.apache.commons.lang3.tuple.Pair;
import org.redisson.api.RStream;
Expand All @@ -17,18 +18,13 @@ public String getActionName() {
return "player-data-update-name";
}

@Override
public String getTargetLegacyPlayerDataServiceName() {
return null;
}

@Override
public boolean isRecodeLimit() {
return true;
}

@Override
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, Pair<String, String> data) {
// TODO
public void accept(RStream<Object, Object> rStream, StreamMessageId streamMessageId, LegacyPlayerDataService legacyPlayerDataService, Pair<String, String> data) {

}
}

0 comments on commit 48efd0e

Please sign in to comment.