Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/FishPiOffical/rhythm
Browse files Browse the repository at this point in the history
  • Loading branch information
csfwff committed Dec 30, 2024
2 parents bc402a8 + b1c8dec commit e679ad1
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 72 deletions.
37 changes: 37 additions & 0 deletions src/main/java/org/b3log/symphony/cache/FollowingCountCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Rhythm - A modern community (forum/BBS/SNS/blog) platform written in Java.
* Modified version from Symphony, Thanks Symphony :)
* Copyright (C) 2012-present, b3log.org
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.b3log.symphony.cache;

import org.b3log.latke.ioc.Singleton;
import org.json.JSONObject;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

@Singleton
public class FollowingCountCache {

public static final Map<String, JSONObject> COUNT_CACHE = Collections.synchronizedMap(new LinkedHashMap<>() {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > 2000;
}
});
}
59 changes: 54 additions & 5 deletions src/main/java/org/b3log/symphony/processor/ChatroomProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ public void nodePush(final RequestContext context) {
}
break;
}
context.renderJSON(StatusCodes.SUCC);
context.renderMsg("数据上传成功!");
}

public void getNode(final RequestContext context) {
Expand Down Expand Up @@ -303,20 +305,67 @@ public void getNode(final RequestContext context) {
}
ret.put(Keys.DATA, wsScheme + "://" + wsHost + port2 + "/chat-room-channel?apiKey=" + key);
} else {
Map.Entry<String, Integer> minEntry = null;
// 按权重分配节点
StringBuilder logBuilder = new StringBuilder();
Map.Entry<String, Double> selectedNode = null;
int totalWeight = NodeUtil.nodeWeights.values().stream().mapToInt(Integer::intValue).sum();
int totalClients = NodeUtil.wsOnline.size();
logBuilder.append("=== 分配日志开始 ===\n");
logBuilder.append("总客户端数: ").append(totalClients).append(", 总权重: ").append(totalWeight).append("\n");
logBuilder.append("节点状态:\n");
for (Map.Entry<String, Integer> entry : NodeUtil.wsOnline.entrySet()) {
if (minEntry == null || entry.getValue() < minEntry.getValue()) {
minEntry = entry;
String node = entry.getKey();
int currentClients = entry.getValue();
int weight = NodeUtil.nodeWeights.getOrDefault(node, 1); // 默认权重为 1

// 计算期望客户端数
double expectedClients = (double) totalClients * weight / totalWeight;
// 计算实际/期望比值
double ratio = currentClients / Math.max(1, expectedClients);

logBuilder.append("节点: ").append(node)
.append(", 当前客户端数: ").append(currentClients)
.append(", 权重: ").append(weight)
.append(", 期望客户端数: ").append(String.format("%.2f", expectedClients))
.append(", 比值: ").append(String.format("%.2f", ratio)).append("\n");

// 选择比值最小的节点,比值相同时,选择权重更高的节点
if (selectedNode == null || ratio < selectedNode.getValue() ||
(ratio == selectedNode.getValue() && weight > NodeUtil.nodeWeights.get(selectedNode.getKey()))) {
selectedNode = new AbstractMap.SimpleEntry<>(node, ratio);
}

}

// 分配选中的节点
if (selectedNode != null) {
String allocatedNode = selectedNode.getKey();
NodeUtil.wsOnline.put(allocatedNode, NodeUtil.wsOnline.getOrDefault(allocatedNode, 0) + 1);
logBuilder.append("选中节点: ").append(allocatedNode).append("\n");
logBuilder.append("=== 分配日志结束 ===\n");
System.out.println(logBuilder);
// 返回节点信息
ret.put(Keys.DATA, allocatedNode + "?apiKey=" + key);
ret.put(Keys.MSG, NodeUtil.nodeNickNames.get(allocatedNode));
} else {
// 平均分配
Map.Entry<String, Integer> minEntry = null;
for (Map.Entry<String, Integer> entry : NodeUtil.wsOnline.entrySet()) {
if (minEntry == null || entry.getValue() < minEntry.getValue()) {
minEntry = entry;
}
}
NodeUtil.wsOnline.put(minEntry.getKey(), NodeUtil.wsOnline.getOrDefault(minEntry.getKey(), 0) + 1);
ret.put(Keys.DATA, minEntry.getKey() + "?apiKey=" + key);
ret.put(Keys.MSG, NodeUtil.nodeNickNames.get(minEntry.getKey()));
}
ret.put(Keys.DATA, minEntry.getKey() + "?apiKey=" + key);
ret.put(Keys.MSG, NodeUtil.nodeNickNames.get(minEntry.getKey()));
}
JSONArray data = new JSONArray();
for (Map.Entry<String, Integer> entry : NodeUtil.wsOnline.entrySet()) {
JSONObject node = new JSONObject();
node.put("node", entry.getKey());
node.put("name", NodeUtil.nodeNickNames.get(entry.getKey()));
node.put("weight", NodeUtil.nodeWeights.get(entry.getKey()));
node.put("online", entry.getValue());
data.put(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public static boolean record(final RequestContext context) {
}
}
StringBuilder userSessionList = new StringBuilder();
userSessionList.append("<details><summary>北京一区(" + ChatroomChannel.SESSIONS.size() + "人)</summary>");
userSessionList.append("<details><summary>钉子户(" + ChatroomChannel.SESSIONS.size() + "人)</summary>");
for (Map.Entry<String, Integer> s : sessionList.entrySet()) {
userSessionList.append(s.getKey() + " " + s.getValue() + "<br>");
}
Expand Down Expand Up @@ -388,9 +388,9 @@ public static boolean record(final RequestContext context) {
Map<String, Long> result = ChatroomChannel.check();
StringBuilder stringBuilder = new StringBuilder();
if (result.isEmpty()) {
sendBotMsg("北京一区:报告!没有超过6小时未活跃的成员,一切都很和谐~");
sendBotMsg("钉子户:报告!没有超过6小时未活跃的成员,一切都很和谐~");
} else {
stringBuilder.append("北京一区:报告!成功扫描超过6小时未活跃的成员,并已将他们断开连接:<br>");
stringBuilder.append("钉子户:报告!成功扫描超过6小时未活跃的成员,并已将他们断开连接:<br>");
stringBuilder.append("<details><summary>不活跃用户列表</summary>");
for (String i : result.keySet()) {
long time = result.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public class ChatroomChannel implements WebSocketChannel {
*/
@Override
public void onConnect(final WebSocketSession session) {
// 发送过期客户端消息
String text = "{\"userOId\":1630399192600,\"userAvatarURL\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"userNickname\":\"阿达\",\"oId\":\"" + System.currentTimeMillis() + "\",\"userName\":\"adlered\",\"type\":\"msg\",\"content\":\"<p><\\/p><h3>您正在使用过期的客户端<\\/h3>\\n<p>您的客户端正在使用过期的协议连接到摸鱼派聊天室,该协议已被停用,无法接收消息。<\\/p>\\n<p>请将客户端更新到最新版本以享受更快的聊天室服务,如已是最新版本,请联系客户端开发者更新客户端。<\\/p>\\n<p>新版摸鱼派聊天室协议开发指南:<a href=\\\"https://fishpi.cn/article/1733591297543\\\" target=\\\"_blank\\\" rel=\\\"nofollow\\\">https://fishpi.cn/article/1733591297543<\\/a><\\/p>\\n<p><\\/p>\",\"md\":\"### 您正在使用过期的客户端\\n\\n您的客户端正在使用过期的协议连接到摸鱼派聊天室,该协议已被停用,无法接收消息。\\n\\n请将客户端更新到最新版本以享受更快的聊天室服务,如已是最新版本,请联系客户端开发者更新客户端。\\n\\n新版摸鱼派聊天室协议开发指南:https://fishpi.cn/article/1733591297543\",\"userAvatarURL20\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"sysMetal\":\"{\\\"list\\\":[{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派铁粉\\\",\\\"description\\\":\\\"捐助摸鱼派达1024RMB; 编号No.9\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht3-b97ea102.jpg&backcolor=ee3a8c&fontcolor=ffffff\\\",\\\"enabled\\\":true},{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派忠粉\\\",\\\"description\\\":\\\"捐助摸鱼派达256RMB; 编号No.20\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht2-bea67b29.jpg&backcolor=87cefa&fontcolor=efffff\\\",\\\"enabled\\\":true},{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派粉丝\\\",\\\"description\\\":\\\"捐助摸鱼派达16RMB; 编号No.38\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht1-d8149de4.jpg&backcolor=ffffff&fontcolor=ff3030\\\",\\\"enabled\\\":true}]}\",\"client\":\"Web/PC网页端\",\"time\":\"" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + "\",\"userAvatarURL210\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"userAvatarURL48\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\"}";
sendText(session, text);
session.close();
/**
String userStr = session.getHttpSession().getAttribute(User.USER);
try {
userStr = ApiProcessor.getUserByKey(session.getParameter("apiKey")).toString();
Expand Down Expand Up @@ -114,12 +119,10 @@ public void onConnect(final WebSocketSession session) {
AdminProcessor.manager.onMessageSent(4, msgStr.length());
// 保存 Active 信息
userActive.put(user.optString("userName"), System.currentTimeMillis());
// 发送过期客户端消息
String text = "{\"userOId\":1630399192600,\"userAvatarURL\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"userNickname\":\"阿达\",\"oId\":\"" + System.currentTimeMillis() + "\",\"userName\":\"adlered\",\"type\":\"msg\",\"content\":\"<p><\\/p><h3>您正在使用过期的客户端<\\/h3>\\n<p>您的客户端正在使用过期的协议连接到摸鱼派聊天室,该协议将于 2025 年 3 月 1 日 被停用。<\\/p>\\n<p>请将客户端更新到最新版本以享受更快的聊天室服务,如已是最新版本,请联系客户端开发者更新客户端。<\\/p>\\n<p>新版摸鱼派聊天室协议开发指南:<a href=\\\"https://fishpi.cn/article/1733591297543\\\" target=\\\"_blank\\\" rel=\\\"nofollow\\\">https://fishpi.cn/article/1733591297543<\\/a><\\/p>\\n<p><\\/p>\",\"md\":\"### 您正在使用过期的客户端\\n\\n您的客户端正在使用过期的协议连接到摸鱼派聊天室,该协议将于 2025年3月1日 被停用。\\n\\n请将客户端更新到最新版本以享受更快的聊天室服务,如已是最新版本,请联系客户端开发者更新客户端。\\n\\n新版摸鱼派聊天室协议开发指南:https://fishpi.cn/article/1733591297543\",\"userAvatarURL20\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"sysMetal\":\"{\\\"list\\\":[{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派铁粉\\\",\\\"description\\\":\\\"捐助摸鱼派达1024RMB; 编号No.9\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht3-b97ea102.jpg&backcolor=ee3a8c&fontcolor=ffffff\\\",\\\"enabled\\\":true},{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派忠粉\\\",\\\"description\\\":\\\"捐助摸鱼派达256RMB; 编号No.20\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht2-bea67b29.jpg&backcolor=87cefa&fontcolor=efffff\\\",\\\"enabled\\\":true},{\\\"data\\\":\\\"\\\",\\\"name\\\":\\\"摸鱼派粉丝\\\",\\\"description\\\":\\\"捐助摸鱼派达16RMB; 编号No.38\\\",\\\"attr\\\":\\\"url=https://file.fishpi.cn/2021/12/ht1-d8149de4.jpg&backcolor=ffffff&fontcolor=ff3030\\\",\\\"enabled\\\":true}]}\",\"client\":\"Web/PC网页端\",\"time\":\"" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis()) + "\",\"userAvatarURL210\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\",\"userAvatarURL48\":\"https://file.fishpi.cn/2023/12/blob-1d3b18ec.png\"}";
sendText(session, text);
} else {
session.close();
}
**/
}

private static SimpleCurrentLimiter customMessageCurrentLimit = new SimpleCurrentLimiter(60, 6);
Expand Down
12 changes: 1 addition & 11 deletions src/main/java/org/b3log/symphony/service/CronMgmtService.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,23 +299,13 @@ public void start() {
Symphonys.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
NodeUtil.init();
} catch (final Exception e) {
LOGGER.log(Level.ERROR, "Executes cron failed", e);
} finally {
Stopwatchs.release();
}
}, delay, 1 * 60 * 1000, TimeUnit.MILLISECONDS);
delay += 2000;

Symphonys.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
try {
NodeUtil.initOnline();
} catch (final Exception e) {
LOGGER.log(Level.ERROR, "Executes cron failed", e);
} finally {
Stopwatchs.release();
}
}, delay, 3 * 60 * 1000, TimeUnit.MILLISECONDS);
}, delay, 1 * 60 * 1000, TimeUnit.MILLISECONDS);
delay += 2000;
}

