From e9b926f3799a5474120e483ee4cde6bc03f886ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sa=C3=BAl=20Pi=C3=B1a?= Date: Sat, 29 Jun 2024 14:24:29 -0500 Subject: [PATCH] add native producer and consumer --- .../java/kafka/sandbox/cli/KafkaClients.java | 2 +- kafka-native-clients/build.gradle | 35 ++++++++ .../src/main/java/kafka/sandbox/App.java | 29 +++++++ .../main/java/kafka/sandbox/cli/Consumer.java | 80 ++++++++++++++++++ .../kafka/sandbox/cli/ConsumerCommand.java | 51 +++++++++++ .../java/kafka/sandbox/cli/KafkaClients.java | 23 +++++ .../main/java/kafka/sandbox/cli/Producer.java | 49 +++++++++++ .../kafka/sandbox/cli/ProducerCommand.java | 61 ++++++++++++++ .../cli/consumers/BooleanConsumer.java | 18 ++++ .../sandbox/cli/consumers/DoubleConsumer.java | 18 ++++ .../sandbox/cli/consumers/FloatConsumer.java | 18 ++++ .../cli/consumers/IntegerConsumer.java | 18 ++++ .../sandbox/cli/consumers/LongConsumer.java | 18 ++++ .../sandbox/cli/consumers/StringConsumer.java | 18 ++++ .../cli/producers/BooleanProducer.java | 24 ++++++ .../sandbox/cli/producers/DoubleProducer.java | 24 ++++++ .../sandbox/cli/producers/FloatProducer.java | 24 ++++++ .../cli/producers/IntegerProducer.java | 24 ++++++ .../sandbox/cli/producers/LongProducer.java | 24 ++++++ .../sandbox/cli/producers/StringProducer.java | 25 ++++++ .../src/main/resources/consumer.properties | 7 ++ .../src/main/resources/producer.properties | 4 + md/SUMMARY.md | 1 + md/avro-producer-and-consumer.md | 2 +- md/native-producer-and-consumer.md | 1 + md/producing-and-consuming-natives.md | 84 +++++++++++++++++++ settings.gradle | 1 + 27 files changed, 681 insertions(+), 2 deletions(-) create mode 100644 kafka-native-clients/build.gradle create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/App.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/Consumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/ConsumerCommand.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/Producer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/ProducerCommand.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/BooleanConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/DoubleConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/FloatConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/IntegerConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/LongConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/StringConsumer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/BooleanProducer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/DoubleProducer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/FloatProducer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/IntegerProducer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/LongProducer.java create mode 100644 kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/StringProducer.java create mode 100644 kafka-native-clients/src/main/resources/consumer.properties create mode 100644 
kafka-native-clients/src/main/resources/producer.properties create mode 100644 md/native-producer-and-consumer.md create mode 100644 md/producing-and-consuming-natives.md diff --git a/kafka-json-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java b/kafka-json-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java index 194ab4c..07c17e6 100644 --- a/kafka-json-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java +++ b/kafka-json-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java @@ -7,7 +7,7 @@ import static picocli.CommandLine.Spec; @Command( - name = "kafka-avro-clients", + name = "kafka-json-clients", description = "Allows you either to produce or consume topic", synopsisSubcommandLabel = "COMMAND" ) diff --git a/kafka-native-clients/build.gradle b/kafka-native-clients/build.gradle new file mode 100644 index 0000000..30613ba --- /dev/null +++ b/kafka-native-clients/build.gradle @@ -0,0 +1,35 @@ +plugins { + id 'java' + id 'application' +} + +repositories { + mavenCentral() + maven { + url = 'https://packages.confluent.io/maven/' + } +} + +dependencies { + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2' + + implementation "org.apache.kafka:kafka-clients:${kafkaVersion}" + + implementation 'info.picocli:picocli:4.6.1' + implementation 'net.datafaker:datafaker:2.0.2' + implementation 'org.slf4j:slf4j-simple:1.7.30' + + compileOnly 'org.projectlombok:lombok:1.18.26' + annotationProcessor 'org.projectlombok:lombok:1.18.26' + + testCompileOnly 'org.projectlombok:lombok:1.18.26' + testAnnotationProcessor 'org.projectlombok:lombok:1.18.26' +} + +application { + mainClass = 'kafka.sandbox.App' +} + +test { + useJUnitPlatform() +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/App.java b/kafka-native-clients/src/main/java/kafka/sandbox/App.java new file mode 100644 index 0000000..077caae --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/App.java @@ -0,0 +1,29 @@ +package kafka.sandbox; + +import kafka.sandbox.cli.ConsumerCommand; +import kafka.sandbox.cli.KafkaClients; +import kafka.sandbox.cli.ProducerCommand; +import picocli.CommandLine; + +import java.io.IOException; +import java.util.Properties; + +public class App { + + public static void main(String[] args) throws IOException { + Properties producerProps = getProperties("producer.properties"); + Properties consumerProps = getProperties("consumer.properties"); + + CommandLine commandLine = new CommandLine(new KafkaClients()) + .addSubcommand(new ProducerCommand(producerProps)) + .addSubcommand(new ConsumerCommand(consumerProps)); + + System.exit(commandLine.execute(args)); + } + + private static Properties getProperties(String fileName) throws IOException { + Properties props = new Properties(); + props.load(App.class.getClassLoader().getResourceAsStream(fileName)); + return props; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/Consumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/Consumer.java new file mode 100644 index 0000000..a25ea90 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/Consumer.java @@ -0,0 +1,80 @@ +package kafka.sandbox.cli; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.RecordDeserializationException; +import 
org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +@Slf4j +public abstract class Consumer { + + private final KafkaConsumer consumer; + + public Consumer(Properties props) { + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getDeserializer()); + consumer = new KafkaConsumer<>(props); + } + + public void consume(String topic) { + consumer.subscribe(Collections.singleton(topic)); + CountDownLatch latch = new CountDownLatch(1); + Runtime + .getRuntime() + .addShutdownHook( + new Thread("consumer-shutdown-hook") { + @Override + public void run() { + consumer.wakeup(); + latch.countDown(); + } + } + ); + + Thread infiniteLoop = new Thread( + () -> { + try { + while (true) { + ConsumerRecords records = consumer.poll( + Duration.ofMillis(500) + ); + for (ConsumerRecord record : records) { + log.info( + "Consumed message: topic = {}, partition = {}, offset = {}, value = {}", + record.topic(), + record.partition(), + record.offset(), + record.value() + ); + } + consumer.commitSync(); + } + } catch (RecordDeserializationException rde) { + log.warn("{}", rde.getMessage()); + } catch (WakeupException we) { + log.info("Shutdown gracefully"); + } finally { + consumer.close(); + } + }, + "consumer-thread" + ); + + infiniteLoop.start(); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public abstract Class> getDeserializer(); +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/ConsumerCommand.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/ConsumerCommand.java new file mode 100644 index 0000000..f9fdf5f --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/ConsumerCommand.java @@ -0,0 +1,51 @@ +package kafka.sandbox.cli; + +import kafka.sandbox.cli.consumers.*; +import lombok.extern.slf4j.Slf4j; +import picocli.CommandLine; +import picocli.CommandLine.Command; + +import java.util.Properties; +import java.util.concurrent.Callable; + +@Slf4j +@Command(name = "consume", description = "Consumes messages from topic") +public class ConsumerCommand implements Callable { + + private final Properties props; + + @CommandLine.Parameters( + index = "0", + description = "Topic name" + ) + private String topic; + + @CommandLine.Parameters( + index = "1", + description = "Type", + defaultValue = "string" + ) + private String type; + + public ConsumerCommand(Properties props) { + this.props = props; + } + + @Override + public Integer call() { + getConsumer().consume(topic); + return CommandLine.ExitCode.OK; + } + + private Consumer getConsumer() { + return switch (type) { + case "integer" -> new IntegerConsumer(props); + case "string" -> new StringConsumer(props); + case "long" -> new LongConsumer(props); + case "double" -> new DoubleConsumer(props); + case "float" -> new FloatConsumer(props); + case "boolean" -> new BooleanConsumer(props); + default -> throw new RuntimeException("Type not allowed"); + }; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java new file mode 100644 index 0000000..272d2e6 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java @@ -0,0 +1,23 @@ +package kafka.sandbox.cli; + +import picocli.CommandLine.Command; +import 
picocli.CommandLine.Model.CommandSpec; + +import static picocli.CommandLine.ParameterException; +import static picocli.CommandLine.Spec; + +@Command( + name = "kafka-native-clients", + description = "Allows you either to produce or consume topic", + synopsisSubcommandLabel = "COMMAND" +) +public class KafkaClients implements Runnable { + + @Spec + private CommandSpec spec; + + @Override + public void run() { + throw new ParameterException(spec.commandLine(), "Missing required subcommand"); + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/Producer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/Producer.java new file mode 100644 index 0000000..978e4ad --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/Producer.java @@ -0,0 +1,49 @@ +package kafka.sandbox.cli; + +import lombok.extern.slf4j.Slf4j; +import net.datafaker.Faker; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +@Slf4j +public abstract class Producer { + + protected final Faker faker = new Faker(); + private final KafkaProducer producer; + + public Producer(Properties props) { + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer()); + producer = new KafkaProducer<>(props); + } + + public void produce(String topic, int message) { + for (int i = 0; i < message; i++) { + V value = newMessage(); + ProducerRecord record = new ProducerRecord<>( + topic, + value + ); + producer.send( + record, + (metadata, exception) -> { + if (exception != null) { + log.error("Error producing {}", value, exception); + return; + } + log.info("Producing message: {}", value); + } + ); + } + + producer.flush(); + producer.close(); + } + + public abstract V newMessage(); + + public abstract Class> getSerializer(); +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/ProducerCommand.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/ProducerCommand.java new file mode 100644 index 0000000..e284da9 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/ProducerCommand.java @@ -0,0 +1,61 @@ +package kafka.sandbox.cli; + +import kafka.sandbox.cli.producers.*; +import lombok.extern.slf4j.Slf4j; +import picocli.CommandLine; +import picocli.CommandLine.Command; +import picocli.CommandLine.Parameters; + +import java.util.Properties; +import java.util.concurrent.Callable; + +@Slf4j +@Command(name = "produce", description = "Produces messages to topic") +public class ProducerCommand implements Callable { + + private final Properties props; + + + @Parameters( + index = "2", + description = "Total new messages to produce" + ) + private int messages; + + @Parameters( + index = "0", + description = "Topic name" + ) + private String topic; + + @Parameters( + index = "1", + description = "Type", + defaultValue = "string" + ) + private String type; + + + public ProducerCommand(Properties props) { + this.props = props; + } + + @Override + public Integer call() { + getProducer().produce(topic, messages); + return CommandLine.ExitCode.OK; + } + + private Producer getProducer() { + return switch (type) { + case "integer" -> new IntegerProducer(props); + case "string" -> new StringProducer(props); + case "long" -> new LongProducer(props); + case "double" -> new DoubleProducer(props); + case "float" -> new FloatProducer(props); + case "boolean" -> new 
BooleanProducer(props); + default -> throw new RuntimeException("Type not allowed"); + }; + } + +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/BooleanConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/BooleanConsumer.java new file mode 100644 index 0000000..7ddca49 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/BooleanConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.BooleanDeserializer; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Properties; + +public class BooleanConsumer extends Consumer { + public BooleanConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return BooleanDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/DoubleConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/DoubleConsumer.java new file mode 100644 index 0000000..a3c6848 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/DoubleConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; + +import java.util.Properties; + +public class DoubleConsumer extends Consumer { + public DoubleConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return DoubleDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/FloatConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/FloatConsumer.java new file mode 100644 index 0000000..6fa683a --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/FloatConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.FloatDeserializer; + +import java.util.Properties; + +public class FloatConsumer extends Consumer { + public FloatConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return FloatDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/IntegerConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/IntegerConsumer.java new file mode 100644 index 0000000..1647ef7 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/IntegerConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; + +import java.util.Properties; + +public class IntegerConsumer extends Consumer { + public IntegerConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return IntegerDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/LongConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/LongConsumer.java new file mode 100644 index 0000000..0dd4634 --- /dev/null +++ 
b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/LongConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; + +import java.util.Properties; + +public class LongConsumer extends Consumer { + public LongConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return LongDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/StringConsumer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/StringConsumer.java new file mode 100644 index 0000000..1e4b448 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/StringConsumer.java @@ -0,0 +1,18 @@ +package kafka.sandbox.cli.consumers; + +import kafka.sandbox.cli.Consumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +import java.util.Properties; + +public class StringConsumer extends Consumer { + public StringConsumer(Properties props) { + super(props); + } + + @Override + public Class> getDeserializer() { + return StringDeserializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/BooleanProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/BooleanProducer.java new file mode 100644 index 0000000..fb54226 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/BooleanProducer.java @@ -0,0 +1,24 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.BooleanSerializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +public class BooleanProducer extends Producer { + + public BooleanProducer(Properties props) { + super(props); + } + + @Override + public Boolean newMessage() { + return faker.bool().bool(); + } + + @Override + public Class> getSerializer() { + return BooleanSerializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/DoubleProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/DoubleProducer.java new file mode 100644 index 0000000..f7c7a20 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/DoubleProducer.java @@ -0,0 +1,24 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +public class DoubleProducer extends Producer { + + public DoubleProducer(Properties props) { + super(props); + } + + @Override + public Double newMessage() { + return faker.number().randomDouble(2, 500, 100); + } + + @Override + public Class> getSerializer() { + return DoubleSerializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/FloatProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/FloatProducer.java new file mode 100644 index 0000000..8171770 --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/FloatProducer.java @@ -0,0 +1,24 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.FloatSerializer; +import 
org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +public class FloatProducer extends Producer { + + public FloatProducer(Properties props) { + super(props); + } + + @Override + public Float newMessage() { + return (float) faker.number().randomDouble(2, 500, 100); + } + + @Override + public Class> getSerializer() { + return FloatSerializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/IntegerProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/IntegerProducer.java new file mode 100644 index 0000000..c0c666a --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/IntegerProducer.java @@ -0,0 +1,24 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +public class IntegerProducer extends Producer { + + public IntegerProducer(Properties props) { + super(props); + } + + @Override + public Integer newMessage() { + return faker.number().numberBetween(100, 200); + } + + @Override + public Class> getSerializer() { + return IntegerSerializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/LongProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/LongProducer.java new file mode 100644 index 0000000..486ff8a --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/LongProducer.java @@ -0,0 +1,24 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +public class LongProducer extends Producer { + + public LongProducer(Properties props) { + super(props); + } + + @Override + public Long newMessage() { + return faker.number().numberBetween(100L, 200L); + } + + @Override + public Class> getSerializer() { + return LongSerializer.class; + } +} diff --git a/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/StringProducer.java b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/StringProducer.java new file mode 100644 index 0000000..8788f4d --- /dev/null +++ b/kafka-native-clients/src/main/java/kafka/sandbox/cli/producers/StringProducer.java @@ -0,0 +1,25 @@ +package kafka.sandbox.cli.producers; + +import kafka.sandbox.cli.Producer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + +public class StringProducer extends Producer { + + public StringProducer(Properties props) { + super(props); + } + + @Override + public String newMessage() { + return faker.name().fullName(); + } + + @Override + public Class> getSerializer() { + return StringSerializer.class; + } +} + diff --git a/kafka-native-clients/src/main/resources/consumer.properties b/kafka-native-clients/src/main/resources/consumer.properties new file mode 100644 index 0000000..58fcd62 --- /dev/null +++ b/kafka-native-clients/src/main/resources/consumer.properties @@ -0,0 +1,7 @@ +bootstrap.servers=localhost:19092,localhost:29092,localhost:39092 +group.id=client.consumer +enable.auto.commit=false +auto.offset.reset=earliest +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +specific.avro.reader=true +client.id=client.consumer \ 
No newline at end of file
diff --git a/kafka-native-clients/src/main/resources/producer.properties b/kafka-native-clients/src/main/resources/producer.properties
new file mode 100644
index 0000000..f27ba10
--- /dev/null
+++ b/kafka-native-clients/src/main/resources/producer.properties
@@ -0,0 +1,4 @@
+bootstrap.servers=localhost:19092,localhost:29092,localhost:39092
+key.serializer=org.apache.kafka.common.serialization.StringSerializer
+acks=1
+client.id=client.producer
\ No newline at end of file
diff --git a/md/SUMMARY.md b/md/SUMMARY.md
index ae78708..0cb002b 100644
--- a/md/SUMMARY.md
+++ b/md/SUMMARY.md
@@ -6,6 +6,7 @@
   - [Quick Start](quick-start.md)
   - [What is Kafka?](what-is-kafka.md)
   - [Consuming and Producing](consuming-and-producing.md)
+  - [Native Consumer and Producer](producing-and-consuming-natives.md)
   - [Message Schemas](message-schemas.md)
   - [What is Kafka Connect?](what-is-kafka-connect.md)
   - [Kafka Connect Database Example](kafka-connect-database-example.md)
diff --git a/md/avro-producer-and-consumer.md b/md/avro-producer-and-consumer.md
index 6a88cd6..a9ad3ed 100644
--- a/md/avro-producer-and-consumer.md
+++ b/md/avro-producer-and-consumer.md
@@ -46,7 +46,7 @@ As you can see now we are using the autogenerated class `Supplier`.
 KafkaProducer<String, Supplier> producer = new KafkaProducer<>(props);
 
 for (int i = 0; i < messages; i++) {
-    Supplier supplier = createNewCustomer();
+    Supplier supplier = createNew();
     ProducerRecord<String, Supplier> record = new ProducerRecord<>(
         topic,
         supplier.getId().toString(),
diff --git a/md/native-producer-and-consumer.md b/md/native-producer-and-consumer.md
new file mode 100644
index 0000000..cff7027
--- /dev/null
+++ b/md/native-producer-and-consumer.md
@@ -0,0 +1 @@
+# Native Consumer and Producer
diff --git a/md/producing-and-consuming-natives.md b/md/producing-and-consuming-natives.md
new file mode 100644
index 0000000..693bb14
--- /dev/null
+++ b/md/producing-and-consuming-natives.md
@@ -0,0 +1,84 @@
+# Producing and Consuming Native Types
+
+Now we are going to develop producers and consumers in Java using Kafka's built-in (native) serializers and deserializers for strings and primitive types.
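+
+Kafka ships serializers and deserializers for these types in `org.apache.kafka.common.serialization` (for example `StringSerializer`/`StringDeserializer` and `LongSerializer`/`LongDeserializer`), so no schema registry or custom serde is required. As a minimal, self-contained sketch of the idea (the class name and the hard-coded broker list are illustrative; the CLI added in this module reads its settings from `producer.properties` instead), producing `Long` values only requires pointing `value.serializer` at the built-in class:
+
+```java
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+
+public class NativeLongProducerSketch {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        // same brokers as the properties files in this module
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092,localhost:29092,localhost:39092");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // built-in serializer for the value type, no schema needed
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+
+        try (KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
+            // client.long is one of the topics created in the Setup section below
+            producer.send(new ProducerRecord<>("client.long", 42L));
+        }
+    }
+}
+```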
+
+## Setup
+
+Create the topics:
+
+```bash
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.string
+
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.integer
+
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.long
+
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.float
+
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.double
+
+kafka-topics --create --bootstrap-server localhost:19092 \
+  --replication-factor 3 \
+  --partitions 3 \
+  --topic client.boolean
+```
+
+## Produce
+
+```java
+props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer());
+KafkaProducer<String, V> producer = new KafkaProducer<>(props);
+
+for (int i = 0; i < messages; i++) {
+    V value = newMessage();
+    ProducerRecord<String, V> record = new ProducerRecord<>(
+        topic,
+        value
+    );
+    producer.send(
+        record,
+        (metadata, exception) -> log.info("Producing message: {}", value)
+    );
+}
+```
+
+```bash
+./gradlew kafka-native-clients:run --args="produce client.string string 100"
+./gradlew kafka-native-clients:run --args="produce client.integer integer 100"
+./gradlew kafka-native-clients:run --args="produce client.long long 100"
+./gradlew kafka-native-clients:run --args="produce client.float float 100"
+./gradlew kafka-native-clients:run --args="produce client.double double 100"
+./gradlew kafka-native-clients:run --args="produce client.boolean boolean 100"
+```
+
+## Consume
+
+```java
+props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getDeserializer());
+KafkaConsumer<String, V> consumer = new KafkaConsumer<>(props);
+consumer.subscribe(Collections.singleton(topic));
+
+ConsumerRecords<String, V> records = consumer.poll(Duration.ofMillis(500));
+
+for (ConsumerRecord<String, V> record : records) {
+    log.info("Consumed message: {}", record.value());
+}
+```
+
+```bash
+./gradlew kafka-native-clients:run --args="consume client.string string"
+./gradlew kafka-native-clients:run --args="consume client.integer integer"
+./gradlew kafka-native-clients:run --args="consume client.long long"
+./gradlew kafka-native-clients:run --args="consume client.float float"
+./gradlew kafka-native-clients:run --args="consume client.double double"
+./gradlew kafka-native-clients:run --args="consume client.boolean boolean"
+```
diff --git a/settings.gradle b/settings.gradle
index 50840c7..d2fcc17 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -3,6 +3,7 @@ include("kafka-avro")
 include("kafka-avro-clients")
 include("kafka-avro-union-clients")
 include("kafka-json-clients")
+include("kafka-native-clients")
 include("kafka-streams")
 include("kafka-spring-boot")
 include("kafka-ksqldb-extensions")