Skip to content

Commit

Permalink
Merge pull request #183 from Sharp-Pulsar/connectionHandler
Browse files Browse the repository at this point in the history
Improved `ConnectionHandler`
  • Loading branch information
eaba authored Sep 22, 2024
2 parents 1cea72c + 0ea30f1 commit 4518cf0
Show file tree
Hide file tree
Showing 50 changed files with 399 additions and 341 deletions.
4 changes: 2 additions & 2 deletions Tutorials/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ await AnsiConsole.Live(table)

});
}
private static async ValueTask<List<MessageId>> PublishMessages(string topic, int count, string message, PulsarClient client)
private static async ValueTask<List<MessageIdAdv>> PublishMessages(string topic, int count, string message, PulsarClient client)
{
var keys = new List<MessageId>();
var keys = new List<MessageIdAdv>();
var builder = new ProducerConfigBuilder<byte[]>()
.Topic(topic);
var producer = await client.NewProducerAsync(builder);
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test.API/BatchMessageIdTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void EqualsTest()
var batchMsgId2 = new BatchMessageId(1, 1, 1, 1);
var batchMsgId3 = new BatchMessageId(0, 0, 0, 1);
var batchMsgId4 = new BatchMessageId(0, 0, 0, -1);
var msgId = new MessageId(0, 0, 0);
var msgId = new MessageIdAdv(0, 0, 0);

Assert.True(batchMsgId1.Equals(batchMsgId1));
Assert.False(batchMsgId1.Equals(batchMsgId2));
Expand Down
42 changes: 21 additions & 21 deletions src/SharpPulsar.Test.API/MessageIdCompareToTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class MessageIdCompareToTest
[Fact]
public virtual void TestEqual()
{
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);

var batchMessageId1 = new BatchMessageId(234L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 345L, 456, 567);
Expand All @@ -43,10 +43,10 @@ public virtual void TestEqual()
[Fact]
public virtual void TestGreaterThan()
{
var MessageId1 = new MessageId(124L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(123L, 344L, 567);
var MessageId4 = new MessageId(123L, 344L, 566);
var MessageId1 = new MessageIdAdv(124L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(123L, 344L, 567);
var MessageId4 = new MessageIdAdv(123L, 344L, 566);

var batchMessageId1 = new BatchMessageId(235L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 346L, 456, 567);
Expand Down Expand Up @@ -75,10 +75,10 @@ public virtual void TestGreaterThan()
[Fact]
public virtual void TestLessThan()
{
var MessageId1 = new MessageId(124L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(123L, 344L, 567);
var MessageId4 = new MessageId(123L, 344L, 566);
var MessageId1 = new MessageIdAdv(124L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(123L, 344L, 567);
var MessageId4 = new MessageIdAdv(123L, 344L, 566);

var batchMessageId1 = new BatchMessageId(235L, 345L, 456, 567);
var batchMessageId2 = new BatchMessageId(234L, 346L, 456, 567);
Expand Down Expand Up @@ -107,7 +107,7 @@ public virtual void TestLessThan()
[Fact]
public virtual void TestCompareDifferentType()
{
var MessageId = new MessageId(123L, 345L, 567);
var MessageId = new MessageIdAdv(123L, 345L, 567);
var batchMessageId1 = new BatchMessageId(123L, 345L, 566, 789);
var batchMessageId2 = new BatchMessageId(123L, 345L, 567, 789);
var batchMessageId3 = new BatchMessageId(MessageId);
Expand All @@ -121,7 +121,7 @@ public virtual void TestCompareDifferentType()
[Fact]
public virtual void CompareToSymmetricTest()
{
var simpleMessageId = new MessageId(123L, 345L, 567);
var simpleMessageId = new MessageIdAdv(123L, 345L, 567);
// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
var batchMessageId1 = new BatchMessageId(123L, 345L, 567, -1);
var batchMessageId2 = new BatchMessageId(123L, 345L, 567, 1);
Expand All @@ -140,7 +140,7 @@ public virtual void CompareToSymmetricTest()
[Fact]
public virtual void TestMessageIdCompareToTopicMessageId()
{
var MessageId = new MessageId(123L, 345L, 567);
var MessageId = new MessageIdAdv(123L, 345L, 567);
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(123L, 345L, 566, 789));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(123L, 345L, 567, 789));
var topicMessageId3 = new TopicMessageId("test-topic-partition-0", "test-topic", new BatchMessageId(MessageId));
Expand All @@ -157,8 +157,8 @@ public virtual void TestBatchMessageIdCompareToTopicMessageId()
var MessageId1 = new BatchMessageId(123L, 345L, 567, 789);
var MessageId2 = new BatchMessageId(123L, 345L, 567, 0);
var MessageId3 = new BatchMessageId(123L, 345L, 567, -1);
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageId(123L, 345L, 566));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageId(123L, 345L, 567));
var topicMessageId1 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageIdAdv(123L, 345L, 566));
var topicMessageId2 = new TopicMessageId("test-topic-partition-0", "test-topic", new MessageIdAdv(123L, 345L, 567));
Assert.True(MessageId1.CompareTo(topicMessageId1) > 0, "Expected to be greater than");
Assert.True(MessageId1.CompareTo(topicMessageId2) > 0, "Expected to be greater than");
Assert.True(MessageId2.CompareTo(topicMessageId2) > 0, "Expected to be greater than");
Expand Down Expand Up @@ -187,9 +187,9 @@ public virtual void TestMultiMessageIdEqual()

// 1 item
var topic1 = "topicName1";
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(345L, 456L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(345L, 456L, 567);

var item1 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId1 } });
var item2 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId2 } });
Expand Down Expand Up @@ -263,9 +263,9 @@ public virtual void TestMultiMessageIdCompareto()