Expand Down
35 changes: 33 additions & 2 deletions src/main/java/org/b3log/symphony/service/FollowQueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.b3log.latke.repository.*;
import org.b3log.latke.service.annotation.Service;
import org.b3log.latke.util.Stopwatchs;
import org.b3log.symphony.cache.FollowingCountCache;
import org.b3log.symphony.model.Follow;
import org.b3log.symphony.repository.ArticleRepository;
import org.b3log.symphony.repository.FollowRepository;
Expand Down Expand Up @@ -395,9 +396,23 @@ public long getFollowingCount(final String followerId, final int followingType)
final List<Filter> filters = new ArrayList<>();
filters.add(new PropertyFilter(Follow.FOLLOWER_ID, FilterOperator.EQUAL, followerId));
filters.add(new PropertyFilter(Follow.FOLLOWING_TYPE, FilterOperator.EQUAL, followingType));
JSONObject cacheJSON = new JSONObject();
cacheJSON.put(Follow.FOLLOWER_ID, followerId);
cacheJSON.put(Follow.FOLLOWING_TYPE, followingType);
JSONObject result = FollowingCountCache.COUNT_CACHE.get(cacheJSON.toString());
if (result != null) {
if (result.optLong("time") > System.currentTimeMillis() - FIVE_MINUTES_IN_MILLIS) {
return result.optLong("count");
}
}
final Query query = new Query().setFilter(new CompositeFilter(CompositeFilterOperator.AND, filters));
try {
return followRepository.count(query);
long count = followRepository.count(query);
JSONObject json = new JSONObject();
json.put("count", count);
json.put("time", System.currentTimeMillis());
FollowingCountCache.COUNT_CACHE.put(cacheJSON.toString(), json);
return count;
} catch (final RepositoryException e) {
LOGGER.log(Level.ERROR, "Counts following count failed", e);
return 0;
Expand All @@ -407,6 +422,8 @@ public long getFollowingCount(final String followerId, final int followingType)
}
}

