Skip to content

Commit

Permalink
KAFKA-8507; Unify connection name flag for command line tool [KIP-499] (
Browse files Browse the repository at this point in the history
apache#8023)

This change updates ConsoleProducer, ConsumerPerformance, VerifiableProducer, and VerifiableConsumer classes to add and prefer the --bootstrap-server flag for defining the connection point of the Kafka cluster. This change is part of KIP-499: https://cwiki.apache.org/confluence/display/KAFKA/KIP-499+-+Unify+connection+name+flag+for+command+line+tool.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>,  Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
  • Loading branch information
mitchell-h authored Feb 13, 2020
1 parent 7e1c39f commit 96c69da
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 35 deletions.
23 changes: 19 additions & 4 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ object ConsoleProducer {

props ++= config.extraProducerProps

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
if(config.bootstrapServer != null)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
else
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)

props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "console-producer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
Expand Down Expand Up @@ -121,10 +125,15 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
.describedAs("broker-list")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.requiredUnless("broker-list")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
"If specified without value, then it defaults to 'gzip'")
Expand Down Expand Up @@ -216,11 +225,17 @@ object ConsoleProducer {
options = tryParse(parser, args)

CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from standard input and publish it to Kafka.")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)

CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)

val topic = options.valueOf(topicOpt)

val bootstrapServer = options.valueOf(bootstrapServerOpt)
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser,brokerList)

val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
ToolsUtils.validatePortOrDie(parser, brokerHostsAndPorts)

val sync = options.has(syncOpt)
val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
val compressionCodec = if (options.has(compressionCodecOpt))
Expand Down
31 changes: 24 additions & 7 deletions core/src/main/scala/kafka/tools/ConsumerPerformance.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}

import com.typesafe.scalalogging.LazyLogging
import joptsimple.{OptionException, OptionParser, OptionSet}
import kafka.utils.{CommandLineUtils, ToolsUtils}
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
Expand Down Expand Up @@ -202,9 +203,14 @@ object ConsumerPerformance extends LazyLogging {
}

class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val bootstrapServersOpt = parser.accepts("broker-list", "REQUIRED: The server(s) to connect to.")
.withRequiredArg()
.describedAs("host")
val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg
.describedAs("broker-list")
.ofType(classOf[String])
val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
.requiredUnless("broker-list")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
.withRequiredArg
Expand Down Expand Up @@ -250,11 +256,10 @@ object ConsumerPerformance extends LazyLogging {
.ofType(classOf[Long])
.defaultsTo(10000)

options = parser.parse(args: _*)

options = tryParse(parser, args)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer")

CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt, bootstrapServersOpt)
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)

val printMetrics = options.has(printMetricsOpt)

Expand All @@ -264,7 +269,10 @@ object ConsumerPerformance extends LazyLogging {
new Properties

import org.apache.kafka.clients.consumer.ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))

val brokerHostsAndPorts = options.valueOf(if (options.has(bootstrapServerOpt)) bootstrapServerOpt else brokerListOpt)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts)

props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
Expand All @@ -283,5 +291,14 @@ object ConsumerPerformance extends LazyLogging {
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val recordFetchTimeoutMs = options.valueOf(recordFetchTimeoutOpt).longValue()

def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
}
}
}
}
42 changes: 37 additions & 5 deletions core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kafka.utils.Exit

