Skip to content

Commit

Permalink
🎨 rhyus beta1
Browse files Browse the repository at this point in the history
  • Loading branch information
adlered committed Dec 7, 2024
1 parent d9897b0 commit 3b27efe
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/main/java/org/b3log/symphony/processor/ChatroomProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.b3log.latke.Keys;
import org.b3log.latke.Latkes;
import org.b3log.latke.http.Dispatcher;
import org.b3log.latke.http.RequestContext;
import org.b3log.latke.http.WebSocketSession;
Expand Down Expand Up @@ -243,6 +244,15 @@ public void getNode(final RequestContext context) {
ret.put(Keys.MSG, "");
if (NodeUtil.wsOnline == null || NodeUtil.wsOnline.isEmpty()) {
ret.put(Keys.DATA, "wss://fishpi.cn/chat-room-channel?apiKey=" + key);
final String serverScheme = Latkes.getServerScheme();
String wsScheme = StringUtils.containsIgnoreCase(serverScheme, "https") ? "wss" : "ws";
String wsHost = Latkes.getServerHost();
String port1 = Latkes.getServerPort();
String port2 = "";
if (StringUtils.isNotBlank(port1) && !"80".equals(port1) && !"443".equals(port1)) {
port2 = ":" + port1;
}
ret.put(Keys.DATA, wsScheme + "://" + wsHost + port2 + "/chat-room-channel?apiKey=" + key);
} else {
Map.Entry<String, Integer> minEntry = null;
for (Map.Entry<String, Integer> entry : NodeUtil.wsOnline.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public static boolean record(final RequestContext context) {
// ==! 前置参数 !==

// ==? 判断是否在 Channel 中 ==?
boolean atChannel = false;
/*boolean atChannel = false;
for (Map.Entry<WebSocketSession, JSONObject> onlineUser : ChatroomChannel.onlineUsers.entrySet()) {
try {
String uName = onlineUser.getValue().optString(User.USER_NAME);
Expand All @@ -114,7 +114,7 @@ public static boolean record(final RequestContext context) {
if (!atChannel) {
context.renderJSON(StatusCodes.ERR).renderMsg("发送失败:当前未在聊天室中,请刷新页面。");
return false;
}
}*/
// ==! 判断是否在 Channel 中 ==!

// ==? 发弹幕频率限制 ?==
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/b3log/symphony/service/CronMgmtService.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ public void start() {
Stopwatchs.release();
}
}, delay, 60 * 1000, TimeUnit.MILLISECONDS);
delay += 2000;

Symphonys.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
Expand All @@ -303,7 +304,8 @@ public void start() {
} finally {
Stopwatchs.release();
}
}, delay, 10 * 60 * 1000, TimeUnit.MILLISECONDS);
}, delay, 1 * 60 * 1000, TimeUnit.MILLISECONDS);
delay += 2000;

Symphonys.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
Expand All @@ -313,7 +315,7 @@ public void start() {
} finally {
Stopwatchs.release();
}
}, delay, 5 * 60 * 1000, TimeUnit.MILLISECONDS);
}, delay, 3 * 60 * 1000, TimeUnit.MILLISECONDS);
delay += 2000;
}

Expand Down
90 changes: 88 additions & 2 deletions src/main/java/org/b3log/symphony/util/NodeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.b3log.latke.Latkes;
import org.b3log.latke.ioc.BeanManager;
import org.b3log.latke.model.User;
import org.b3log.symphony.model.Common;
import org.b3log.symphony.model.UserExt;
import org.b3log.symphony.processor.channel.ChatroomChannel;
import org.b3log.symphony.service.AvatarQueryService;
import org.json.JSONArray;
import org.json.JSONObject;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -105,14 +114,79 @@ public static void initOnline() {
onlineList.put(jsonArray.get(j));
}
wsOnline.put(i, jsonArray.length());
webSocket.sendClose(1000, "Nornal Closure");
LOGGER.log(Level.INFO, "Remote " + i + " online list updated. count=" + jsonArray.length());
} catch (Exception e) {
System.out.println(serverUri + " No response within 10 seconds. giveup.");
webSocket.sendClose(1000, "Nornal Closure");
LOGGER.log(Level.ERROR, serverUri + " No response within 10 seconds. giveup.", e);
}
} catch (Exception ignored) {
}
}
remoteUsers = onlineList;

// 推送在线信息给子节点
final BeanManager beanManager = BeanManager.getInstance();
final AvatarQueryService avatarQueryService = beanManager.getReference(AvatarQueryService.class);

Map<String, JSONObject> filteredOnlineUsers = new HashMap<>();
for (JSONObject object : ChatroomChannel.onlineUsers.values()) {
String name = object.optString(User.USER_NAME);
filteredOnlineUsers.put(name, object);
}