private static final long FIVE_MINUTES_IN_MILLIS = 1000 * 60 * 5;

/**
* Gets the follower count of a following specified by the given following id and following type.
*
Expand All @@ -418,9 +435,23 @@ public long getFollowerCount(final String followingId, final int followingType)
final List<Filter> filters = new ArrayList<>();
filters.add(new PropertyFilter(Follow.FOLLOWING_ID, FilterOperator.EQUAL, followingId));
filters.add(new PropertyFilter(Follow.FOLLOWING_TYPE, FilterOperator.EQUAL, followingType));
JSONObject cacheJSON = new JSONObject();
cacheJSON.put(Follow.FOLLOWING_ID, followingId);
cacheJSON.put(Follow.FOLLOWING_TYPE, followingType);
JSONObject result = FollowingCountCache.COUNT_CACHE.get(cacheJSON.toString());
if (result != null) {
if (result.optLong("time") > System.currentTimeMillis() - FIVE_MINUTES_IN_MILLIS) {
return result.optLong("count");
}
}
final Query query = new Query().setFilter(new CompositeFilter(CompositeFilterOperator.AND, filters));
try {
return followRepository.count(query);
long count = followRepository.count(query);
JSONObject json = new JSONObject();
json.put("count", count);
json.put("time", System.currentTimeMillis());
FollowingCountCache.COUNT_CACHE.put(cacheJSON.toString(), json);
return count;
} catch (final RepositoryException e) {
LOGGER.log(Level.ERROR, "Counts follower count failed", e);
return 0;
Expand Down
Loading

0 comments on commit e679ad1

Please sign in to comment.