// 1 item
var topic1 = "topicName1";
var MessageId1 = new MessageId(123L, 345L, 567);
var MessageId2 = new MessageId(123L, 345L, 567);
var MessageId3 = new MessageId(345L, 456L, 567);
var MessageId1 = new MessageIdAdv(123L, 345L, 567);
var MessageId2 = new MessageIdAdv(123L, 345L, 567);
var MessageId3 = new MessageIdAdv(345L, 456L, 567);

var item1 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId1 } });
var item2 = new MultiMessageId(new Dictionary<string, IMessageId> { { topic1, MessageId2 } });
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test.API/MessageIdUtilsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class MessageIdUtilsTest
[InlineData(39, 5)]
public void TestId(long ledger, long entry)
{
var id = new MessageId(ledger, entry, -1);
var id = new MessageIdAdv(ledger, entry, -1);
var offset = MessageIdUtils.GetOffset(id);
var id1 = MessageIdUtils.GetMessageId(offset);
Assert.Equal(id, id1);
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test.API/MessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public virtual void TestTopicMessageImplReplicatedInfo()
builder.ReplicatedFrom = from;
var payload = ReadOnlySequence<byte>.Empty;
var msg = Message<byte[]>.Create(builder, payload, ISchema<int>.Bytes);
msg.SetMessageId(new MessageId(-1, -1, -1));
msg.SetMessageId(new MessageIdAdv(-1, -1, -1));
var topicMessage = new TopicMessage<byte[]>(topicName, topicName, msg, null);

Assert.True(topicMessage.Replicated);
Expand All @@ -210,7 +210,7 @@ public virtual void TestTopicMessageImplNoReplicatedInfo()
var builder = new MessageMetadata();
var payload = ReadOnlySequence<byte>.Empty;
var msg = Message<byte[]>.Create(builder, payload, ISchema<int>.Bytes);
msg.SetMessageId(new MessageId(-1, -1, -1));
msg.SetMessageId(new MessageIdAdv(-1, -1, -1));
var topicMessage = new TopicMessage<byte[]>(topicName, topicName, msg, null);

Assert.False(topicMessage.Replicated);
Expand Down
37 changes: 19 additions & 18 deletions src/SharpPulsar.Test/AcknowledgementsGroupingTrackerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public async Task TestAckTracker()
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg3 = new MessageId(5, 3, 0);
var msg4 = new MessageId(5, 4, 0);
var msg5 = new MessageId(5, 5, 0);
var msg6 = new MessageId(5, 6, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);
var msg3 = new MessageIdAdv(5, 3, 0);
var msg4 = new MessageIdAdv(5, 4, 0);
var msg5 = new MessageIdAdv(5, 5, 0);
var msg6 = new MessageIdAdv(5, 6, 0);
var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
tracker.Tell(new AddAcknowledgment(msg1, CommandAck.AckType.Individual, new Dictionary<string, long>()));
Expand Down Expand Up @@ -134,17 +134,18 @@ public async Task TestAckTracker()
public async Task TestImmediateAckingTracker()
{

var builder = new ConsumerConfigBuilder<byte[]>();
builder.AcknowledgmentGroupTime(TimeSpan.Zero);
builder.Topic($"TestAckTracker-{Guid.NewGuid()}");
builder.SubscriptionName($"TestAckTracker-sub-{Guid.NewGuid()}");
var builder = new ConsumerConfigBuilder<byte[]>()
.AcknowledgmentGroupTime(TimeSpan.Zero)
.IsAckReceiptEnabled(false)
.Topic($"TestAckTracker-{Guid.NewGuid()}")
.SubscriptionName($"TestAckTracker-sub-{Guid.NewGuid()}");
var conf = builder.ConsumerConfigurationData;
var consumer = await _client.NewConsumerAsync(builder);
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);

var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
Expand Down Expand Up @@ -178,12 +179,12 @@ public async Task TestAckTrackerMultiAck()
var unack = _client.ActorSystem.ActorOf(UnAckedChunckedMessageIdSequenceMap.Prop());
var tracker = _client.ActorSystem.ActorOf(PersistentAcknowledgmentsGroupingTracker<byte[]>.Prop(unack, consumer.ConsumerActor, consumer.ConsumerActor/*dummy*/, 1, consumer.ConsumerActor, conf));

var msg1 = new MessageId(5, 1, 0);
var msg2 = new MessageId(5, 2, 0);
var msg3 = new MessageId(5, 3, 0);
var msg4 = new MessageId(5, 4, 0);
var msg5 = new MessageId(5, 5, 0);
var msg6 = new MessageId(5, 6, 0);
var msg1 = new MessageIdAdv(5, 1, 0);
var msg2 = new MessageIdAdv(5, 2, 0);
var msg3 = new MessageIdAdv(5, 3, 0);
var msg4 = new MessageIdAdv(5, 4, 0);
var msg5 = new MessageIdAdv(5, 5, 0);
var msg6 = new MessageIdAdv(5, 6, 0);

var isDuplicate = await tracker.Ask<bool>(new IsDuplicate(msg1));
Assert.False(isDuplicate);
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test/ConsumerRedeliveryTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ public async Task TestOrderedRedelivery(bool ackReceiptEnabled)
await consumer1.RedeliverUnacknowledgedMessagesAsync(messageIds);
_output.WriteLine($"MessageIds: [{messageIds.Count}]");
//await Task.Delay(1000);
MessageId lastMsgId = null;
MessageIdAdv lastMsgId = null;
var count = 1;
for (var i = 0; i < totalMsgs / 2; i++)
{
var message = (Message<byte[]>)await consumer1.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
if (message != null)
{
var msgId = (MessageId)message.MessageId;
var msgId = (MessageIdAdv)message.MessageId;
if (lastMsgId != null)
{
Assert.True(lastMsgId.LedgerId <= msgId.LedgerId, "lastMsgId: " + lastMsgId + " -- msgId: " + msgId);
Expand Down
6 changes: 3 additions & 3 deletions src/SharpPulsar.Test/EventSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,16 @@ public virtual async Task ReaderSourceTaggedTest()

Assert.True(receivedCount > 0);
}
private async Task<ISet<MessageId>> PublishMessages(string topic, int count)
private async Task<ISet<MessageIdAdv>> PublishMessages(string topic, int count)
{
var ids = new HashSet<MessageId>();
var ids = new HashSet<MessageIdAdv>();
var builder = new ProducerConfigBuilder<DataOpEx>()
.Topic(topic);
var producer = await _client.NewProducerAsync(AvroSchema<DataOpEx>.Of(typeof(DataOpEx)), builder);
for (var i = 0; i < count; i++)
{
var key = "key" + i;
MessageId id = null;
MessageIdAdv id = null;
if (i % 2 == 0)
id = await producer.NewMessage().Key(key).Property("twitter", "mestical").Value(new DataOpEx { Text = "my-event-message-" + i, EventTime = DateTimeHelper.CurrentUnixTimeMillis() }).SendAsync();
else
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test/MultiTopicsConsumerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public async Task TestMultiTopicConsumer()
await consumer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
}

private async Task<List<MessageId>> PublishMessages(string topic, int count, string message)
private async Task<List<MessageIdAdv>> PublishMessages(string topic, int count, string message)
{
var keys = new List<MessageId>();
var keys = new List<MessageIdAdv>();
var builder = new ProducerConfigBuilder<byte[]>()
.Topic(topic);
var producer = await _client.NewProducerAsync(builder);
Expand Down
5 changes: 3 additions & 2 deletions src/SharpPulsar.Test/NegativeAcksTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public async Task TestAddAndRemove()
var size = await tracker.Ask<long>(Size.Instance);//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Assert.Equal(0, size);

var mid = new MessageId(1L, 1L, -1);
var mid = new MessageIdAdv(1L, 1L, -1);
var added = await tracker.Ask<bool>(new Add(mid));//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Assert.True(added);
added = await tracker.Ask<bool>(new Add(mid));//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
Expand Down Expand Up @@ -172,7 +172,8 @@ private async Task TestNegativeAcks(bool batching, bool usePartition, CommandSub
// There should be no more messages
//Assert.Null(nu);
await producer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.CloseAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.UnsubscribeAsync();//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.CloseAsync();
}
public async Task InitializeAsync()
{
Expand Down
2 changes: 1 addition & 1 deletion src/SharpPulsar.Test/ZeroQueueSizeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public async Task TestPauseAndResume()
//await Task.Delay(10000);
//Assert.True(latch.Value.Wait(TimeSpan.FromSeconds(2)), "Timed out waiting for message listener acks");

//await consumer.UnsubscribeAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
await consumer.UnsubscribeAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
await producer.CloseAsync();//;//.ConfigureAwait(false); https://xunit.net/xunit.analyzers/rules/xUnit1030 https://xunit.net/xunit.analyzers/rules/xUnit1030
}
public async Task InitializeAsync()
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar/Batch/BatchMessageContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ public override ProducerActor<T>.OpSendMsg<T> CreateOpSendMsg()
}
private ReadOnlySequence<byte> SendMessage(long producerId, long sequenceId, int numMessages, IMessageId messageId, MessageMetadata msgMetadata, byte[] compressedPayload)
{
if (messageId is MessageId)
if (messageId is MessageIdAdv)
{
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageId)messageId).LedgerId, ((MessageId)messageId).EntryId, msgMetadata, compressedPayload);
return Commands.NewSend(producerId, sequenceId, numMessages, ChecksumType.Crc32C, ((MessageIdAdv)messageId).LedgerId, ((MessageIdAdv)messageId).EntryId, msgMetadata, compressedPayload);
}
else
{
Expand Down
10 changes: 5 additions & 5 deletions src/SharpPulsar/Batch/BatchMessageId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/// </summary>
namespace SharpPulsar.Batch
{
public class BatchMessageId : MessageId
public class BatchMessageId : MessageIdAdv
{
private const int NoBatch = -1;
public int BatchIndex { get; }
Expand Down Expand Up @@ -68,7 +68,7 @@ public virtual int CompareTo(object o)
return Compare(other);
}

if (o is MessageId id)
if (o is MessageIdAdv id)
{
int res = base.CompareTo(id);
if (res == 0 && BatchIndex > NoBatch)
Expand Down Expand Up @@ -100,7 +100,7 @@ public override bool Equals(object obj)
return LedgerId == other1.LedgerId && EntryId == other1.EntryId && PartitionIndex == other1.PartitionIndex && BatchIndex == other1.BatchIndex && BatchSize == other1.BatchSize;
}

if (obj is MessageId other)
if (obj is MessageIdAdv other)
{
return LedgerId == other.LedgerId && EntryId == other.EntryId && PartitionIndex == other.PartitionIndex && BatchIndex == NoBatch;
}
Expand Down Expand Up @@ -136,9 +136,9 @@ public virtual bool AckCumulative(int batchsize)

public virtual int BatchSize => Acker.BatchSize;

public virtual MessageId PrevBatchMessageId()
public virtual MessageIdAdv PrevBatchMessageId()
{
return new MessageId(LedgerId, EntryId - 1, PartitionIndex);
return new MessageIdAdv(LedgerId, EntryId - 1, PartitionIndex);
}

private int Compare(BatchMessageId m)
Expand Down
Loading

0 comments on commit 4518cf0

Please sign in to comment.