Skip to content

Commit

Permalink
Merge pull request #868 from LittleFish-233/master
Browse files Browse the repository at this point in the history
看不懂rabbitmq新版的异步接口,于是回退代码到旧版本
  • Loading branch information
LittleFish-233 authored Feb 2, 2025
2 parents 1ff32e2 + 5b871ab commit 45b085b
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
</ItemGroup>
</Project>
77 changes: 41 additions & 36 deletions CnGalWebSite/CnGalWebSite.EventBus/Services/EventBusRabbitMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public class EventBusRabbitMQ(IConfiguration configuration, ILogger<EventBusRabb
private readonly ConcurrentDictionary<string, TaskCompletionSource<byte[]>> callbackMapper = new();

private IConnection _connection;
private IChannel _channel;
private IModel _channel;
private string replyQueueName;

public async Task Init()
public void Init()
{
if (_channel == null && _connection == null)
{
Expand All @@ -30,15 +30,15 @@ public async Task Init()
UserName = _configuration["EventBus_UserName"],
Password = _configuration["EventBus_Password"],
};
_connection = await factory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
}
}

public async Task SendMessage<T>(string queue, T message)
public void SendMessage<T>(string queue, T message)
{
await Init();
await _channel.QueueDeclareAsync(
Init();
_channel.QueueDeclare(
queue: queue, // 队列名称
durable: true, // 是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息
exclusive: false, // 是否排他,如果一个队列声明为排他队列,该队列仅对时候次声明它的连接可见,并在连接断开时自动删除
Expand All @@ -48,30 +48,30 @@ await _channel.QueueDeclareAsync(

var json = JsonSerializer.Serialize(message);
var body = Encoding.UTF8.GetBytes(json);
await _channel.BasicPublishAsync(exchange: "", routingKey: queue, body: body);
_channel.BasicPublish(exchange: "", routingKey: queue, body: body);
}

public async Task SubscribeMessages<T>(string queue, Action<T> action)
public void SubscribeMessages<T>(string queue, Action<T> action)
{
await Init();
await _channel.QueueDeclareAsync(
Init();
_channel.QueueDeclare(
queue: queue, // 队列名称
durable: true, // 是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息
exclusive: false, // 是否排他,如果一个队列声明为排他队列,该队列仅对时候次声明它的连接可见,并在连接断开时自动删除
autoDelete: false, // 是否自动删除,自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
arguments: null // 设置队列的其他参数
);

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, eventArgs) =>
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, eventArgs) =>
{
var body = eventArgs.Body.ToArray();
var message = Encoding.UTF8.GetString(body);

var obj = JsonSerializer.Deserialize<T>(message);
action(obj);
};
await _channel.BasicConsumeAsync(queue: queue, autoAck: true, consumer: consumer);
_channel.BasicConsume(queue: queue, autoAck: true, consumer: consumer);
}

/// <summary>
Expand All @@ -81,10 +81,10 @@ await _channel.QueueDeclareAsync(
/// <typeparam name="TOutput"></typeparam>
/// <param name="queue"></param>
/// <param name="func"></param>
public async Task CreateRpcServer<TInput, TOutput>(string queue, Func<TInput, Task<TOutput>> func)
public void CreateRpcServer<TInput, TOutput>(string queue, Func<TInput, Task<TOutput>> func)
{
await Init();
await _channel.QueueDeclareAsync(
Init();
_channel.QueueDeclare(
queue: queue,
durable: false,
exclusive: false,
Expand All @@ -93,15 +93,17 @@ await _channel.QueueDeclareAsync(
);

// 限制每次只传递一个事件
await _channel.BasicQosAsync(0, 1, false);
_channel.BasicQos(0, 1, false);

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += async (model, ea) =>
{
TOutput response = default;

var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId;

try
{
Expand All @@ -122,42 +124,42 @@ await _channel.QueueDeclareAsync(
var json = JsonSerializer.Serialize(response);
var responseBytes = Encoding.UTF8.GetBytes(json);
// 发布
await _channel.BasicPublishAsync("", props.ReplyTo, false, new BasicProperties(), responseBytes);
_channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "回传RPC结果失败");
}
finally
{
await _channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}

}
};
await _channel.BasicConsumeAsync(queue: queue, autoAck: false, consumer: consumer);
_channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
}

/// <summary>
/// 创建RPC客户端 只调用一次
/// </summary>
public async Task CreateRpcClient()
public void CreateRpcClient()
{
await Init();
Init();
// declare a server-named queue
replyQueueName = (await _channel.QueueDeclareAsync()).QueueName;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
replyQueueName = _channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return;
var body = ea.Body.ToArray();
tcs.TrySetResult(body);
};

await _channel.BasicConsumeAsync(consumer: consumer,
queue: replyQueueName,
autoAck: true);
_channel.BasicConsume(consumer: consumer,
queue: replyQueueName,
autoAck: true);
}

/// <summary>
Expand All @@ -169,12 +171,12 @@ await _channel.BasicConsumeAsync(consumer: consumer,
/// <param name="input"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<TOutput> CallRpcAsync<TInput, TOutput>(string queue, TInput input, CancellationToken cancellationToken = default)
public Task<TOutput> CallRpcAsync<TInput, TOutput>(string queue, TInput input, CancellationToken cancellationToken = default)
{
await Init();
Init();

// 创建唯一标识
BasicProperties props = new BasicProperties();
IBasicProperties props = _channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = replyQueueName;
Expand All @@ -189,7 +191,10 @@ public async Task<TOutput> CallRpcAsync<TInput, TOutput>(string queue, TInput in
callbackMapper.TryAdd(correlationId, tcs);

// 发布
await _channel.BasicPublishAsync(string.Empty, queue, true, props, messageBytes);
_channel.BasicPublish(exchange: string.Empty,
routingKey: queue,
basicProperties: props,
body: messageBytes);

cancellationToken.Register(() => callbackMapper.TryRemove(correlationId, out _));

Expand All @@ -208,7 +213,7 @@ public async Task<TOutput> CallRpcAsync<TInput, TOutput>(string queue, TInput in
return output;
}, cancellationToken);

return await ret;
return ret;
}

public void Dispose()
Expand Down
8 changes: 4 additions & 4 deletions CnGalWebSite/CnGalWebSite.EventBus/Services/IEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ namespace CnGalWebSite.EventBus.Services
{
public interface IEventBus
{
Task SendMessage<T>(string queue, T message);
void SendMessage<T>(string queue, T message);

Task SubscribeMessages<T>(string queue, Action<T> action);
void SubscribeMessages<T>(string queue, Action<T> action);

Task CreateRpcServer<TInput, TOutput>(string queue, Func<TInput, Task<TOutput>> func);
void CreateRpcServer<TInput, TOutput>(string queue, Func<TInput, Task<TOutput>> func);

Task CreateRpcClient();
void CreateRpcClient();

Task<TOutput> CallRpcAsync<TInput, TOutput>(string queue, TInput input, CancellationToken cancellationToken = default);
}
Expand Down
14 changes: 7 additions & 7 deletions CnGalWebSite/CnGalWebSite.ExamineService/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
});
});

_logger.LogInformation("客户端上线");
_eventBusService.InitRpcClient();
//_logger.LogInformation("客户端上线");
//_eventBusService.InitRpcClient();

var re = await _eventBusService.CallSensitiveWordsCheck(new SensitiveWordsCheckModel
{
Texts = ["傻逼煞笔沙比","12122112"]
}, stoppingToken);
//var re = await _eventBusService.CallSensitiveWordsCheck(new SensitiveWordsCheckModel
//{
// Texts = ["傻逼煞笔沙比","12122112"]
//}, stoppingToken);

_logger.LogError("检查到 {re} 个敏感词:\n {}", re.Words.Count, string.Join("\n ", re.Words));
//_logger.LogError("检查到 {re} 个敏感词:\n {}", re.Words.Count, string.Join("\n ", re.Words));



Expand Down
48 changes: 24 additions & 24 deletions CnGalWebSite/CnGalWebSite.Kanban.ChatGPT/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,38 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
_eventBusService.InitRpcClient();


// 1
var message = "你是傻逼";
// _logger.LogInformation("客户端发送消息:{message}", message);
//// 1
//var message = "你是傻逼";
//// _logger.LogInformation("客户端发送消息:{message}", message);

var re = await _eventBusService.CallKanbanChatGPT(new EventBus.Models.KanbanChatGPTSendModel
{
Message = message,
IsFirst = true,
UserId = "123",
MessageMax = 3
}, stoppingToken);
//var re = await _eventBusService.CallKanbanChatGPT(new EventBus.Models.KanbanChatGPTSendModel
//{
// Message = message,
// IsFirst = true,
// UserId = "123",
// MessageMax = 3
//}, stoppingToken);


if (re.Success == false)
{
_logger.LogError("接收失败:{re}", re.Message);
}
//if (re.Success == false)
//{
// _logger.LogError("接收失败:{re}", re.Message);
//}



// 2
message = "你是傻逼";
//// 2
//message = "你是傻逼";

re = await _eventBusService.CallKanbanChatGPT(new EventBus.Models.KanbanChatGPTSendModel
{
Message = message,
IsFirst = false,
UserId = "123",
MessageMax = 3
}, stoppingToken);
//re = await _eventBusService.CallKanbanChatGPT(new EventBus.Models.KanbanChatGPTSendModel
//{
// Message = message,
// IsFirst = false,
// UserId = "123",
// MessageMax = 3
//}, stoppingToken);

_logger.LogInformation("接收消息{re}", re.Success ? "成功" : "失败");
//_logger.LogInformation("接收消息{re}", re.Success ? "成功" : "失败");

//// 3
//message = "不对哦~《硅心》是由呐呐呐制作组制作的galgame。讲述了突然到来的机器人少女,打破了某自由插画师的隐世单机生活,一场人机恋爱喜剧由此开的故事。";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@
{
image = "weibo.png";
}
else if (Model.PublishPlatformName.ToLower().Contains("贴吧"))
else if (Model.PublishPlatformName.ToLower().Contains("贴吧") || Model.PublishPlatformName.ToLower().Contains("百度"))
{
image = "BaiDuWiki.png";
}
Expand Down

0 comments on commit 45b085b

Please sign in to comment.