class ConsoleProducerTest {

val validArgs: Array[String] = Array(
val brokerListValidArgs: Array[String] = Array(
"--broker-list",
"localhost:1001,localhost:1002",
"--topic",
Expand All @@ -37,20 +37,45 @@ class ConsoleProducerTest {
"--property",
"key.separator=#"
)

val bootstrapServerValidArgs: Array[String] = Array(
"--bootstrap-server",
"localhost:1003,localhost:1004",
"--topic",
"t3",
"--property",
"parse.key=true",
"--property",
"key.separator=#"
)
val invalidArgs: Array[String] = Array(
"--t", // not a valid argument
"t3"
)
val bootstrapServerOverride: Array[String] = Array(
"--broker-list",
"localhost:1001",
"--bootstrap-server",
"localhost:1002",
"--topic",
"t3",
)

@Test
def testValidConfigs(): Unit = {
val config = new ConsoleProducer.ProducerConfig(validArgs)
def testValidConfigsBrokerList(): Unit = {
val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
assertEquals(util.Arrays.asList("localhost:1001", "localhost:1002"),
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
}

@Test
def testValidConfigsBootstrapServer(): Unit = {
val config = new ConsoleProducer.ProducerConfig(bootstrapServerValidArgs)
val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
assertEquals(util.Arrays.asList("localhost:1003", "localhost:1004"),
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
}

@Test(expected = classOf[IllegalArgumentException])
def testInvalidConfigs(): Unit = {
Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
Expand All @@ -63,11 +88,18 @@ class ConsoleProducerTest {

@Test
def testParseKeyProp(): Unit = {
val config = new ConsoleProducer.ProducerConfig(validArgs)
val config = new ConsoleProducer.ProducerConfig(brokerListValidArgs)
val reader = Class.forName(config.readerClass).getDeclaredConstructor().newInstance().asInstanceOf[LineMessageReader]
reader.init(System.in,ConsoleProducer.getReaderProps(config))
assert(reader.keySeparator == "#")
assert(reader.parseKey)
}

@Test
def testBootstrapServerOverride(): Unit = {
val config = new ConsoleProducer.ProducerConfig(bootstrapServerOverride)
val producerConfig = new ProducerConfig(ConsoleProducer.producerProps(config))
assertEquals(util.Arrays.asList("localhost:1002"),
producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
}
}
41 changes: 39 additions & 2 deletions core/src/test/scala/unit/kafka/tools/ConsumerPerformanceTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ConsumerPerformanceTest {
}

@Test
def testConfig(): Unit = {
def testConfigBrokerList(): Unit = {
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9092",
Expand All @@ -54,7 +54,44 @@ class ConsumerPerformanceTest {
val config = new ConsumerPerformance.ConsumerPerfConfig(args)

//Then
assertEquals("localhost:9092", config.options.valueOf(config.bootstrapServersOpt))
assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts))
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}

@Test
def testConfigBootStrapServer(): Unit = {
//Given
val args: Array[String] = Array(
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
)

//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)

//Then
assertEquals("localhost:9092", config.options.valueOf(config.brokerHostsAndPorts))
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}

@Test
def testBrokerListOverride(): Unit = {
//Given
val args: Array[String] = Array(
"--broker-list", "localhost:9094",
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
)

//When
val config = new ConsumerPerformance.ConsumerPerfConfig(args)

//Then
assertEquals("localhost:9092", config.brokerHostsAndPorts)
assertEquals("test", config.topic)
assertEquals(10, config.numMessages)
}
Expand Down
4 changes: 2 additions & 2 deletions docs/quickstart.html
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ <h4><a id="quickstart_send" href="#quickstart_send">Step 4: Send some messages</
Run the producer and then type a few messages into the console to send to the server.</p>

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
This is a message
This is another message
</pre>
Expand Down Expand Up @@ -162,7 +162,7 @@ <h4><a id="quickstart_multibroker" href="#quickstart_multibroker">Step 6: Settin
Let's publish a few messages to our new topic:
</p>
<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
Expand Down
2 changes: 1 addition & 1 deletion docs/security.html
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ <h5><a id="security_configcerthostname" href="#security_configcerthstname">Confi
<br>
Examples using console-producer and console-consumer:
<pre class="brush: bash;">
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties</pre>
</li>
</ol>
Expand Down
8 changes: 4 additions & 4 deletions docs/streams/quickstart.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ <h4><a id="quickstart_streams_start" href="#quickstart_streams_start">Step 4: St
Now we can start the console producer in a separate terminal to write some input data to this topic:

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
</pre>

and inspect the output of the WordCount demo application by reading from its output topic with the console consumer in a separate terminal:
Expand All @@ -212,7 +212,7 @@ <h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5
(in practice, input data for applications will typically be streaming continuously into Kafka, rather than being manually entered as we do in this quickstart):

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
</pre>

Expand Down Expand Up @@ -246,7 +246,7 @@ <h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5
Your terminal should look as follows:

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
</pre>
Expand Down Expand Up @@ -279,7 +279,7 @@ <h4><a id="quickstart_streams_process" href="#quickstart_streams_process">Step 5
Let's enter one final input text line "join kafka summit" and hit &lt;RETURN&gt; in the console producer to the input topic <b>streams-plaintext-input</b> before we wrap up this quickstart:

<pre class="brush: bash;">
&gt; bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
&gt; bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
join kafka summit
Expand Down
33 changes: 27 additions & 6 deletions tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -504,14 +505,23 @@ private static ArgumentParser argParser() {
.newArgumentParser("verifiable-consumer")
.defaultHelp(true)
.description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");

parser.addArgument("--broker-list")
MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
.description("Group of arguments for connection to brokers")
.required(true);
connectionGroup.addArgument("--bootstrap-server")
.action(store())
.required(true)
.required(false)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("bootstrapServer")
.help("REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
connectionGroup.addArgument("--broker-list")
.action(store())
.required(false)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
.help("DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

parser.addArgument("--topic")
.action(store())
Expand Down Expand Up @@ -598,6 +608,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]

boolean useAutoCommit = res.getBoolean("useAutoCommit");
String configFile = res.getString("consumer.config");
String brokerHostandPort = null;

Properties consumerProps = new Properties();
if (configFile != null) {
Expand All @@ -614,7 +625,18 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
if (groupInstanceId != null) {
consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));


if (res.get("bootstrapServer") != null) {
brokerHostandPort = res.getString("bootstrapServer");
} else if (res.getString("brokerList") != null) {
brokerHostandPort = res.getString("brokerList");
} else {
parser.printHelp();
Exit.exit(0);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);

consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(res.getInt("sessionTimeout")));
Expand Down Expand Up @@ -643,7 +665,6 @@ public static void main(String[] args) {
parser.printHelp();
Exit.exit(0);
}

try {
final VerifiableConsumer consumer = createFromArgs(parser, args);
Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close());
Expand Down
Loading

0 comments on commit 96c69da

Please sign in to comment.