Skip to content

Commit

Permalink
HOTFIX: Revert "KAFKA-18067: Kafka Streams can leak Producer client u…
Browse files Browse the repository at this point in the history
…nder EOS (apache#17931)" (apache#19078)

This reverts commit e883746.

The commit that is reverted prevents Kafka Streams from re-initializing
its transactional producer. If an exception that fences the
transactional producer occurs, the producer is not re-initialized during
the handling of the exception. That causes an infinite loop of
ProducerFencedExceptions with corresponding rebalances.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, David Jacot
<djacot@confluent.io>
  • Loading branch information
cadonna authored Mar 3, 2025
1 parent a24fedf commit ff94c44
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ private Producer<byte[], byte[]> producer() {
}

public void reInitializeProducer() {
if (!streamsProducer.isClosed())
streamsProducer.resetProducer(producer());
streamsProducer.resetProducer(producer());
}

StreamsProducer streamsProducer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class StreamsProducer {
private Producer<byte[], byte[]> producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private boolean closed = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
Expand Down Expand Up @@ -99,10 +98,6 @@ boolean transactionInFlight() {
return transactionInFlight;
}

boolean isClosed() {
return closed;
}

/**
* @throws IllegalStateException if EOS is disabled
*/
Expand Down Expand Up @@ -325,7 +320,6 @@ void flush() {

void close() {
producer.close();
closed = true;
transactionInFlight = false;
transactionInitialized = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,23 +190,9 @@ public void shouldCloseIfEosV2Enabled() {

activeTaskCreator.close();

assertThat(activeTaskCreator.streamsProducer().isClosed(), is(true));
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}

@Test
public void shouldNotReInitializeProducerOnClose() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();

activeTaskCreator.streamsProducer().close();
activeTaskCreator.reInitializeProducer();
// If streamsProducer is not closed, clientSupplier will recreate a producer,
// resulting in more than one producer being created.
assertThat(mockClientSupplier.producers.size(), is(1));
}

// error handling

@Test
Expand Down

0 comments on commit ff94c44

Please sign in to comment.