Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add python to streaming subscriptions #4485

Merged
merged 5 commits into from
Jan 10, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,112 @@ As messages are sent to the given message handler code, there is no concept of r

The example below shows the different ways to stream subscribe to a topic.

{{< tabs Go>}}
{{< tabs Python Go >}}

{{% codetab %}}

You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in and may block the main thread while waiting for messages.

```python
import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError

counter = 0


def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
return 'success'


def main():
with DaprClient() as client:
global counter

subscription = client.subscribe(
pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
)

try:
while counter < 5:
try:
message = subscription.next_message()

except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue

# Process the message
response_status = process_message(message)

if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)

finally:
print("Closing subscription...")
subscription.close()


if __name__ == '__main__':
main()

```

You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn't block the main thread.

```python
import time

from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse

counter = 0


def process_message(message):
# Process the message here
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
return TopicEventResponse('success')


def main():
with (DaprClient() as client):
# This will start a new thread that will listen for messages
# and process them in the `process_message` function
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub', topic='orders', handler_fn=process_message,
dead_letter_topic='orders_dead'
)

while counter < 5:
time.sleep(1)

print("Closing subscription...")
close_fn()


if __name__ == '__main__':
main()
```

[Learn more about streaming subscriptions using the Python SDK client.]({{< ref "python-client.md#streaming-message-subscription" >}})

{{% /codetab %}}

{{% codetab %}}

Expand Down
Loading