From 2d8e5d0f1863f98ee0ef452ea1fdaf5b6d5d4ea8 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Wed, 8 Jan 2025 17:08:04 -0500 Subject: [PATCH 1/2] add python to streaming subscriptions Signed-off-by: Hannah Hunter --- .../pubsub/subscription-methods.md | 107 +++++++++++++++++- 1 file changed, 106 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index 62ed2811ebe..d593cf27aeb 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -203,7 +203,112 @@ As messages are sent to the given message handler code, there is no concept of r The example below shows the different ways to stream subscribe to a topic. -{{< tabs Go>}} +{{< tabs Python Go >}} + +{{% codetab %}} + +You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in and may block the main thread while waiting for messages. + +```python +import time + +from dapr.clients import DaprClient +from dapr.clients.grpc.subscription import StreamInactiveError + +counter = 0 + + +def process_message(message): + global counter + counter += 1 + # Process the message here + print(f'Processing message: {message.data()} from {message.topic()}...') + return 'success' + + +def main(): + with DaprClient() as client: + global counter + + subscription = client.subscribe( + pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' + ) + + try: + while counter < 5: + try: + message = subscription.next_message() + + except StreamInactiveError as e: + print('Stream is inactive. Retrying...') + time.sleep(1) + continue + if message is None: + print('No message received within timeout period.') + continue + + # Process the message + response_status = process_message(message) + + if response_status == 'success': + subscription.respond_success(message) + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) + + finally: + print("Closing subscription...") + subscription.close() + + +if __name__ == '__main__': + main() + +``` + +You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn't block the main thread. + +```python +import time + +from dapr.clients import DaprClient +from dapr.clients.grpc._response import TopicEventResponse + +counter = 0 + + +def process_message(message): + # Process the message here + global counter + counter += 1 + print(f'Processing message: {message.data()} from {message.topic()}...') + return TopicEventResponse('success') + + +def main(): + with (DaprClient() as client): + # This will start a new thread that will listen for messages + # and process them in the `process_message` function + close_fn = client.subscribe_with_handler( + pubsub_name='pubsub', topic='TOPIC_A', handler_fn=process_message, + dead_letter_topic='TOPIC_A_DEAD' + ) + + while counter < 5: + time.sleep(1) + + print("Closing subscription...") + close_fn() + + +if __name__ == '__main__': + main() +``` + +[Learn more about streaming subscriptions using the Python SDK client.]({{< ref "python-client.md#streaming-message-subscription" >}}) + +{{% /codetab %}} {{% codetab %}} From fb07dbcfed4cafa7f22d850dd741e10767a9f3ba Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 10 Jan 2025 13:14:03 -0500 Subject: [PATCH 2/2] update topic and deadletter topic Signed-off-by: Hannah Hunter --- .../building-blocks/pubsub/subscription-methods.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index d593cf27aeb..5c31057ee32 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -231,7 +231,7 @@ def main(): global counter subscription = client.subscribe( - pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' + pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead' ) try: @@ -291,8 +291,8 @@ def main(): # This will start a new thread that will listen for messages # and process them in the `process_message` function close_fn = client.subscribe_with_handler( - pubsub_name='pubsub', topic='TOPIC_A', handler_fn=process_message, - dead_letter_topic='TOPIC_A_DEAD' + pubsub_name='pubsub', topic='orders', handler_fn=process_message, + dead_letter_topic='orders_dead' ) while counter < 5: