add native producer and consumer
sauljabin committed Jun 29, 2024
1 parent 2350275 commit e9b926f
Showing 27 changed files with 681 additions and 2 deletions.
@@ -7,7 +7,7 @@
 import static picocli.CommandLine.Spec;

 @Command(
-    name = "kafka-avro-clients",
+    name = "kafka-json-clients",
     description = "Allows you either to produce or consume topic",
     synopsisSubcommandLabel = "COMMAND"
 )
35 changes: 35 additions & 0 deletions kafka-native-clients/build.gradle
@@ -0,0 +1,35 @@
plugins {
    id 'java'
    id 'application'
}

repositories {
    mavenCentral()
    maven {
        url = 'https://packages.confluent.io/maven/'
    }
}

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'

    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"

    implementation 'info.picocli:picocli:4.6.1'
    implementation 'net.datafaker:datafaker:2.0.2'
    implementation 'org.slf4j:slf4j-simple:1.7.30'

    compileOnly 'org.projectlombok:lombok:1.18.26'
    annotationProcessor 'org.projectlombok:lombok:1.18.26'

    testCompileOnly 'org.projectlombok:lombok:1.18.26'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.26'
}

application {
    mainClass = 'kafka.sandbox.App'
}

test {
    useJUnitPlatform()
}
29 changes: 29 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/App.java
@@ -0,0 +1,29 @@
package kafka.sandbox;

import kafka.sandbox.cli.ConsumerCommand;
import kafka.sandbox.cli.KafkaClients;
import kafka.sandbox.cli.ProducerCommand;
import picocli.CommandLine;

import java.io.IOException;
import java.util.Properties;

public class App {

    public static void main(String[] args) throws IOException {
        Properties producerProps = getProperties("producer.properties");
        Properties consumerProps = getProperties("consumer.properties");

        CommandLine commandLine = new CommandLine(new KafkaClients())
                .addSubcommand(new ProducerCommand(producerProps))
                .addSubcommand(new ConsumerCommand(consumerProps));

        System.exit(commandLine.execute(args));
    }

    private static Properties getProperties(String fileName) throws IOException {
        Properties props = new Properties();
        props.load(App.class.getClassLoader().getResourceAsStream(fileName));
        return props;
    }
}
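The producer.properties and consumer.properties resources that App loads are not part of this excerpt. As a purely illustrative sketch (the keys are standard Kafka client configs, the values are placeholders rather than the repository's actual settings, and the value serializer/deserializer is set in code by the Producer and Consumer classes below), consumer.properties might contain:

# Hypothetical consumer.properties - values are placeholders, adjust to your cluster
bootstrap.servers=localhost:9092
group.id=kafka-native-clients
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest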
80 changes: 80 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/Consumer.java
@@ -0,0 +1,80 @@
package kafka.sandbox.cli;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

@Slf4j
public abstract class Consumer<V> {

    private final KafkaConsumer<String, V> consumer;

    public Consumer(Properties props) {
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getDeserializer());
        consumer = new KafkaConsumer<>(props);
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singleton(topic));
        CountDownLatch latch = new CountDownLatch(1);

        // On shutdown (e.g. Ctrl+C), wake up the blocked poll() and release the main thread
        Runtime
                .getRuntime()
                .addShutdownHook(
                        new Thread("consumer-shutdown-hook") {
                            @Override
                            public void run() {
                                consumer.wakeup();
                                latch.countDown();
                            }
                        }
                );

        // Poll and commit in a loop until wakeup() interrupts the poll with a WakeupException
        Thread infiniteLoop = new Thread(
                () -> {
                    try {
                        while (true) {
                            ConsumerRecords<String, V> records = consumer.poll(
                                    Duration.ofMillis(500)
                            );
                            for (ConsumerRecord<String, V> record : records) {
                                log.info(
                                        "Consumed message: topic = {}, partition = {}, offset = {}, value = {}",
                                        record.topic(),
                                        record.partition(),
                                        record.offset(),
                                        record.value()
                                );
                            }
                            consumer.commitSync();
                        }
                    } catch (RecordDeserializationException rde) {
                        log.warn("{}", rde.getMessage());
                    } catch (WakeupException we) {
                        log.info("Shutdown gracefully");
                    } finally {
                        consumer.close();
                    }
                },
                "consumer-thread"
        );

        infiniteLoop.start();
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public abstract Class<? extends Deserializer<?>> getDeserializer();
}
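ConsumerCommand (next file) selects a concrete subclass of this abstract Consumer per value type. Most of those subclasses are not loaded in this excerpt; as a hedged sketch, StringConsumer presumably mirrors the BooleanConsumer and DoubleConsumer files shown near the end, differing only in the Deserializer class it returns:

package kafka.sandbox.cli.consumers;

import kafka.sandbox.cli.Consumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

// Sketch only: the actual StringConsumer file is not shown in this excerpt
public class StringConsumer extends Consumer<String> {
    public StringConsumer(Properties props) {
        super(props);
    }

    @Override
    public Class<? extends Deserializer<?>> getDeserializer() {
        return StringDeserializer.class;
    }
}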
51 changes: 51 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/ConsumerCommand.java
@@ -0,0 +1,51 @@
package kafka.sandbox.cli;

import kafka.sandbox.cli.consumers.*;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.util.Properties;
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "consume", description = "Consumes messages from topic")
public class ConsumerCommand implements Callable<Integer> {

    private final Properties props;

    @CommandLine.Parameters(
            index = "0",
            description = "Topic name"
    )
    private String topic;

    @CommandLine.Parameters(
            index = "1",
            description = "Type",
            defaultValue = "string"
    )
    private String type;

    public ConsumerCommand(Properties props) {
        this.props = props;
    }

    @Override
    public Integer call() {
        getConsumer().consume(topic);
        return CommandLine.ExitCode.OK;
    }

    private Consumer<?> getConsumer() {
        return switch (type) {
            case "integer" -> new IntegerConsumer(props);
            case "string" -> new StringConsumer(props);
            case "long" -> new LongConsumer(props);
            case "double" -> new DoubleConsumer(props);
            case "float" -> new FloatConsumer(props);
            case "boolean" -> new BooleanConsumer(props);
            default -> throw new RuntimeException("Type not allowed");
        };
    }
}
23 changes: 23 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/KafkaClients.java
@@ -0,0 +1,23 @@
package kafka.sandbox.cli;

import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;

import static picocli.CommandLine.ParameterException;
import static picocli.CommandLine.Spec;

@Command(
        name = "kafka-native-clients",
        description = "Allows you either to produce or consume topic",
        synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {

    @Spec
    private CommandSpec spec;

    @Override
    public void run() {
        throw new ParameterException(spec.commandLine(), "Missing required subcommand");
    }
}
49 changes: 49 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/Producer.java
@@ -0,0 +1,49 @@
package kafka.sandbox.cli;

import lombok.extern.slf4j.Slf4j;
import net.datafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Properties;

@Slf4j
public abstract class Producer<V> {

    protected final Faker faker = new Faker();
    private final KafkaProducer<String, V> producer;

    public Producer(Properties props) {
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer());
        producer = new KafkaProducer<>(props);
    }

    public void produce(String topic, int messages) {
        for (int i = 0; i < messages; i++) {
            V value = newMessage();
            ProducerRecord<String, V> record = new ProducerRecord<>(
                    topic,
                    value
            );
            // Asynchronous send; the callback logs either the failure or the produced value
            producer.send(
                    record,
                    (metadata, exception) -> {
                        if (exception != null) {
                            log.error("Error producing {}", value, exception);
                            return;
                        }
                        log.info("Producing message: {}", value);
                    }
            );
        }

        producer.flush();
        producer.close();
    }

    public abstract V newMessage();

    public abstract Class<? extends Serializer<?>> getSerializer();
}
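ProducerCommand (next file) picks a concrete Producer subclass the same way ConsumerCommand does. The concrete producers are not loaded in this excerpt; a minimal sketch of a StringProducer, assuming it uses the protected Datafaker instance to generate values (the exact fake-data expression is a guess), would be:

package kafka.sandbox.cli.producers;

import kafka.sandbox.cli.Producer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

// Sketch only: the actual StringProducer file is not shown in this excerpt
public class StringProducer extends Producer<String> {

    public StringProducer(Properties props) {
        super(props);
    }

    @Override
    public String newMessage() {
        // faker is the protected Datafaker instance from the abstract Producer;
        // faker.name().fullName() is an illustrative choice, not necessarily what the repo uses
        return faker.name().fullName();
    }

    @Override
    public Class<? extends Serializer<?>> getSerializer() {
        return StringSerializer.class;
    }
}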
61 changes: 61 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/ProducerCommand.java
@@ -0,0 +1,61 @@
package kafka.sandbox.cli;

import kafka.sandbox.cli.producers.*;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

import java.util.Properties;
import java.util.concurrent.Callable;

@Slf4j
@Command(name = "produce", description = "Produces messages to topic")
public class ProducerCommand implements Callable<Integer> {

    private final Properties props;

    @Parameters(
            index = "2",
            description = "Total new messages to produce"
    )
    private int messages;

    @Parameters(
            index = "0",
            description = "Topic name"
    )
    private String topic;

    @Parameters(
            index = "1",
            description = "Type",
            defaultValue = "string"
    )
    private String type;

    public ProducerCommand(Properties props) {
        this.props = props;
    }

    @Override
    public Integer call() {
        getProducer().produce(topic, messages);
        return CommandLine.ExitCode.OK;
    }

    private Producer<?> getProducer() {
        return switch (type) {
            case "integer" -> new IntegerProducer(props);
            case "string" -> new StringProducer(props);
            case "long" -> new LongProducer(props);
            case "double" -> new DoubleProducer(props);
            case "float" -> new FloatProducer(props);
            case "boolean" -> new BooleanProducer(props);
            default -> throw new RuntimeException("Type not allowed");
        };
    }
}
18 changes: 18 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/BooleanConsumer.java
@@ -0,0 +1,18 @@
package kafka.sandbox.cli.consumers;

import kafka.sandbox.cli.Consumer;
import org.apache.kafka.common.serialization.BooleanDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Properties;

public class BooleanConsumer extends Consumer<Boolean> {
    public BooleanConsumer(Properties props) {
        super(props);
    }

    @Override
    public Class<? extends Deserializer<?>> getDeserializer() {
        return BooleanDeserializer.class;
    }
}
18 changes: 18 additions & 0 deletions kafka-native-clients/src/main/java/kafka/sandbox/cli/consumers/DoubleConsumer.java
@@ -0,0 +1,18 @@
package kafka.sandbox.cli.consumers;

import kafka.sandbox.cli.Consumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.DoubleDeserializer;

import java.util.Properties;

public class DoubleConsumer extends Consumer<Double> {
    public DoubleConsumer(Properties props) {
        super(props);
    }

    @Override
    public Class<? extends Deserializer<?>> getDeserializer() {
        return DoubleDeserializer.class;
    }
}