diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index e42a29da..b061b5c9 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -120,6 +120,15 @@ public async Task Shutdown() request, response, metadata); } + public async Task> + RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var metadata = _client.Sign(); + var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout); + return new RpcInvocation( + request, response, metadata); + } + public async Task> SendMessage( Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout) { diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index 743df9fe..ac9108dc 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -61,6 +61,16 @@ Task> Heartbeat(Endpoints end /// Task of response. Task> NotifyClientTermination( Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout); + + /// + /// Recall messages. + /// + /// The target endpoints. + /// gRPC request of recalling messages. + /// Request max duration. + /// Task of response. + Task> RecallMessage( + Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout); /// /// Send message to remote endpoints. diff --git a/csharp/rocketmq-client-csharp/IRecallReceipt.cs b/csharp/rocketmq-client-csharp/IRecallReceipt.cs new file mode 100644 index 00000000..8291cd66 --- /dev/null +++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public interface IRecallReceipt + { + string MessageId { get; } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs index 8145ea18..eb369c2d 100644 --- a/csharp/rocketmq-client-csharp/IRpcClient.cs +++ b/csharp/rocketmq-client-csharp/IRpcClient.cs @@ -52,6 +52,8 @@ Task ForwardMessageToDeadLetterQueue(Me Task NotifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout); + Task RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout); + Task Shutdown(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RecallReceipt.cs b/csharp/rocketmq-client-csharp/RecallReceipt.cs new file mode 100644 index 00000000..80cf120c --- /dev/null +++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public sealed class RecallReceipt : IRecallReceipt + { + public RecallReceipt(string messageId) + { + MessageId = messageId; + } + + public string MessageId { get; } + + public override string ToString() + { + return $"{nameof(MessageId)}: {MessageId}"; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs index eeff96e5..346ead28 100644 --- a/csharp/rocketmq-client-csharp/RpcClient.cs +++ b/csharp/rocketmq-client-csharp/RpcClient.cs @@ -189,5 +189,14 @@ internal static HttpMessageHandler CreateHttpHandler() var call = _stub.NotifyClientTerminationAsync(request, callOptions); return await call.ResponseAsync; } + + public async Task RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var deadline = DateTime.UtcNow.Add(timeout); + var callOptions = new CallOptions(metadata, deadline); + + var call = _stub.RecallMessageAsync(request, callOptions); + return await call.ResponseAsync; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index c9fe8014..3b177580 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -31,7 +31,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message } public string MessageId { get; } - + public string TransactionId { get; } private MessageQueue MessageQueue { get; }