diff --git a/src/SharpPulsar.Test/ConsumerRedeliveryTest.cs b/src/SharpPulsar.Test/ConsumerRedeliveryTest.cs index 0d6f67dc..9f990238 100644 --- a/src/SharpPulsar.Test/ConsumerRedeliveryTest.cs +++ b/src/SharpPulsar.Test/ConsumerRedeliveryTest.cs @@ -1,8 +1,12 @@ using System; +using System.Collections.Generic; using System.Text; using System.Text.Json; using System.Threading.Tasks; +using DotNet.Testcontainers.Builders; using SharpPulsar.Builder; +using SharpPulsar.Interfaces; +using SharpPulsar.Protocol.Proto; using SharpPulsar.Test.Fixture; using SharpPulsar.TestContainer; using Xunit; @@ -43,59 +47,183 @@ public ConsumerRedeliveryTest(ITestOutputHelper output, PulsarFixture fixture) _system = fixture.System; } - [Fact] - public async Task TestUnAckMessageRedeliveryWithReceive() + /// + /// It verifies that redelivered messages are sorted based on the ledger-ids. + ///
+        /// 1. client publishes 100 messages across 50 ledgers
+        /// 2. broker delivers 100 messages to consumer
+        /// 3. consumer ack every alternative message and doesn't ack 50 messages
+        /// 4. broker sorts replay messages based on ledger and redelivers messages ledger by ledger
+        /// 
+ /// + /// + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestOrderedRedelivery(bool ackReceiptEnabled) { - var topic = $"persistent://public/default/async-unack-redelivery-{Guid.NewGuid()}"; + var topic = "persistent://public/default/redelivery-" + DateTimeHelper.CurrentUnixTimeMillis(); + + //broker.conf + //conf.setManagedLedgerMaxEntriesPerLedger(2); + //conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); - var pBuilder = new ProducerConfigBuilder(); - pBuilder.Topic(topic); + var pBuilder = new ProducerConfigBuilder() + .Topic(topic) + .ProducerName("my-producer-name"); var producer = await _client.NewProducerAsync(pBuilder); - const int messageCount = 10; + var builder = new ConsumerConfigBuilder() + .Topic(topic) + .SubscriptionName("s1") + .IsAckReceiptEnabled(ackReceiptEnabled) + .SubscriptionType(CommandSubscribe.SubType.Shared); + var consumer1 = await _client.NewConsumerAsync(builder); - for (var i = 0; i < messageCount; i++) + const int totalMsgs = 100; + + for (var i = 0; i < totalMsgs; i++) { - var receipt = await producer.SendAsync(Encoding.UTF8.GetBytes("my-message-" + i)); - _output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true })); + var message = "my-message-" + i; + var receipt = await producer.SendAsync(Encoding.UTF8.GetBytes(message)); + _output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true })); } - var builder = new ConsumerConfigBuilder(); - builder.Topic(topic); - builder.SubscriptionName("sub-TestUnAckMessageRedeliveryWithReceive"); - builder.AckTimeout(TimeSpan.FromMilliseconds(5000)); - builder.ForceTopicCreation(true); - builder.AcknowledgmentGroupTime(TimeSpan.Zero); - builder.SubscriptionType(Protocol.Proto.CommandSubscribe.SubType.Shared); + + var consumedCount = 0; + var messageIds = new HashSet(); + for (var i = 0; i < totalMsgs; i++) + { + var message = (Message)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); + if (message != null && (consumedCount % 2) == 0) + { + consumer1.Acknowledge(message); + } + else + { + messageIds.Add(message.MessageId); + } + var receivedMessage = Encoding.UTF8.GetString(message.Data); + _output.WriteLine($"Received message: [{receivedMessage}]"); + + consumedCount += 1; + } + Assert.Equal(totalMsgs, consumedCount); + + // redeliver all unack messages + await consumer1.RedeliverUnacknowledgedMessagesAsync(messageIds); + _output.WriteLine($"MessageIds: [{messageIds.Count}]"); + //await Task.Delay(1000); + MessageId lastMsgId = null; + var count = 1; + for (var i = 0; i < totalMsgs / 2; i++) + { + var message = (Message)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); + if (message != null) + { + var msgId = (MessageId)message.MessageId; + if (lastMsgId != null) + { + Assert.True(lastMsgId.LedgerId <= msgId.LedgerId, "lastMsgId: " + lastMsgId + " -- msgId: " + msgId); + } + + lastMsgId = msgId; + _output.WriteLine($"{count++} MessageId: [{lastMsgId}]"); + } + + } + + // close consumer so, this consumer's unack messages will be redelivered to new consumer + consumer1.Close(); + + /* var consumer2 = await _client.NewConsumerAsync(builder); + + await Task.Delay(TimeSpan.FromSeconds(10)); + count = 0; + lastMsgId = null; + for (var i = 0; i < totalMsgs / 2; i++) + { + + var message = (Message)await consumer2.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); + if (message != null) + { + var msgId = (MessageId)message.MessageId; + if (lastMsgId != null) + { + Assert.True(lastMsgId.LedgerId <= msgId.LedgerId); + } + lastMsgId = msgId; + _output.WriteLine($"{count++} RedeliverUnacknowledgedMessage MessageId: [{lastMsgId}]"); + } + }*/ + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task TestUnAckMessageRedeliveryWithReceive(bool ackReceiptEnabled) + { + var topic = $"persistent://public/default/async-unack-redelivery-{Guid.NewGuid()}"; + + var builder = new ConsumerConfigBuilder() + .Topic(topic) + .SubscriptionName("sub-TestUnAckMessageRedeliveryWithReceive") + .AckTimeout(TimeSpan.FromMilliseconds(3000)) + .IsAckReceiptEnabled(ackReceiptEnabled) + .EnableBatchIndexAcknowledgment(ackReceiptEnabled); + var consumer = await _client.NewConsumerAsync(builder); + + var pBuilder = new ProducerConfigBuilder() + .Topic(topic) + .EnableBatching(true) + .BatchingMaxMessages(5) + .BatchingMaxPublishDelay(TimeSpan.FromSeconds(1)); + var producer = await _client.NewProducerAsync(pBuilder); + + const int messages = 10; + + for (var i = 0; i < messages; i++) + { + await producer.SendAsync(Encoding.UTF8.GetBytes("my-message-" + i)); + //_output.WriteLine(JsonSerializer.Serialize(receipt, new JsonSerializerOptions { WriteIndented = true })); + } + var messageIds = new HashSet(); var messageReceived = 0; await Task.Delay(TimeSpan.FromMilliseconds(1000)); - for (var i = 0; i < messageCount - 2; ++i) + for (var i = 0; i < messages; ++i) { var m = (Message)await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); - - _output.WriteLine($"BrokerEntryMetadata[timestamp:{m.BrokerEntryMetadata.BrokerTimestamp} index: {m.BrokerEntryMetadata?.Index.ToString()}"); - var receivedMessage = Encoding.UTF8.GetString(m.Data); - _output.WriteLine($"Received message: [{receivedMessage}]"); - messageReceived++; + if (m != null) + { + _output.WriteLine($"BrokerEntryMetadata[timestamp:{m.BrokerEntryMetadata.BrokerTimestamp} index: {m.BrokerEntryMetadata?.Index.ToString()}"); + var receivedMessage = Encoding.UTF8.GetString(m.Data); + _output.WriteLine($"Received message: [{receivedMessage}]"); + messageReceived++; + messageIds.Add(m.MessageId); + } } - - Assert.True(messageReceived > 0); - await Task.Delay(TimeSpan.FromSeconds(1)); - for (var i = 0; i < messageCount - 5; i++) + Assert.Equal(10, messageReceived); + // redeliver all unack messages + await consumer.RedeliverUnacknowledgedMessagesAsync(messageIds); + //Assert.True(messageReceived > 0); + await Task.Delay(TimeSpan.FromSeconds(5)); + for (var i = 0; i < messages; i++) { var m = (Message)await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000)); - if(m != null) + if (m != null) { var receivedMessage = Encoding.UTF8.GetString(m.Data); - _output.WriteLine($"Received message: [{receivedMessage}]"); + _output.WriteLine($"{messageReceived} Received message: [{receivedMessage}]"); + await consumer.AcknowledgeAsync(m); messageReceived++; } - } + Assert.Equal(20, messageReceived); + await producer.CloseAsync(); await consumer.CloseAsync(); - Assert.True(messageReceived > 5); + //Assert.True(messageReceived > 5); } public async Task InitializeAsync() { @@ -114,6 +242,28 @@ public async Task DisposeAsync() { await _client.ShutdownAsync(); } + private static object[][] AckReceiptEnabled() + { + return + [ + [true], + [false] + ]; + } + + + private object[][] batchedMessageAck() + { + // When batch index ack is disabled (by default), only after all single messages were sent would the pending + // ACK be added into the ACK tracker. + return + [ + [3, 5, CommandAck.AckType.Individual], + [5, 5, CommandAck.AckType.Individual], + [3, 5, CommandAck.AckType.Cumulative], + [5, 5, CommandAck.AckType.Cumulative] + ]; + } } diff --git a/src/SharpPulsar/Client/ClientCnx.cs b/src/SharpPulsar/Client/ClientCnx.cs index 80759333..8fb25419 100644 --- a/src/SharpPulsar/Client/ClientCnx.cs +++ b/src/SharpPulsar/Client/ClientCnx.cs @@ -31,7 +31,7 @@ internal sealed class ClientCnx : ReceiveActor, IWithUnboundedStash, IWithTimers private State _state; private readonly IActorRef _self; private IActorRef _sendMessage; - private IActorRef _sender; + //private IActorRef _sender; private readonly Dictionary Message, IActorRef Requester)> _pendingRequests = new Dictionary Message, IActorRef Requester)>(); // LookupRequests that waiting in client side. @@ -151,7 +151,7 @@ private void Receives() Receive(p => { - _sender = Sender; + //_sender = Sender; switch (p.Command) { case "NewLookup": @@ -239,7 +239,7 @@ private void Receives() }); Receive(r => { - _sender = Sender; + //_sender = Sender; SendRequestWithId(r.Message, r.RequestId, r.NeedsResponse); }); Receive(r => @@ -673,7 +673,7 @@ private void HandleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEndO // caller of this method needs to be protected under pendingLookupRequestSemaphore private void AddPendingLookupRequests(long requestId, ReadOnlySequence message) { - _pendingRequests.Add(requestId, (message, _sender)); + _pendingRequests.Add(requestId, (message, Sender)); } private bool RemovePendingLookupRequest(long requestId, out IActorRef actor) @@ -749,7 +749,7 @@ private void HandleError(CommandError error) } else { - _sender?.Tell(response); + Sender?.Tell(response); _log.Warning($"Received unknown request id from server: {error.RequestId}"); } } @@ -796,7 +796,7 @@ private void NewLookup(ReadOnlySequence request, long requestId) } catch (Exception ex) { - _sender.Tell(PulsarClientException.Unwrap(ex)); + Sender.Tell(PulsarClientException.Unwrap(ex)); } } @@ -868,14 +868,14 @@ private bool SendRequestAndHandleTimeout(ReadOnlySequence requestMessage, try { _sendMessage.Tell(new SendMessage(requestMessage)); - _pendingRequests.Add(requestId, (requestMessage, _sender)); + _pendingRequests.Add(requestId, (requestMessage, Sender)); _requestTimeoutQueue.Enqueue(new RequestTime(DateTimeHelper.CurrentUnixTimeMillis(), requestId, requestType)); return true; } catch (Exception ex) { - _sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); + Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); } return false; } @@ -885,11 +885,11 @@ private void SendRequest(ReadOnlySequence requestMessage, long requestId) { _sendMessage.Tell(new SendMessage(requestMessage)); if (requestId >= 0) - _pendingRequests.Add(requestId, (requestMessage, _sender)); + _pendingRequests.Add(requestId, (requestMessage, Sender)); } catch (Exception ex) { - _sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); + Sender.Tell(new AskResponse(PulsarClientException.Unwrap(ex))); } } diff --git a/src/SharpPulsar/Consumer.cs b/src/SharpPulsar/Consumer.cs index dd5585cf..5018ce1d 100644 --- a/src/SharpPulsar/Consumer.cs +++ b/src/SharpPulsar/Consumer.cs @@ -352,6 +352,15 @@ public async ValueTask RedeliverUnacknowledgedMessagesAsync() throw ask.Exception; } + public void RedeliverUnacknowledgedMessages(ISet messageIds) + => RedeliverUnacknowledgedMessagesAsync(messageIds).ConfigureAwait(false); + public async ValueTask RedeliverUnacknowledgedMessagesAsync(ISet messageIds) + { + var ask = await _consumerActor.Ask(new RedeliverUnacknowledgedMessageIds(messageIds)) + .ConfigureAwait(false); + if (ask.Failed) + throw ask.Exception; + } public void Resume() { _consumerActor.Tell(Messages.Consumer.Resume.Instance); diff --git a/src/SharpPulsar/Consumer/ConsumerActor.cs b/src/SharpPulsar/Consumer/ConsumerActor.cs index 8351ec47..9160e880 100644 --- a/src/SharpPulsar/Consumer/ConsumerActor.cs +++ b/src/SharpPulsar/Consumer/ConsumerActor.cs @@ -926,7 +926,7 @@ private void Ready() _replyTo.Tell(new AskResponse(Unwrap(ex))); } }); - Receive(m => + Receive(_ => { RedeliverUnacknowledgedMessages(); Sender.Tell(new AskResponse()); diff --git a/src/SharpPulsar/Exceptions/PulsarClientException.cs b/src/SharpPulsar/Exceptions/PulsarClientException.cs index a2313454..746249d4 100644 --- a/src/SharpPulsar/Exceptions/PulsarClientException.cs +++ b/src/SharpPulsar/Exceptions/PulsarClientException.cs @@ -1049,15 +1049,11 @@ public static PulsarClientException Unwrap(Exception t) { return (PulsarClientException)t; } - else if (t is Exception) - { - throw (RuntimeException)t; - } // Unwrap the exception to keep the same exception type but a stack trace that includes the application calling // site Exception cause = t.InnerException; - string msg = cause.Message; + string msg = cause != null? cause.Message: t.Message; PulsarClientException newException; if (cause is TimeoutException) { diff --git a/src/SharpPulsar/Messages/Consumer/RedeliverUnacknowledgedMessages.cs b/src/SharpPulsar/Messages/Consumer/RedeliverUnacknowledgedMessages.cs index 57b4b1e0..4a6ea8c8 100644 --- a/src/SharpPulsar/Messages/Consumer/RedeliverUnacknowledgedMessages.cs +++ b/src/SharpPulsar/Messages/Consumer/RedeliverUnacknowledgedMessages.cs @@ -1,8 +1,4 @@  -using System; -using System.ComponentModel.DataAnnotations; -using Org.BouncyCastle.Crypto.Modes.Gcm; -using SharpPulsar.Admin.v2; using SharpPulsar.Exceptions; namespace SharpPulsar.Messages.Consumer