Skip to content

Commit

Permalink
Merge pull request #176 from Sharp-Pulsar/consumer
Browse files Browse the repository at this point in the history
[changes] Add folder for internal consumer and reader
  • Loading branch information
eaba authored Feb 1, 2024
2 parents ee4f396 + fb47a21 commit 4894e85
Show file tree
Hide file tree
Showing 39 changed files with 2,467 additions and 2,220 deletions.
2 changes: 2 additions & 0 deletions .nuke/build.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
"ServeDocs",
"Test",
"TestContainer",
"Tests",
"Token"
]
}
Expand Down Expand Up @@ -126,6 +127,7 @@
"ServeDocs",
"Test",
"TestContainer",
"Tests",
"Token"
]
}
Expand Down
24 changes: 23 additions & 1 deletion Tutorials/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class Program
static string myTopic = $"persistent://public/default/mytopic-{Guid.NewGuid()}";
//static string myTopic = $"persistent://public/default/mytopic-pulsar";
private static PulsarClient _client;
public static string Token { get; private set; }
static async Task Main(string[] args)
{
await StartContainer();
//await StartContainer();
await TokenStartContainer();
var url = "pulsar://127.0.0.1:6650";
//pulsar client settings builder
Console.WriteLine("Welcome!!");
Expand Down Expand Up @@ -122,6 +124,22 @@ private static async ValueTask StartContainer()
await AwaitPortReadiness($"http://127.0.0.1:8081/");
Console.WriteLine("AwaitPortReadiness Test Container");
}
private static async ValueTask TokenStartContainer()
{
var t = TokenBuildContainer();
_container = t
.WithCleanUp(true)
.Build();

await _container.StartAsync();
Console.WriteLine("Start Test Container");
await AwaitPortReadiness($"http://127.0.0.1:8080/metrics/");

await Task.Delay(2000);
var s = await _container.ExecAsync(new List<string> { @"./bin/pulsar", "tokens", "create", "--secret-key", "/pulsar/secret.key", "--subject", "test-user" });
Token = s.Stdout;
// await AwaitPortReadiness($"http://127.0.0.1:8081/");
}
private static async ValueTask ProduceConsumer(PulsarClient pulsarClient)
{
var consumer = await pulsarClient
Expand Down Expand Up @@ -829,6 +847,10 @@ private static PulsarBuilder BuildContainer()
{
return new PulsarBuilder();
}
private static PulsarTokenBuilder TokenBuildContainer()
{
return new PulsarTokenBuilder();
}
internal static async Task RunOauth()
{
var fileUri = new Uri(GetConfigFilePath());
Expand Down
24 changes: 23 additions & 1 deletion build/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ protected override void OnBuildInitialized()
DotNetRestore(s => s
.SetProjectFile(Solution));
});