for (int i = 0; i < NodeUtil.remoteUsers.length(); i++) {
try {
JSONObject temp = NodeUtil.remoteUsers.getJSONObject(i);
filteredOnlineUsers.put(temp.optString(User.USER_NAME), temp);
} catch (Exception ignored) {
}
}

JSONArray onlineArray = new JSONArray();
for (String user : filteredOnlineUsers.keySet()) {
JSONObject object = filteredOnlineUsers.get(user);

String avatar = object.optString(UserExt.USER_AVATAR_URL);
String homePage = Latkes.getStaticServePath() + "/member/" + user;

JSONObject generated = new JSONObject();
generated.put(User.USER_NAME, user);
generated.put(UserExt.USER_AVATAR_URL, avatar);
avatarQueryService.fillUserAvatarURL(generated);
generated.put("homePage", homePage);
onlineArray.put(generated);
}

JSONObject result = new JSONObject();
result.put(Common.ONLINE_CHAT_CNT, filteredOnlineUsers.size());
result.put(Common.TYPE, "online");
result.put("users", onlineArray);
result.put("discussing", ChatroomChannel.discussing);

for (String i : uriNodes) {
try {
String serverUri = i + "?apiKey=" + Symphonys.get("chatroom.node.adminKey");
SSLContext sslContext = createInsecureSSLContext();
HttpClient client = HttpClient.newBuilder()
.sslContext(sslContext)
.build();
CompletableFuture<String> responseFuture = new CompletableFuture<>();
WebSocket webSocket = client.newWebSocketBuilder()
.buildAsync(URI.create(serverUri), new WebSocketListener(responseFuture))
.join();
webSocket.sendText(Symphonys.get("chatroom.node.adminKey") + ":::push " + result, true);
try {
String response = responseFuture.get(10, TimeUnit.SECONDS);
webSocket.sendClose(1000, "Nornal Closure");
LOGGER.log(Level.INFO, "Remote " + i + " has received the full online list.");
} catch (Exception e) {
webSocket.sendClose(1000, "Nornal Closure");
LOGGER.log(Level.ERROR, "Push online list to " + serverUri + " has no response within 10 seconds. giveup.", e);
}
} catch (Exception ignored) {
}
}
}

// 创建忽略 SSL 证书的 SSLContext
Expand Down Expand Up @@ -151,9 +225,21 @@ public void onOpen(WebSocket webSocket) {
webSocket.request(1); // 请求接收一条消息
}

private final StringBuilder messageBuffer = new StringBuilder();
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
responseFuture.complete(data.toString()); // 完成响应的 Future
messageBuffer.append(data);

if (last) {
String completeMessage = messageBuffer.toString();
messageBuffer.setLength(0);

if (!responseFuture.isDone()) {
responseFuture.complete(completeMessage);
}
}

webSocket.request(1);
return CompletableFuture.completedFuture(null);
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/resources/skins/classic/chat-room.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,14 @@
Label.hasMore = true;
ChatRoom.init();
// Init [ChatRoom] channel
ChatRoomChannel.init("${wsScheme}://${serverHost}:${serverPort}${contextPath}/chat-room-channel");
$.ajax({
url: Label.servePath + '/chat-room/node/get',
type: 'GET',
cache: false,
success: function (result) {
ChatRoomChannel.init(result.data);
}
});
var page = 0;
var pointsArray = [];
var linesArray = [];
Expand Down
10 changes: 9 additions & 1 deletion src/main/resources/skins/classic/index.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,15 @@
</#if>
}
ChatRoomChannel.init("${wsScheme}://${serverHost}:${serverPort}${contextPath}/chat-room-channel?type=index");
// Init [ChatRoom] channel
$.ajax({
url: Label.servePath + '/chat-room/node/get',
type: 'GET',
cache: false,
success: function (result) {
ChatRoomChannel.init(result.data);
}
});
var chatRoomPictureStatus = "<#if 0 == chatRoomPictureStatus> blur</#if>";
</script>
Expand Down
9 changes: 8 additions & 1 deletion src/main/resources/skins/mobile/chat-room.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,14 @@
Label.hasMore = true;
ChatRoom.init();
// Init [ChatRoom] channel
ChatRoomChannel.init("${wsScheme}://${serverHost}:${serverPort}${contextPath}/chat-room-channel");
$.ajax({
url: Label.servePath + '/chat-room/node/get',
type: 'GET',
cache: false,
success: function (result) {
ChatRoomChannel.init(result.data);
}
});
var page = 0;
var pointsArray = [];
var linesArray = [];
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/symphony.properties
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ lute.engine.url=http://localhost:8249
pay.wechat.mch_id=
pay.wechat.key=

# Rhyus
chatroom.node.url=ws://127.0.0.1:10831
#ws://121.62.31.42:10831
chatroom.node.adminKey=123456

0 comments on commit 3b27efe

Please sign in to comment.