Support for transactional messages #1853
Comments
We've been having issues with one of our consumers that uses this library, and realized that it's because of transactional messages. In particular, we're seeing that the control messages are being exposed to the client, which should not happen. See the documentation here, which says:
|
A PR would be most welcome! |
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
In https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:
    while batch is not None:
        if getattr(batch, 'is_control_batch', False):
            continue
wdyt? |
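For context, a minimal sketch of how a control batch can be told apart from a data batch. It assumes the v2 record batch layout from the Kafka protocol, where bit 4 of the batch `attributes` field marks a transactional batch and bit 5 marks a control batch. The constant and function names below are illustrative and are not taken from kafka-python.

```python
# Bit masks for the 16-bit `attributes` field of a v2 record batch
# (assumed from the Kafka protocol; names are illustrative).
TRANSACTIONAL_MASK = 0x10  # bit 4: batch was written inside a transaction
CONTROL_MASK = 0x20        # bit 5: batch holds control records (commit/abort markers)


def is_transactional(attributes: int) -> bool:
    """Return True if the batch belongs to a transaction."""
    return bool(attributes & TRANSACTIONAL_MASK)


def is_control_batch(attributes: int) -> bool:
    """Return True if the batch carries control records, not user data."""
    return bool(attributes & CONTROL_MASK)
```

A control batch carries transaction markers rather than user key/value pairs, which is why it should not be handed to the application.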
We would be interested in helping add support, or at least the ability to filter out control messages. It looks like this issue is a bit old, so I thought I'd check where this is at before jumping in. |
I don't think anyone has put together a PR, but if you did we'd be happy to review it. |
This seems pretty old, but is there any update? |
Does this work? |
This probably won't work, but if you add
before continue, then it might work. Also probably makes sense to put the condition here: https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L467 so the offsets are correct? |
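A minimal, self-contained sketch of the skip-and-advance idea from this thread. It assumes the loop obtains each batch through a `next_batch()` call that returns `None` when exhausted. `Batch`, `FakeRecords`, and `unpack` are hypothetical stand-ins, not kafka-python classes. The point it illustrates: a bare `continue` inside `while batch is not None` never advances `batch`, so the loop would spin forever on the first control batch.

```python
from collections import namedtuple

# Hypothetical stand-in for a record batch; not a kafka-python type.
Batch = namedtuple("Batch", ["is_control_batch", "records"])


class FakeRecords:
    """Hypothetical batch source exposing next_batch(), None when exhausted."""

    def __init__(self, batches):
        self._it = iter(batches)

    def next_batch(self):
        return next(self._it, None)


def unpack(records):
    """Collect user records, skipping control batches."""
    out = []
    batch = records.next_batch()
    while batch is not None:
        if getattr(batch, "is_control_batch", False):
            # Advance before `continue`; otherwise `batch` never changes
            # and the loop does not terminate.
            batch = records.next_batch()
            continue
        out.extend(batch.records)
        batch = records.next_batch()
    return out
```

For example, `unpack(FakeRecords([Batch(False, ["a"]), Batch(True, ["marker"]), Batch(False, ["b"])]))` yields only the two user records. This sketch does not address the offset bookkeeping raised above; a real fix would also need the consumer position to move past the skipped control batch.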
We recently hit the above issue. I've tested the proposed fix from @jouve / @nilansaha / @smalyshev and it works as intended. I've created a pull request under #2361 |
The client doesn't support transactional messages. When consuming messages created by a transactional producer (Scala/Java based), the Python client fails to return correct values for key/value.