From 96c69da8c164966c1281df7001d5d239053442fd Mon Sep 17 00:00:00 2001 From: Mitch Date: Thu, 13 Feb 2020 15:44:51 -0600 Subject: [PATCH] KAFKA-8507; Unify connection name flag for command line tool [KIP-499] (#8023) This change updates ConsoleProducer, ConsumerPerformance, VerifiableProducer, and VerifiableConsumer classes to add and prefer the --bootstrap-server flag for defining the connection point of the Kafka cluster. This change is part of KIP-499: https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool. Reviewers: Ron Dagostino , Stanislav Kozlovski , Chia-Ping Tsai , Jason Gustafson --- .../scala/kafka/tools/ConsoleProducer.scala | 23 ++++++++-- .../kafka/tools/ConsumerPerformance.scala | 31 ++++++++++---- .../kafka/tools/ConsoleProducerTest.scala | 42 ++++++++++++++++--- .../kafka/tools/ConsumerPerformanceTest.scala | 41 +++++++++++++++++- docs/quickstart.html | 4 +- docs/security.html | 2 +- docs/streams/quickstart.html | 8 ++-- .../kafka/tools/VerifiableConsumer.java | 33 ++++++++++++--- .../kafka/tools/VerifiableProducer.java | 28 +++++++++++-- 9 files changed, 177 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index c857f700d0488..6a19981fbd3bb 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -86,7 +86,11 @@ object ConsoleProducer { props ++= config.extraProducerProps - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + if(config.bootstrapServer != null) + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) + else + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) + props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") @@ -121,10 +125,15 @@ object ConsoleProducer { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) - val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") + val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.") .withRequiredArg .describedAs("broker-list") .ofType(classOf[String]) + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." 
+ "If specified without value, then it defaults to 'gzip'") @@ -216,11 +225,17 @@ object ConsoleProducer { options = tryParse(parser, args) CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from standard input and publish it to Kafka.") - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt) + + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt) val topic = options.valueOf(topicOpt) + + val bootstrapServer = options.valueOf(bootstrapServerOpt) val brokerList = options.valueOf(brokerListOpt) - ToolsUtils.validatePortOrDie(parser,brokerList) + + val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt) + ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts) + val sync = options.has(syncOpt) val compressionCodecOptionValue = options.valueOf(compressionCodecOpt) val compressionCodec = if (options.has(compressionCodecOpt)) diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 278d69486b012..04e8886df3866 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.{Properties, Random} import com.typesafe.scalalogging.LazyLogging +import joptsimple.{OptionException, OptionParser, OptionSet} import kafka.utils.{CommandLineUtils, ToolsUtils} import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer @@ -202,9 +203,14 @@ object ConsumerPerformance extends LazyLogging { } class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) { - val bootstrapServersOpt = parser.accepts("broker-list", "REQUIRED: The server(s) to connect to.") - .withRequiredArg() - .describedAs("host") + val brokerListOpt = 
parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg + .describedAs("broker-list") + .ofType(classOf[String]) + val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.") + .requiredUnless("broker-list") + .withRequiredArg + .describedAs("server to connect to") .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") .withRequiredArg @@ -250,11 +256,10 @@ object ConsumerPerformance extends LazyLogging { .ofType(classOf[Long]) .defaultsTo(10000) - options = parser.parse(args: _*) - + options = tryParse(parser, args) CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer") - CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt, bootstrapServersOpt) + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt) val printMetrics = options.has(printMetricsOpt) @@ -264,7 +269,10 @@ object ConsumerPerformance extends LazyLogging { new Properties import org.apache.kafka.clients.consumer.ConsumerConfig - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt)) + + val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt) + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts) + props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)) props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString) props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString) @@ -283,5 +291,14 @@ object ConsumerPerformance extends LazyLogging { val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt)) val 
hideHeader = options.has(hideHeaderOpt) val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue() + + def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { + try + parser.parse(args: _*) + catch { + case e: OptionException => + CommandLineUtils.printUsageAndDie(parser, e.getMessage) + } + } } } diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala index 00efc6a0ca8c7..ef7b6c631aa97 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala @@ -27,7 +27,7 @@ import kafka.utils.Exit class ConsoleProducerTest { - val validArgs: Array[String] = Array( + val brokerListValidArgs: Array[String] = Array( "--broker-list", "localhost:1001,localhost:1002", "--topic", @@ -37,20 +37,45 @@ class ConsoleProducerTest { "--property", "key.separator=#" ) - + val bootstrapServerValidArgs: Array[String] = Array( + "--bootstrap-server", + "localhost:1003,localhost:1004", + "--topic", + "t3", + "--property", + "parse.key=true", + "--property", + "key.separator=#" + ) val invalidArgs: Array[String] = Array( "--t", // not a valid argument "t3" ) + val bootstrapServerOverride: Array[String] = Array( + "--broker-list", + "localhost:1001", + "--bootstrap-server", + "localhost:1002", + "--topic", + "t3", + ) @Test - def testValidConfigs(): Unit = { - val config = new ConsoleProducer.ProducerConfig(validArgs) + def testValidConfigsBrokerList(): Unit = { + val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs) val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) assertEquals(util.Arrays.asList("localhost:1001", "localhost:1002"), producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) } + @Test + def testValidConfigsBootstrapServer(): Unit = { + val config = new ConsoleProducer.ProducerConfig(bootstrapServerValidArgs) + val producerConfig = 
new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals(util.Arrays.asList("localhost:1003", "localhost:1004"), + producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + } + @Test(expected = classOf[IllegalArgumentException]) def testInvalidConfigs(): Unit = { Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) @@ -63,11 +88,18 @@ class ConsoleProducerTest { @Test def testParseKeyProp(): Unit = { - val config = new ConsoleProducer.ProducerConfig(validArgs) + val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs) val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader] reader.init(System.in,ConsoleProducer.getReaderProps(config)) assert(reader.keySeparator == "#") assert(reader.parseKey) } + @Test + def testBootstrapServerOverride(): Unit = { + val config = new ConsoleProducer.ProducerConfig(bootstrapServerOverride) + val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config)) + assertEquals(util.Arrays.asList("localhost:1002"), + producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + } } diff --git a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala index 3beec10ae04e0..2ae4fa7d5b4b1 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala @@ -42,7 +42,7 @@ class ConsumerPerformanceTest { } @Test - def testConfig(): Unit = { + def testConfigBrokerList(): Unit = { //Given val args: Array[String] = Array( "--broker-list", "localhost:9092", @@ -54,7 +54,44 @@ class ConsumerPerformanceTest { val config = new ConsumerPerformance.ConsumerPerfConfig(args) //Then - assertEquals("localhost:9092", config.options.valueOf(config.bootstrapServersOpt)) + assertEquals("localhost:9092", 
config.brokerHostsAndPorts) + assertEquals("test", config.topic) + assertEquals(10, config.numMessages) + } + + @Test + def testConfigBootStrapServer(): Unit = { + //Given + val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + ) + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertEquals("localhost:9092", config.brokerHostsAndPorts) + assertEquals("test", config.topic) + assertEquals(10, config.numMessages) + } + + @Test + def testBrokerListOverride(): Unit = { + //Given + val args: Array[String] = Array( + "--broker-list", "localhost:9094", + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + ) + + //When + val config = new ConsumerPerformance.ConsumerPerfConfig(args) + + //Then + assertEquals("localhost:9092", config.brokerHostsAndPorts) assertEquals("test", config.topic) assertEquals(10, config.numMessages) } diff --git a/docs/quickstart.html b/docs/quickstart.html index b9e0c7ccb6b3a..01da162bf61d9 100644 --- a/docs/quickstart.html +++ b/docs/quickstart.html @@ -73,7 +73,7 @@

Step 4: Send some messages
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
 This is a message
 This is another message
 
@@ -162,7 +162,7 @@

Step 6: Setting up a multi-broker cluster Let's publish a few messages to our new topic:

-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
 ...
 my test message 1
 my test message 2
diff --git a/docs/security.html b/docs/security.html
index d080e045c2e30..f9fca78b358fd 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -232,7 +232,7 @@ 
Confi
Examples using console-producer and console-consumer:
-            kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
+            kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
             kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
diff --git a/docs/streams/quickstart.html b/docs/streams/quickstart.html index 4c0d0c92b732f..a6781f62b369a 100644 --- a/docs/streams/quickstart.html +++ b/docs/streams/quickstart.html @@ -188,7 +188,7 @@

Step 4: Start the Wordcount Application Now we can start the console producer in a separate terminal to write some input data to this topic:
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 
and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal: @@ -212,7 +212,7 @@

Step 5 (in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 
@@ -246,7 +246,7 @@

Step 5 Your terminal should look as follows:
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 hello kafka streams
 
@@ -279,7 +279,7 @@

Step 5 Let's enter one final input text line "join kafka summit" and hit <RETURN> in the console producer to the input topic streams-plaintext-input before we wrap up this quickstart:
-> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
+> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
 all streams lead to kafka
 hello kafka streams
 join kafka summit
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index f34b9e22d2633..b65eb9a2a7d13 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -28,6 +28,7 @@
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import net.sourceforge.argparse4j.inf.Namespace;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -504,14 +505,23 @@ private static ArgumentParser argParser() {
                 .newArgumentParser("verifiable-consumer")
                 .defaultHelp(true)
                 .description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
-
-        parser.addArgument("--broker-list")
+        MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
+                .description("Group of arguments for connection to brokers")
+                .required(true);
+        connectionGroup.addArgument("--bootstrap-server")
                 .action(store())
-                .required(true)
+                .required(false)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("bootstrapServer")
+                .help("REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+        connectionGroup.addArgument("--broker-list")
+                .action(store())
+                .required(false)
                 .type(String.class)
                 .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                 .dest("brokerList")
-                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+                .help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
 
         parser.addArgument("--topic")
                 .action(store())
@@ -598,6 +608,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
 
         boolean useAutoCommit = res.getBoolean("useAutoCommit");
         String configFile = res.getString("consumer.config");
+        String brokerHostandPort = null;
 
         Properties consumerProps = new Properties();
         if (configFile != null) {
@@ -614,7 +625,18 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
         if (groupInstanceId != null) {
             consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         }
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+
+
+        if (res.get("bootstrapServer") != null) {
+            brokerHostandPort = res.getString("bootstrapServer");
+        } else if (res.getString("brokerList") != null) {
+            brokerHostandPort = res.getString("brokerList");
+        } else {
+            parser.printHelp();
+            Exit.exit(0);
+        }
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
+
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
         consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
@@ -643,7 +665,6 @@ public static void main(String[] args) {
             parser.printHelp();
             Exit.exit(0);
         }
-
         try {
             final VerifiableConsumer consumer = createFromArgs(parser, args);
             Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 7dbd215161905..befdfddf3f79c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -24,6 +24,7 @@
 import net.sourceforge.argparse4j.ArgumentParsers;
 import net.sourceforge.argparse4j.inf.ArgumentParser;
 import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
 import net.sourceforge.argparse4j.inf.Namespace;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -119,14 +120,24 @@ private static ArgumentParser argParser() {
                 .type(String.class)
                 .metavar("TOPIC")
                 .help("Produce messages to this topic.");
+        MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
+                .description("Group of arguments for connection to brokers")
+                .required(true);
+        connectionGroup.addArgument("--bootstrap-server")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("bootstrapServer")
+                .help("REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
 
-        parser.addArgument("--broker-list")
+        connectionGroup.addArgument("--broker-list")
                 .action(store())
-                .required(true)
+                .required(false)
                 .type(String.class)
                 .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                 .dest("brokerList")
-                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+                .help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
 
         parser.addArgument("--max-messages")
                 .action(store())
@@ -222,7 +233,16 @@ public static VerifiableProducer createFromArgs(ArgumentParser parser, String[]
             createTime = null;
 
         Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+
+        if (res.get("bootstrapServer") != null) {
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("bootstrapServer"));
+        } else if (res.getString("brokerList") != null) {
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+        } else {
+            parser.printHelp();
+            Exit.exit(0);
+        }
+
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,