Support for transactional messages #1853

Closed
imduffy15 opened this issue Jun 29, 2019 · 10 comments · Fixed by #2499

Comments

@imduffy15

The client doesn't support transactional messages. When consuming messages written by a transactional producer (Scala/Java based), the Python client fails to return correct values for the key and value.

@jutley

jutley commented Aug 29, 2019

We've been having issues with one of our consumers that uses this library, and realized that it's because of transactional messages. In particular, we're seeing that the control messages are being exposed to the client, which should not happen. See the documentation here, which says:

Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)

@jeffwidman
Contributor

A PR would be most welcome!

@jouve

jouve commented Aug 25, 2020

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The sixth lowest bit indicates whether the RecordBatch includes a control message. 1 indicates that the RecordBatch contains a control message, 0 indicates that it doesn't. Control messages are used to enable transactions in Kafka and are generated by the broker. Clients should not return control batches (ie. those with this bit set) to applications. (since 0.11.0.0)

in https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L458, the batch should be skipped if it's a control batch:

            while batch is not None:
                if getattr(batch, 'is_control_batch', False):
                    continue

wdyt ?
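
For reference, the control bit described in the protocol guide quote above can be tested with a simple mask. This is only a minimal sketch, not kafka-python's actual code: the names CONTROL_MASK and is_control_batch below are illustrative, and it assumes the batch's attributes field has already been decoded as a 16-bit integer.

```python
# Illustrative sketch: the names here are assumptions, not kafka-python's API.
# The sixth lowest bit is bit index 5, i.e. mask 0x20.
CONTROL_MASK = 0x20


def is_control_batch(attributes: int) -> bool:
    """Return True if the RecordBatch attributes mark a control batch."""
    return bool(attributes & CONTROL_MASK)


if __name__ == "__main__":
    # Attributes with only the control bit set.
    print(is_control_batch(0x20))
    # Attributes with no bits set (a plain data batch).
    print(is_control_batch(0x00))
```

A client would run a check like this on each batch header and drop the batch before handing any records to the application.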

@dotnwat

dotnwat commented Mar 17, 2021

We would be interested in helping add support, at least the ability to filter out control messages. This issue looks a bit old, so I thought I'd check where things stand before jumping in.

@jeffwidman
Contributor

I don't think anyone has put together a PR, but if you did we'd be happy to review it.

@nilansaha

This seems pretty old, but is there any update?

@nilansaha

Regarding @jouve's suggestion above (skip the batch in kafka/consumer/fetcher.py if it's a control batch): does this work?

@nilansaha

@dotnwat @jutley I have the same problem, but I can't reproduce it locally using a Kafka/ZooKeeper Docker setup. I can try making a PR if I manage to reproduce it locally.

@smalyshev

Regarding @jouve's snippet above (skipping control batches in the while loop in fetcher.py): this probably won't work as written, since continue is reached without advancing batch, so the loop would never move past a control batch. But if you add

batch = records.next_batch()

before continue, then it might work. It also probably makes sense to put the condition here: https://github.com/dpkp/kafka-python/blob/master/kafka/consumer/fetcher.py#L467 so that the offsets are correct?
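
To make the loop fix discussed here concrete, the following is a small self-contained sketch. It does not use kafka-python itself: Batch and FakeRecords are hypothetical stand-ins for the fetcher's real objects, and only the next_batch() call and the is_control_batch attribute are taken from the snippets in this thread. It shows why the batch has to be advanced before continue.

```python
from collections import namedtuple

# Hypothetical stand-in for a record batch; not kafka-python's real class.
Batch = namedtuple("Batch", ["is_control_batch", "records"])


class FakeRecords:
    """Hypothetical stand-in that hands out batches one at a time."""

    def __init__(self, batches):
        self._batches = iter(batches)

    def next_batch(self):
        # Returns None once all batches have been consumed.
        return next(self._batches, None)


def unpack_records(records):
    """Collect application records, skipping control batches."""
    out = []
    batch = records.next_batch()
    while batch is not None:
        if getattr(batch, "is_control_batch", False):
            # Advance first; a bare `continue` would re-test the same
            # control batch forever.
            batch = records.next_batch()
            continue
        out.extend(batch.records)
        batch = records.next_batch()
    return out


if __name__ == "__main__":
    records = FakeRecords([
        Batch(False, ["a", "b"]),
        Batch(True, ["commit-marker"]),  # control batch, must be hidden
        Batch(False, ["c"]),
    ])
    print(unpack_records(records))
```

This sketch does not model offset tracking, so the question about where to place the condition so that offsets stay correct is left open here.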

@bradenneal1

We recently struck the above issue. I've tested the proposed fix from @jouve / @nilansaha / @smalyshev and it works as intended. I've created a pull request under #2361