Target Tests => _ => _
.DependsOn(Test)
.DependsOn(Token);
Target Compile => _ => _
.DependsOn(Restore)
.Executes(() =>
Expand Down Expand Up @@ -169,6 +171,7 @@ protected override void OnBuildInitialized()
.DependsOn(Compile)
.Executes(async() =>
{

var projects = new List<string>
{
"SharpPulsar.Test",
Expand Down Expand Up @@ -199,6 +202,25 @@ protected override void OnBuildInitialized()
}
await Container.StopAsync();
await Container.DisposeAsync();
await Task.Delay(5000);
var token = Solution.GetProject("SharpPulsar.Test.Token").NotNull("project != null");
Information($"Running tests from {token}");
foreach (var fw in token.GetTargetFrameworks())
{

DotNetTest(c => c
.SetProjectFile(token)
.SetConfiguration(Configuration)
.SetFramework(fw)
.EnableNoBuild()
.SetBlameCrash(true)
.SetBlameHang(true)
.SetBlameHangTimeout("30m")
.EnableNoRestore()
.When(true, _ => _
.SetLoggers("console;verbosity=detailed")
.SetResultsDirectory(OutputTests)));
}
});
Target Token => _ => _
.DependsOn(Compile)
Expand Down
71 changes: 60 additions & 11 deletions src/SharpPulsar.Test.Token/TokenTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System.Text;
using Akka.Configuration;
using SharpCompress;
using SharpPulsar.Auth;
using SharpPulsar.Auth.OAuth2;
using SharpPulsar.Builder;
using SharpPulsar.Interfaces;
using SharpPulsar.Schemas;
Expand All @@ -9,15 +13,51 @@
namespace SharpPulsar.Test.Token
{
[Collection(nameof(PulsarTokenCollection))]
public class TokenTests
public class TokenTests : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private readonly string _topic;
private readonly PulsarClient _client;
private PulsarClient? _client;
private PulsarSystem _system;
private PulsarClientConfigBuilder _configBuilder;
public TokenTests(ITestOutputHelper output, PulsarTokenFixture fixture)
{
var s = fixture.Container.ExecAsync(new List<string> { @"./bin/pulsar", "tokens", "create", "--secret-key", "/pulsar/secret.key", "--subject", "test-user" })
.GetAwaiter()
.GetResult();
_output = output;
_client = fixture.PulsarSystem?.NewClient(fixture.ConfigBuilder).AsTask().GetAwaiter().GetResult()!;
var client = new PulsarClientConfigBuilder();
var serviceUrl = "pulsar://localhost:6650";
var webUrl = "http://localhost:8080";
client.ServiceUrl(serviceUrl);
client.WebUrl(webUrl);

client.Authentication(AuthenticationFactory.Token(s.Stdout));
client.ServiceUrl(serviceUrl);
client.WebUrl(webUrl);
_system = PulsarSystem.GetInstance(actorSystemName: "token", config: ConfigurationFactory.ParseString(@"
akka
{
loglevel = DEBUG
log-config-on-start = on
loggers=[""Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog""]
actor
{
debug
{
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
coordinated-shutdown
{
exit-clr = on
}
}"));
_configBuilder = client;
_topic = $"persistent://public/default/token-{Guid.NewGuid()}";
}

Expand All @@ -26,7 +66,7 @@ public virtual async Task Token_ProducerInstantiation()
{
var producer = new ProducerConfigBuilder<string>();
producer.Topic(_topic);
var stringProducerBuilder = await _client.NewProducerAsync(new StringSchema(), producer);
var stringProducerBuilder = await _client!.NewProducerAsync(new StringSchema(), producer);
Assert.NotNull(stringProducerBuilder);
await stringProducerBuilder.CloseAsync();
_client.Dispose();
Expand All @@ -37,7 +77,7 @@ public virtual async Task Token_ConsumerInstantiation()
var consumer = new ConsumerConfigBuilder<string>();
consumer.Topic(_topic);
consumer.SubscriptionName($"token-test-sub-{Guid.NewGuid()}");
var stringConsumerBuilder = await _client.NewConsumerAsync(new StringSchema(), consumer);
var stringConsumerBuilder = await _client!.NewConsumerAsync(new StringSchema(), consumer);
Assert.NotNull(stringConsumerBuilder);
await stringConsumerBuilder.CloseAsync();
_client.Dispose();
Expand All @@ -48,7 +88,7 @@ public virtual async void Token_ReaderInstantiation()
var reader = new ReaderConfigBuilder<string>();
reader.Topic(_topic);
reader.StartMessageId(IMessageId.Earliest);
var stringReaderBuilder = await _client.NewReaderAsync(new StringSchema(), reader);
var stringReaderBuilder = await _client!.NewReaderAsync(new StringSchema(), reader);
Assert.NotNull(stringReaderBuilder);
await stringReaderBuilder.CloseAsync();
_client.Dispose();
Expand All @@ -65,7 +105,7 @@ public async Task Token_ProduceAndConsume()

var producerBuilder = new ProducerConfigBuilder<byte[]>();
producerBuilder.Topic(topic);
var producer = await _client.NewProducerAsync(producerBuilder);
var producer = await _client!.NewProducerAsync(producerBuilder);

await producer.NewMessage().KeyBytes(byteKey)
.Properties(new Dictionary<string, string> { { "KeyBytes", Encoding.UTF8.GetString(byteKey) } })
Expand All @@ -85,7 +125,7 @@ await producer.NewMessage().KeyBytes(byteKey)
if (message != null)
_output.WriteLine($"BrokerEntryMetadata[timestamp:{message.BrokerEntryMetadata?.BrokerTimestamp} index: {message.BrokerEntryMetadata?.Index.ToString()}");

Assert.Equal(byteKey, message.KeyBytes);
Assert.Equal(byteKey, message!.KeyBytes);

Assert.True(message.HasBase64EncodedKey());
var receivedMessage = Encoding.UTF8.GetString(message.Data);
Expand All @@ -107,7 +147,7 @@ public async Task Token_ProduceAndConsumeBatch()
.Topic(_topic)
.ForceTopicCreation(true)
.SubscriptionName($"Batch-subscriber-{Guid.NewGuid()}");
var consumer = await _client.NewConsumerAsync(consumerBuilder);
var consumer = await _client!.NewConsumerAsync(consumerBuilder);


var producerBuilder = new ProducerConfigBuilder<byte[]>()
Expand Down Expand Up @@ -140,7 +180,7 @@ public async Task Token_ProduceAndConsumeBatch()

Assert.Equal(byteKey, message?.KeyBytes);
Assert.True(message?.HasBase64EncodedKey());
var receivedMessage = Encoding.UTF8.GetString(message.Data);
var receivedMessage = Encoding.UTF8.GetString(message!.Data);
_output.WriteLine($"Received message: [{receivedMessage}]");
Assert.Equal($"TestMessage-{i}", receivedMessage);
}
Expand All @@ -149,6 +189,15 @@ public async Task Token_ProduceAndConsumeBatch()
await consumer.CloseAsync();
_client.Dispose();
}

public async Task InitializeAsync()
{

_client = await _system.NewClient(_configBuilder);
}

public async Task DisposeAsync()
{
await _client!.ShutdownAsync();
}
}
}
3 changes: 2 additions & 1 deletion src/SharpPulsar.Test.Token/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"authParamsString": "",
"authCertPath": "",
"operationTime": 60000,
"connectionTime": 0
"connectionTime": 0,
"authentication": true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace SharpPulsar.Test
public class AcknowledgementsGroupingTrackerTest : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private TaskCompletionSource<PulsarClient> _tcs;
//private TaskCompletionSource<PulsarClient> _tcs;
private PulsarClient _client;
private PulsarSystem _system;
private PulsarClientConfigBuilder _configBuilder;
Expand Down
4 changes: 2 additions & 2 deletions src/SharpPulsar.Test/PartitionedProducerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public virtual async Task TestGetNumOfPartitions()
_output.WriteLine($"Consumer acknowledged : {Encoding.UTF8.GetString(message.Data)} from topic: {m.Topic}");
message = await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
}
consumer.Unsubscribe();
consumer.Unsubscribe(true);
foreach (var producer in producers)
{
await producer.CloseAsync();
Expand Down Expand Up @@ -258,7 +258,7 @@ public virtual async Task TestBinaryProtoToGetTopicsOfNamespacePersistent()
_output.WriteLine($"Consumer acknowledged : {Encoding.UTF8.GetString(message.Data)} from topic: {m.Topic}");
message = await consumer.ReceiveAsync(TimeSpan.FromMicroseconds(5000));
}
consumer.Unsubscribe();
consumer.Unsubscribe(false);
await consumer.CloseAsync();
await producer1.CloseAsync();
await producer2.CloseAsync();
Expand Down
4 changes: 3 additions & 1 deletion src/SharpPulsar.Test/ZeroQueueSizeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ public async Task TestZeroQueueSizeMessageRedeliveryForListener()
}
finally
{
latch.Signal();
_output.WriteLine($"Message Listener Count: {latch?.CurrentCount!}");
if (latch?.CurrentCount! > 0)
latch?.Signal();
}

}, null));
Expand Down
24 changes: 23 additions & 1 deletion src/SharpPulsar.TestContainer/PulsarFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Reflection;
using System.Security.Cryptography.X509Certificates;
using Akka.Configuration;
using Microsoft.Extensions.Configuration;
using SharpPulsar.Builder;
using SharpPulsar.Configuration;
Expand Down Expand Up @@ -29,7 +30,28 @@ public IConfigurationRoot GetIConfigurationRoot(string outputPath)
}
public virtual void SetupSystem(string? service = null, string? web = null)
{
System = PulsarSystem.GetInstance(actorSystemName: "tests");
System = PulsarSystem.GetInstance(actorSystemName: "tests", config: ConfigurationFactory.ParseString(@"
akka
{
loglevel = DEBUG
log-config-on-start = on
loggers=[""Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog""]
actor
{
debug
{
receive = on
autoreceive = on
lifecycle = on
event-stream = on
unhandled = on
}
}
coordinated-shutdown
{
exit-clr = on
}
}"));
var client = new PulsarClientConfigBuilder();
var clienConfigSetting = _configuration.GetSection("client");
var serviceUrl = service ?? clienConfigSetting.GetSection("service-url").Value;
Expand Down
Loading

0 comments on commit 4894e85

Please sign in to comment.