Skip to content

Commit

Permalink
KAFKA-18262 Remove DefaultPartitioner and UniformStickyPartitioner (apache#18204)

Browse files Browse the repository at this point in the history

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
FrankYang0529 authored Dec 20, 2024
1 parent e9d4aa4 commit 753a003
Show file tree
Hide file tree
Showing 18 changed files with 41 additions and 760 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,23 +326,6 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
this(Utils.propsToMap(properties), keySerializer, valueSerializer);
}

/**
 * Emit a deprecation warning when the configured partitioner is one of the
 * implementations deprecated by KIP-794 (DefaultPartitioner or
 * UniformStickyPartitioner), pointing the user at the replacement configuration.
 */
@SuppressWarnings("deprecation")
private void warnIfPartitionerDeprecated() {
    // Both classes were deprecated by KIP-794; the built-in partitioning logic
    // replaces them when PARTITIONER_CLASS_CONFIG is left unset.
    final boolean usesDeprecatedDefault =
            partitioner instanceof org.apache.kafka.clients.producer.internals.DefaultPartitioner;
    final boolean usesDeprecatedUniformSticky =
            partitioner instanceof org.apache.kafka.clients.producer.UniformStickyPartitioner;
    if (usesDeprecatedDefault) {
        log.warn("DefaultPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
            + " configuration setting to get the default partitioning behavior");
    }
    if (usesDeprecatedUniformSticky) {
        log.warn("UniformStickyPartitioner is deprecated. Please clear " + ProducerConfig.PARTITIONER_CLASS_CONFIG
            + " configuration setting and set " + ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG
            + " to 'true' to get the uniform sticky partitioning behavior");
    }
}

// visible for testing
@SuppressWarnings({"unchecked", "this-escape"})
KafkaProducer(ProducerConfig config,
Expand Down Expand Up @@ -385,7 +368,6 @@ private void warnIfPartitionerDeprecated() {
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
warnIfPartitionerDeprecated();
this.partitionerIgnoreKeys = config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,6 @@ public MockProducer(final Cluster cluster,
this.mockMetrics = new HashMap<>();
}

/**
 * Create a new mock producer with invented metadata, the given autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(Cluster.empty(), autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param autoComplete    whether sends should complete automatically (true) or await {@code completeNext()}/{@code errorNext()}
 * @param keySerializer   serializer applied to record keys
 * @param valueSerializer serializer applied to record values
 */
@SuppressWarnings("deprecation")
public MockProducer(final boolean autoComplete,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    // Delegates with an empty cluster and the (deprecated) DefaultPartitioner.
    this(Cluster.empty(), autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
}

/**
 * Create a new mock producer with the given cluster metadata, autoComplete setting, and key/value serializers.
 *
 * Equivalent to {@link #MockProducer(Cluster, boolean, Partitioner, Serializer, Serializer) new MockProducer(cluster, autoComplete, new DefaultPartitioner(), keySerializer, valueSerializer)}
 *
 * @param cluster         cluster metadata used to resolve topic partitions
 * @param autoComplete    whether sends should complete automatically (true) or await {@code completeNext()}/{@code errorNext()}
 * @param keySerializer   serializer applied to record keys
 * @param valueSerializer serializer applied to record values
 */
@SuppressWarnings("deprecation")
public MockProducer(final Cluster cluster,
                    final boolean autoComplete,
                    final Serializer<K> keySerializer,
                    final Serializer<V> valueSerializer) {
    // Delegates with the (deprecated) DefaultPartitioner for partition selection.
    this(cluster, autoComplete, new org.apache.kafka.clients.producer.internals.DefaultPartitioner(), keySerializer, valueSerializer);
}

/**
* Create a new mock producer with invented metadata the given autoComplete setting, partitioner and key\value serializers.
*
Expand Down Expand Up @@ -563,6 +538,9 @@ private int partition(ProducerRecord<K, V> record, Cluster cluster) {
}
byte[] keyBytes = keySerializer.serialize(topic, record.headers(), record.key());
byte[] valueBytes = valueSerializer.serialize(topic, record.headers(), record.value());
if (partitioner == null) {
return this.cluster.partitionsForTopic(record.topic()).get(0).partition();
}
return this.partitioner.partition(topic, record.key(), keyBytes, record.value(), valueBytes, cluster);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public interface Partitioner extends Configurable, Closeable {
void close();

/**
* Note this method is only implemented in DefaultPartitioner and {@link UniformStickyPartitioner} which
* Note this method is only implemented in DefaultPartitioner and UniformStickyPartitioner which
* are now deprecated. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner">KIP-794</a> for more info.
* <p>
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class MockProducerTest {
private final String groupId = "group";

private void buildMockProducer(boolean autoComplete) {
this.producer = new MockProducer<>(autoComplete, new MockSerializer(), new MockSerializer());
this.producer = new MockProducer<>(Cluster.empty(), autoComplete, null, new MockSerializer(), new MockSerializer());
}

@AfterEach
Expand Down Expand Up @@ -87,10 +87,16 @@ public void testPartitioner() throws Exception {
PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
Cluster cluster = new Cluster(null, new ArrayList<>(0), asList(partitionInfo0, partitionInfo1),
Collections.emptySet(), Collections.emptySet());
MockProducer<String, String> producer = new MockProducer<>(cluster, true, new StringSerializer(), new StringSerializer());
MockProducer<String, String> producer = new MockProducer<>(
cluster,
true,
new org.apache.kafka.clients.producer.RoundRobinPartitioner(),
new StringSerializer(),
new StringSerializer()
);
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
Future<RecordMetadata> metadata = producer.send(record);
assertEquals(1, metadata.get().partition(), "Partition should be correct");
assertEquals(0, metadata.get().partition(), "Partition should be correct");
producer.clear();
assertEquals(0, producer.history().size(), "Clear should erase our history");
producer.close();
Expand Down Expand Up @@ -680,7 +686,7 @@ public void shouldNotThrowOnFlushProducerIfProducerIsFenced() {
@Test
@SuppressWarnings("unchecked")
public void shouldThrowClassCastException() {
try (MockProducer<Integer, String> customProducer = new MockProducer<>(true, new IntegerSerializer(), new StringSerializer())) {
try (MockProducer<Integer, String> customProducer = new MockProducer<>(Cluster.empty(), true, null, new IntegerSerializer(), new StringSerializer())) {
assertThrows(ClassCastException.class, () -> customProducer.send(new ProducerRecord(topic, "key1", "value1")));
}
}
Expand Down
Loading

0 comments on commit 753a003

Please sign in to comment.