From 5803b75e3b4b4211e4ea6601425bb31336c0001c Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Fri, 20 Dec 2024 17:48:46 +0900 Subject: [PATCH] temp: remove all async dealermethods --- src/Libplanet.Net/Transports/NetMQChannel.cs | 30 +++++++++++++++---- .../Transports/NetMQTransport.cs | 1 + 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/Libplanet.Net/Transports/NetMQChannel.cs b/src/Libplanet.Net/Transports/NetMQChannel.cs index 158f92ea23..c3165a2ad9 100644 --- a/src/Libplanet.Net/Transports/NetMQChannel.cs +++ b/src/Libplanet.Net/Transports/NetMQChannel.cs @@ -65,6 +65,7 @@ public void Open() public async IAsyncEnumerable SendMessageAsync( NetMQMessage message, + TimeSpan? timeout, int expectedResponses, [EnumeratorCancellation] CancellationToken cancellationToken) { @@ -72,6 +73,7 @@ public async IAsyncEnumerable SendMessageAsync( await _requests.Writer.WriteAsync( new MessageRequest( message, + timeout, expectedResponses, channel, cancellationToken), @@ -88,14 +90,19 @@ private async Task ProcessRuntime(CancellationToken ct) { using var dealer = new DealerSocket(); dealer.Options.DisableTimeWait = true; - dealer.Connect(await _peer.ResolveNetMQAddressAsync()); + var address = await _peer.ResolveNetMQAddressAsync(); + _logger.Debug("[NetMQChannel] Connecting {Address}", address); + dealer.Connect(address); while (!ct.IsCancellationRequested) { MessageRequest req = await _requests.Reader.ReadAsync(ct); _lastUpdated = DateTimeOffset.UtcNow; CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(ct, req.CancellationToken); - _logger.Debug("[NetMQChannel] Trying to send message {Message}", req.Message); + _logger.Debug( + "[NetMQChannel] Trying to send message {Message} (count: {ExpectedResponses})", + req.Message, + req.ExpectedResponses); if (!dealer.TrySendMultipartMessage(req.Message)) { _logger.Debug( @@ -109,9 +116,18 @@ private async Task ProcessRuntime(CancellationToken ct) foreach (var i in Enumerable.Range(0, req.ExpectedResponses)) { - NetMQMessage raw = await dealer.ReceiveMultipartMessageAsync( - cancellationToken: linked.Token - ); + _logger.Debug( + "[NetMQChannel] Waiting for replies... (#{Index})", i); + var raw = new NetMQMessage(); + if (!dealer.TryReceiveMultipartMessage( + req.Timeout ?? TimeSpan.FromSeconds(1), + ref raw)) + { + break; + } + + _logger.Debug( + "[NetMQChannel] Successfully received replies #{Index}", i); _lastUpdated = DateTimeOffset.UtcNow; await req.Channel.Writer.WriteAsync(raw, linked.Token); @@ -125,11 +141,13 @@ private readonly struct MessageRequest { public MessageRequest( NetMQMessage message, + TimeSpan? timeout, in int expectedResponses, Channel channel, CancellationToken cancellationToken) { Message = message; + Timeout = timeout; ExpectedResponses = expectedResponses; Channel = channel; CancellationToken = cancellationToken; @@ -137,6 +155,8 @@ public MessageRequest( public NetMQMessage Message { get; } + public TimeSpan? Timeout { get; } + public int ExpectedResponses { get; } public Channel Channel { get; } diff --git a/src/Libplanet.Net/Transports/NetMQTransport.cs b/src/Libplanet.Net/Transports/NetMQTransport.cs index 6d0a4fbd8b..3f6159f317 100644 --- a/src/Libplanet.Net/Transports/NetMQTransport.cs +++ b/src/Libplanet.Net/Transports/NetMQTransport.cs @@ -343,6 +343,7 @@ CancellationToken cancellationToken await foreach (var raw in channel.SendMessageAsync( rawMessage, + timeout, expectedResponses, linkedCt)) {