diff --git a/README.md b/README.md
index 178d82e..e13e23a 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,9 @@
 This repository has the complete code related to kafka producers/consumers using spring boot.
 
+## Kafka Setup
+- [Setup-Kafka-Using-Docker](SetUpKafkaDocker.md)
+
 - [Setup-Kafka](https://github.com/dilipsundarraj1/kafka-for-developers-using-spring-boot/blob/master/SetUpKafka.md)
diff --git a/SetUpKafkaDocker.md b/SetUpKafkaDocker.md
new file mode 100644
index 0000000..bf18c2d
--- /dev/null
+++ b/SetUpKafkaDocker.md
@@ -0,0 +1,231 @@
+# Set Up Kafka Locally Using Docker
+
+## Set up the broker and ZooKeeper
+
+- Navigate to the path where the **docker-compose.yml** is located and then run the command below.
+
+```
+docker-compose up
+```
+
+## Produce and Consume Messages
+
+- Log in to the container by running the command below.
+
+```
+docker exec -it kafka1 bash
+```
+
+- Create a Kafka topic using the **kafka-topics** command.
+  - **kafka1:19092** refers to the **KAFKA_ADVERTISED_LISTENERS** in the docker-compose.yml file.
+
+```
+kafka-topics --bootstrap-server kafka1:19092 \
+  --create \
+  --topic test-topic \
+  --replication-factor 1 --partitions 1
+```
+
+- Produce messages to the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-producer --bootstrap-server kafka1:19092 \
+  --topic test-topic
+```
+
+- Consume messages from the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server kafka1:19092 \
+  --topic test-topic \
+  --from-beginning
+```
+
+## Produce and Consume Messages with a Key and Value
+
+- Produce messages with a key and value to the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-producer --bootstrap-server kafka1:19092 \
+  --topic test-topic \
+  --property "key.separator=-" --property "parse.key=true"
+```
+
+- Consume messages with a key and value from the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server kafka1:19092 \
+  --topic test-topic \
+  --from-beginning \
+  --property "key.separator= - " --property "print.key=true"
+```
+
+### Consume Messages using Consumer Groups
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server kafka1:19092 \
+  --topic test-topic --group console-consumer-41911 \
+  --property "key.separator= - " --property "print.key=true"
+```
+
+- Example messages:
+
+```
+a-abc
+b-bus
+```
+
+### Set up a Kafka Cluster with 3 brokers
+
+- Run the command below to spin up a Kafka cluster with 3 brokers.
+
+```
+docker-compose -f docker-compose-multi-broker.yml up
+```
+
+- Create a topic with a replication factor of 3.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 \
+  --create \
+  --topic test-topic \
+  --replication-factor 3 --partitions 3
+```
+
+- Produce messages to the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-producer --bootstrap-server localhost:9092,kafka2:19093,kafka3:19094 \
+  --topic test-topic
+```
+
+- Consume messages from the topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-console-consumer --bootstrap-server localhost:9092,kafka2:19093,kafka3:19094 \
+  --topic test-topic \
+  --from-beginning
+```
+
+#### Log files in a Multi-Broker Kafka Cluster
+
+- Log files will be created for each partition in each broker instance of the Kafka cluster.
+  - Log in to the container **kafka1**.
+    ```
+    docker exec -it kafka1 bash
+    ```
+  - Log in to the container **kafka2**.
+    ```
+    docker exec -it kafka2 bash
+    ```
+
+- Shut down the Kafka cluster.
+
+```
+docker-compose -f docker-compose-multi-broker.yml down
+```
+
+### Setting up min.insync.replicas
+
+- Topic - test-topic
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name test-topic \
+--alter --add-config min.insync.replicas=2
+```
+
+- Topic - library-events
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name library-events \
+--alter --add-config min.insync.replicas=2
+```
+
+## Advanced Kafka Commands
+
+### List the topics in a cluster
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --list
+```
+
+### Describe topic
+
+- Command to describe all the Kafka topics.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --describe
+```
+
+- Command to describe a specific Kafka topic.
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 --describe \
+--topic test-topic
+```
+
+### Alter topic partitions
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-topics --bootstrap-server kafka1:19092 \
+--alter --topic test-topic --partitions 40
+```
+
+### How to view consumer groups
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-consumer-groups --bootstrap-server kafka1:19092 --list
+```
+
+#### Consumer Groups and their Offsets
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-consumer-groups --bootstrap-server kafka1:19092 \
+--describe --group console-consumer-41911
+```
+
+## Log file and related config
+
+- Log in to the container.
+
+```
+docker exec -it kafka1 bash
+```
+
+- The config file is present in the path below.
+
+```
+/etc/kafka/server.properties
+```
+
+- The log files are present in the path below.
+
+```
+/var/lib/kafka/data/
+```
+
+### How to view the commit log
+
+```
+docker exec --interactive --tty kafka1 \
+kafka-run-class kafka.tools.DumpLogSegments \
+--deep-iteration \
+--files /var/lib/kafka/data/test-topic-0/00000000000000000000.log
+```
diff --git a/Undersrading-Kafka-DockerCompose.md b/Undersrading-Kafka-DockerCompose.md
new file mode 100644
index 0000000..3c399a7
--- /dev/null
+++ b/Undersrading-Kafka-DockerCompose.md
@@ -0,0 +1,32 @@
+# Understanding How the Kafka Docker Compose Works
+
+- More info is available in this link: https://rmoff.net/2018/08/02/kafka-listeners-explained/
+
+## Kafka broker docker-compose config
+
+```
+kafka1:
+  image: confluentinc/cp-kafka:7.3.2
+  hostname: kafka1
+  container_name: kafka1
+  ports:
+    - "9092:9092"
+    - "29092:29092"
+  environment:
+    KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+    KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+    KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+    KAFKA_BROKER_ID: 1
+    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+  depends_on:
+    - zoo1
+```
+
+- KAFKA_INTER_BROKER_LISTENER_NAME
+  - Kafka brokers communicate among themselves, usually on the internal network. The **KAFKA_INTER_BROKER_LISTENER_NAME** property selects which listener is used for that traffic.
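The remaining listeners are for clients. As an illustration, a producer running on the host machine would connect through the EXTERNAL listener (localhost:9092), while a client inside the Docker network would use INTERNAL (kafka1:19092). Below is a minimal sketch of such a host-side client; it is not part of this patch and assumes the `kafka-clients` library is on the classpath.

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ListenerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Host-side client: connect via the EXTERNAL listener published on port 9092.
        // Inside the Docker network, the INTERNAL listener kafka1:19092 would be used instead.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The broker answers the initial connection with its advertised listener
            // addresses; all subsequent requests go to those addresses, which is why
            // they must be reachable from wherever the client runs.
            producer.send(new ProducerRecord<>("test-topic", "a", "abc"));
        }
    }
}
```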
+- KAFKA_ADVERTISED_LISTENERS
+  - The addresses in this property are the connection details handed back to clients when they connect.
+  - Kafka clients may not be on the same network as the Kafka broker.
+  - For a broker running inside the Docker network, the client is very likely outside the Docker network, so it must be given an address it can actually reach.
diff --git a/docker-compose-multi-broker.yml b/docker-compose-multi-broker.yml
new file mode 100644
index 0000000..c581f27
--- /dev/null
+++ b/docker-compose-multi-broker.yml
@@ -0,0 +1,70 @@
+version: '2.1'
+
+services:
+  zoo1:
+    image: confluentinc/cp-zookeeper:7.3.2
+    platform: linux/amd64
+    hostname: zoo1
+    container_name: zoo1
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_SERVERS: zoo1:2888:3888
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.3.2
+    platform: linux/amd64
+    hostname: kafka1
+    container_name: kafka1
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 1
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    depends_on:
+      - zoo1
+
+  kafka2:
+    image: confluentinc/cp-kafka:7.3.2
+    platform: linux/amd64
+    hostname: kafka2
+    container_name: kafka2
+    ports:
+      - "9093:9093"
+      - "29093:29093"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 2
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    depends_on:
+      - zoo1
+
+  kafka3:
+    image: confluentinc/cp-kafka:7.3.2
+    platform: linux/amd64
+    hostname: kafka3
+    container_name: kafka3
+    ports:
+      - "9094:9094"
+      - "29094:29094"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
+      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
+      KAFKA_BROKER_ID: 3
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+    depends_on:
+      - zoo1
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..abca084
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,33 @@
+version: '2.1'
+
+services:
+  zoo1:
+    image: confluentinc/cp-zookeeper:7.3.2
+    hostname: zoo1
+    container_name: zoo1
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_SERVERS: zoo1:2888:3888
+
+  kafka1:
+    image: confluentinc/cp-kafka:7.3.2
+    hostname: kafka1
+    container_name: kafka1
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    environment:
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + depends_on: + - zoo1 diff --git a/library-events-consumer/build.gradle b/library-events-consumer/build.gradle index cb5b270..d901fe3 100644 --- a/library-events-consumer/build.gradle +++ b/library-events-consumer/build.gradle @@ -1,12 +1,12 @@ plugins { - id 'org.springframework.boot' version '2.6.5' - id 'io.spring.dependency-management' version '1.0.11.RELEASE' + id 'org.springframework.boot' version '3.0.5' + id 'io.spring.dependency-management' version '1.1.0' id 'java' } group = 'com.learnkafka' version = '0.0.1-SNAPSHOT' -sourceCompatibility = '11' +sourceCompatibility = '17' configurations { compileOnly { diff --git a/library-events-consumer/gradle/wrapper/gradle-wrapper.properties b/library-events-consumer/gradle/wrapper/gradle-wrapper.properties index e62e484..4c6d24e 100644 --- a/library-events-consumer/gradle/wrapper/gradle-wrapper.properties +++ b/library-events-consumer/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Wed Jan 08 05:18:41 CST 2020 -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-all.zip +#Fri Dec 27 05:51:26 CST 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -zipStorePath=wrapper/dists zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip \ No newline at end of file diff --git a/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java b/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java deleted file mode 100644 index d68f03b..0000000 --- a/library-events-consumer/src/main/java/com/learnkafka/config/LibraryEventsConsumerConfigLegacy.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.learnkafka.config; - -import com.learnkafka.service.LibraryEventsService; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.dao.RecoverableDataAccessException; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.retry.RetryPolicy; -import org.springframework.retry.backoff.FixedBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; -import org.springframework.retry.support.RetryTemplate; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -//@Configuration -//@EnableKafka -@Slf4j -public class LibraryEventsConsumerConfigLegacy { - - @Autowired - LibraryEventsService libraryEventsService; - - @Autowired - KafkaProperties kafkaProperties; - - @Bean - @ConditionalOnMissingBean(name = 
"kafkaListenerContainerFactory") - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ObjectProvider> kafkaConsumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory - .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.kafkaProperties.buildConsumerProperties()))); - factory.setConcurrency(3); - // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); - factory.setErrorHandler(((thrownException, data) -> { - log.info("Exception in consumerConfig is {} and the record is {}", thrownException.getMessage(), data); - //persist - })); - factory.setRetryTemplate(retryTemplate()); - factory.setRecoveryCallback((context -> { - if(context.getLastThrowable().getCause() instanceof RecoverableDataAccessException){ - //invoke recovery logic - log.info("Inside the recoverable logic"); - Arrays.asList(context.attributeNames()) - .forEach(attributeName -> { - log.info("Attribute name is : {} ", attributeName); - log.info("Attribute Value is : {} ", context.getAttribute(attributeName)); - }); - - ConsumerRecord consumerRecord = (ConsumerRecord) context.getAttribute("record"); - libraryEventsService.handleRecovery(consumerRecord); - }else{ - log.info("Inside the non recoverable logic"); - throw new RuntimeException(context.getLastThrowable().getMessage()); - } - - - return null; - })); - return factory; - } - - /* @Bean - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConcurrentKafkaListenerContainerFactoryConfigurer configurer, - ConsumerFactory kafkaConsumerFactory) { - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory); - factory.setConcurrency(3); - // factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); - factory.setErrorHandler(((thrownException, data) -> { - log.info("Exception in consumerConfig is {} and the record is {}", thrownException.getMessage(), data); - //persist - })); - factory.setRetryTemplate(retryTemplate()); - factory.setRecoveryCallback((context -> { - if(context.getLastThrowable().getCause() instanceof RecoverableDataAccessException){ - //invoke recovery logic - log.info("Inside the recoverable logic"); - Arrays.asList(context.attributeNames()) - .forEach(attributeName -> { - log.info("Attribute name is : {} ", attributeName); - log.info("Attribute Value is : {} ", context.getAttribute(attributeName)); - }); - - ConsumerRecord consumerRecord = (ConsumerRecord) context.getAttribute("record"); - libraryEventsService.handleRecovery(consumerRecord); - }else{ - log.info("Inside the non recoverable logic"); - throw new RuntimeException(context.getLastThrowable().getMessage()); - } - - - return null; - })); - return factory; - }*/ - - private RetryTemplate retryTemplate() { - - FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); - fixedBackOffPolicy.setBackOffPeriod(1000); - RetryTemplate retryTemplate = new RetryTemplate(); - retryTemplate.setRetryPolicy(simpleRetryPolicy()); - retryTemplate.setBackOffPolicy(fixedBackOffPolicy); - return retryTemplate; - } - - private RetryPolicy simpleRetryPolicy() { - - /*SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(); - simpleRetryPolicy.setMaxAttempts(3);*/ - Map, Boolean> exceptionsMap = new HashMap<>(); - 
exceptionsMap.put(IllegalArgumentException.class, false); - exceptionsMap.put(RecoverableDataAccessException.class, true); - SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3,exceptionsMap,true); - return simpleRetryPolicy; - } -} \ No newline at end of file diff --git a/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java b/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java index 99847cb..732b9ec 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/Book.java @@ -1,17 +1,15 @@ package com.learnkafka.entity; +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.JoinColumn; +import jakarta.persistence.OneToOne; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.JoinColumn; -import javax.persistence.OneToOne; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor diff --git a/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java b/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java index 77292b4..9ff24ee 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/FailureRecord.java @@ -1,12 +1,14 @@ package com.learnkafka.entity; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.persistence.*; @AllArgsConstructor @NoArgsConstructor @@ -18,7 +20,7 @@ public class FailureRecord { @GeneratedValue private Integer bookId; private String topic; - private Integer key; + private Integer key_value; private String errorRecord; private Integer partition; private Long offset_value; diff --git a/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java b/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java index 3ba650f..6e20182 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java +++ b/library-events-consumer/src/main/java/com/learnkafka/entity/LibraryEvent.java @@ -1,12 +1,9 @@ package com.learnkafka.entity; +import jakarta.persistence.*; import lombok.*; -import javax.persistence.*; -import javax.validation.Valid; -import javax.validation.constraints.NotNull; - @AllArgsConstructor @NoArgsConstructor @Data diff --git a/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java b/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java index dad6ac8..17f394b 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java +++ b/library-events-consumer/src/main/java/com/learnkafka/scheduler/RetryScheduler.java @@ -1,18 +1,13 @@ package com.learnkafka.scheduler; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.learnkafka.config.LibraryEventsConsumerConfig; -import com.learnkafka.consumer.LibraryEventsConsumer; import com.learnkafka.entity.FailureRecord; -import com.learnkafka.entity.LibraryEvent; import 
com.learnkafka.jpa.FailureRecordRepository; import com.learnkafka.service.LibraryEventsService; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; //@Component @Slf4j @@ -49,7 +44,7 @@ public void retryFailedRecords(){ private ConsumerRecord buildConsumerRecord(FailureRecord failureRecord) { return new ConsumerRecord<>(failureRecord.getTopic(), - failureRecord.getPartition(), failureRecord.getOffset_value(), failureRecord.getKey(), + failureRecord.getPartition(), failureRecord.getOffset_value(), failureRecord.getKey_value(), failureRecord.getErrorRecord()); } diff --git a/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java b/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java index 2845286..5fb5f55 100644 --- a/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java +++ b/library-events-consumer/src/main/java/com/learnkafka/service/LibraryEventsService.java @@ -75,16 +75,13 @@ public void handleRecovery(ConsumerRecord record){ Integer key = record.key(); String message = record.value(); - ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key, message); - listenableFuture.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable ex) { - handleFailure(key, message, ex); - } + var listenableFuture = kafkaTemplate.sendDefault(key, message); + listenableFuture.whenComplete((sendResult, throwable) -> { + if (throwable != null) { + handleFailure(key, message, throwable); + } else { + handleSuccess(key, message, sendResult); - @Override - public void onSuccess(SendResult result) { - handleSuccess(key, message, result); } }); } diff --git a/library-events-producer/build.gradle b/library-events-producer/build.gradle index f161eb9..8314808 100644 --- a/library-events-producer/build.gradle +++ b/library-events-producer/build.gradle @@ -1,12 +1,12 @@ plugins { - id 'org.springframework.boot' version '2.6.5' - id 'io.spring.dependency-management' version '1.0.11.RELEASE' + id 'org.springframework.boot' version '3.0.5' + id 'io.spring.dependency-management' version '1.1.0' id 'java' } group = 'com.learnkafka' version = '0.0.1-SNAPSHOT' -sourceCompatibility = '11' +sourceCompatibility = '17' configurations { compileOnly { @@ -22,6 +22,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-validation' + compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' testImplementation('org.springframework.boot:spring-boot-starter-test') { @@ -36,6 +37,7 @@ sourceSets{ } } -test { +tasks.named('test') { useJUnitPlatform() } + diff --git a/library-events-producer/gradle/wrapper/gradle-wrapper.properties b/library-events-producer/gradle/wrapper/gradle-wrapper.properties index 68a9d70..4c6d24e 100644 --- a/library-events-producer/gradle/wrapper/gradle-wrapper.properties +++ b/library-events-producer/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0-all.zip 
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip \ No newline at end of file diff --git a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java index 85b1897..75f4e71 100644 --- a/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java +++ b/library-events-producer/src/main/java/com/learnkafka/controller/LibraryEventsController.java @@ -1,22 +1,18 @@ package com.learnkafka.controller; import com.fasterxml.jackson.core.JsonProcessingException; -import com.learnkafka.domain.LibraryEvent; -import com.learnkafka.domain.LibraryEventType; +import com.learnkafka.domain.LibraryEventRecord; import com.learnkafka.producer.LibraryEventProducer; +import jakarta.validation.Valid; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.kafka.support.SendResult; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; -import javax.validation.Valid; -import java.util.concurrent.ExecutionException; - @RestController @Slf4j public class LibraryEventsController { @@ -25,10 +21,9 @@ public class LibraryEventsController { LibraryEventProducer libraryEventProducer; @PostMapping("/v1/libraryevent") - public ResponseEntity postLibraryEvent(@RequestBody @Valid LibraryEvent libraryEvent) throws JsonProcessingException, ExecutionException, InterruptedException { + public ResponseEntity postLibraryEvent(@RequestBody @Valid LibraryEventRecord libraryEvent) throws JsonProcessingException { //invoke kafka producer - libraryEvent.setLibraryEventType(LibraryEventType.NEW); libraryEventProducer.sendLibraryEvent_Approach2(libraryEvent); //libraryEventProducer.sendLibraryEvent(libraryEvent); return ResponseEntity.status(HttpStatus.CREATED).body(libraryEvent); @@ -36,14 +31,13 @@ public ResponseEntity postLibraryEvent(@RequestBody @Valid Library //PUT @PutMapping("/v1/libraryevent") - public ResponseEntity putLibraryEvent(@RequestBody @Valid LibraryEvent libraryEvent) throws JsonProcessingException, ExecutionException, InterruptedException { + public ResponseEntity putLibraryEvent(@RequestBody @Valid LibraryEventRecord libraryEvent) throws JsonProcessingException { log.info("LibraryEvent : {} ",libraryEvent ); - if(libraryEvent.getLibraryEventId()==null){ + if(libraryEvent.libraryEventId()==null){ return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Please pass the LibraryEventId"); } - libraryEvent.setLibraryEventType(LibraryEventType.UPDATE); libraryEventProducer.sendLibraryEvent_Approach2(libraryEvent); return ResponseEntity.status(HttpStatus.OK).body(libraryEvent); } diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/Book.java b/library-events-producer/src/main/java/com/learnkafka/domain/Book.java index fcaad45..e5cb115 100644 --- a/library-events-producer/src/main/java/com/learnkafka/domain/Book.java +++ b/library-events-producer/src/main/java/com/learnkafka/domain/Book.java @@ -1,13 +1,13 @@ package com.learnkafka.domain; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; 
import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java b/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java new file mode 100644 index 0000000..53892d1 --- /dev/null +++ b/library-events-producer/src/main/java/com/learnkafka/domain/BookRecord.java @@ -0,0 +1,13 @@ +package com.learnkafka.domain; + +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; + +public record BookRecord( + @NotNull + Integer bookId, + @NotBlank + String bookName, + @NotBlank + String bookAuthor) { +} diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java index 2252fd0..2d97de6 100644 --- a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java +++ b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEvent.java @@ -1,13 +1,13 @@ package com.learnkafka.domain; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.validation.Valid; -import javax.validation.constraints.NotNull; @AllArgsConstructor @NoArgsConstructor @@ -19,6 +19,6 @@ public class LibraryEvent { private LibraryEventType libraryEventType; @NotNull @Valid - private Book book; + private BookRecord book; } diff --git a/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java new file mode 100644 index 0000000..bdf49fe --- /dev/null +++ b/library-events-producer/src/main/java/com/learnkafka/domain/LibraryEventRecord.java @@ -0,0 +1,13 @@ +package com.learnkafka.domain; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; + +public record LibraryEventRecord( + Integer libraryEventId, + LibraryEventType libraryEventType, + @NotNull + @Valid + BookRecord book +) { +} diff --git a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java index 11e9347..a741722 100644 --- a/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java +++ b/library-events-producer/src/main/java/com/learnkafka/producer/LibraryEventProducer.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.learnkafka.domain.LibraryEvent; +import com.learnkafka.domain.LibraryEventRecord; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Header; @@ -11,10 +12,9 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; @@ -24,53 +24,47 @@ public class LibraryEventProducer { @Autowired - KafkaTemplate kafkaTemplate; + KafkaTemplate kafkaTemplate; String topic = "library-events"; @Autowired ObjectMapper objectMapper; - public void sendLibraryEvent(LibraryEvent libraryEvent) throws JsonProcessingException { + public CompletableFuture> sendLibraryEvent(LibraryEvent libraryEvent) throws JsonProcessingException { Integer key = libraryEvent.getLibraryEventId(); String value = objectMapper.writeValueAsString(libraryEvent); - ListenableFuture> listenableFuture = kafkaTemplate.sendDefault(key,value); - listenableFuture.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable ex) { - handleFailure(key, value, ex); - } - - @Override - public void onSuccess(SendResult result) { - handleSuccess(key, value, result); - } - }); + var completableFuture = kafkaTemplate.sendDefault(key, value); + return completableFuture + .whenComplete((sendResult, throwable) -> { + if (throwable != null) { + handleFailure(key, value, throwable); + } else { + handleSuccess(key, value, sendResult); + + } + }); } - public ListenableFuture> sendLibraryEvent_Approach2(LibraryEvent libraryEvent) throws JsonProcessingException { + public CompletableFuture> sendLibraryEvent_Approach2(LibraryEventRecord libraryEvent) throws JsonProcessingException { - Integer key = libraryEvent.getLibraryEventId(); + Integer key = libraryEvent.libraryEventId(); String value = objectMapper.writeValueAsString(libraryEvent); - ProducerRecord producerRecord = buildProducerRecord(key, value, topic); - - ListenableFuture> listenableFuture = kafkaTemplate.send(producerRecord); + ProducerRecord producerRecord = buildProducerRecord(key, value, topic); - listenableFuture.addCallback(new ListenableFutureCallback>() { - @Override - public void onFailure(Throwable ex) { - handleFailure(key, value, ex); - } + var completableFuture = kafkaTemplate.send(producerRecord); - @Override - public void onSuccess(SendResult result) { - handleSuccess(key, value, result); - } - }); + return completableFuture + .whenComplete((sendResult, throwable) -> { + if (throwable != null) { + handleFailure(key, value, throwable); + } else { + handleSuccess(key, value, sendResult); - return listenableFuture; + } + }); } private ProducerRecord buildProducerRecord(Integer key, String value, String topic) { @@ -86,9 +80,9 @@ public SendResult sendLibraryEventSynchronous(LibraryEvent libr Integer key = libraryEvent.getLibraryEventId(); String value = objectMapper.writeValueAsString(libraryEvent); - SendResult sendResult=null; + SendResult sendResult = null; try { - sendResult = kafkaTemplate.sendDefault(key,value).get(1, TimeUnit.SECONDS); + sendResult = kafkaTemplate.sendDefault(key, value).get(1, TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException e) { log.error("ExecutionException/InterruptedException Sending the Message and the exception is {}", e.getMessage()); throw e; diff --git a/library-events-producer/src/main/resources/application.yml b/library-events-producer/src/main/resources/application.yml index 4789455..ae87ab1 100644 --- a/library-events-producer/src/main/resources/application.yml +++ b/library-events-producer/src/main/resources/application.yml @@ -21,6 +21,34 @@ spring: properties: bootstrap.servers: localhost:9092,localhost:9093,localhost:9094 --- +spring: + config: + activate: + on-profile: ssl + kafka: + template: + default-topic: library-events + producer: + bootstrap-servers: localhost:9092 + 
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + ssl: + trust-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/kafka.producer.truststore.jks + trust-store-password: confluent + key-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/kafka.producer.keystore.jks + key-store-password: confluent + properties: + acks: all + retries: 10 + retry.backoff.ms: 1000 + security: + protocol: SSL + ssl.endpoint.identification.algorithm: + + admin: + properties: + bootstrap.servers: localhost:9092 +--- spring: config: activate: @@ -33,9 +61,9 @@ spring: key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer ssl: - trust-store-location: file:/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.truststore.jks + trust-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.truststore.jks trust-store-password: password - key-store-location: file:/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.keystore.jks + key-store-location: file:/System/Volumes/Data/Dilip/udemy/kafka-for-developers-using-spring-boot/ssl/client.keystore.jks key-store-password: password properties: acks: all diff --git a/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java b/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java index a305b4b..8d1d869 100644 --- a/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java +++ b/library-events-producer/src/test/java/intg/com/learnkafka/controller/LibraryEventsControllerIntegrationTest.java @@ -1,10 +1,10 @@ package com.learnkafka.controller; -import com.learnkafka.domain.Book; -import com.learnkafka.domain.LibraryEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.learnkafka.domain.LibraryEventRecord; +import com.learnkafka.util.TestUtil; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -39,6 +39,9 @@ public class LibraryEventsControllerIntegrationTest { @Autowired EmbeddedKafkaBroker embeddedKafkaBroker; + @Autowired + ObjectMapper objectMapper; + private Consumer consumer; @BeforeEach @@ -58,35 +61,25 @@ void tearDown() { @Timeout(5) void postLibraryEvent() throws InterruptedException { //given - Book book = Book.builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord(); HttpHeaders headers = new HttpHeaders(); headers.set("content-type", MediaType.APPLICATION_JSON.toString()); - HttpEntity request = new HttpEntity<>(libraryEvent, headers); + HttpEntity request = new HttpEntity<>(libraryEventRecord, headers); //when - ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.POST, request, 
LibraryEvent.class); + ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.POST, request, LibraryEventRecord.class); //then assertEquals(HttpStatus.CREATED, responseEntity.getStatusCode()); - ConsumerRecords consumerRecords = KafkaTestUtils.getRecords(consumer); //Thread.sleep(3000); assert consumerRecords.count() == 1; - consumerRecords.forEach(record-> { - String expectedRecord = "{\"libraryEventId\":null,\"libraryEventType\":\"NEW\",\"book\":{\"bookId\":123,\"bookName\":\"Kafka using Spring Boot\",\"bookAuthor\":\"Dilip\"}}"; - String value = record.value(); - assertEquals(expectedRecord, value); + consumerRecords.forEach(record -> { + var libraryEventActual = TestUtil.parseLibraryEventRecord(objectMapper, record.value()); + assertEquals(libraryEventRecord, libraryEventActual); + }); @@ -96,23 +89,14 @@ void postLibraryEvent() throws InterruptedException { @Timeout(5) void putLibraryEvent() throws InterruptedException { //given - Book book = Book.builder() - .bookId(456) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(123) - .book(book) - .build(); + var libraryEventUpdate = TestUtil.libraryEventRecordUpdate(); HttpHeaders headers = new HttpHeaders(); headers.set("content-type", MediaType.APPLICATION_JSON.toString()); - HttpEntity request = new HttpEntity<>(libraryEvent, headers); + HttpEntity request = new HttpEntity<>(libraryEventUpdate, headers); //when - ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.PUT, request, LibraryEvent.class); + ResponseEntity responseEntity = restTemplate.exchange("/v1/libraryevent", HttpMethod.PUT, request, LibraryEventRecord.class); //then assertEquals(HttpStatus.OK, responseEntity.getStatusCode()); @@ -121,11 +105,10 @@ void putLibraryEvent() throws InterruptedException { ConsumerRecords consumerRecords = KafkaTestUtils.getRecords(consumer); //Thread.sleep(3000); assert consumerRecords.count() == 2; - consumerRecords.forEach(record-> { - if(record.key()!=null){ - String expectedRecord = "{\"libraryEventId\":123,\"libraryEventType\":\"UPDATE\",\"book\":{\"bookId\":456,\"bookName\":\"Kafka using Spring Boot\",\"bookAuthor\":\"Dilip\"}}"; - String value = record.value(); - assertEquals(expectedRecord, value); + consumerRecords.forEach(record -> { + if (record.key() != null) { + var libraryEventActual = TestUtil.parseLibraryEventRecord(objectMapper, record.value()); + assertEquals(libraryEventUpdate, libraryEventActual); } }); diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java b/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java index 3cb0a26..4390d92 100644 --- a/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java +++ b/library-events-producer/src/test/java/unit/com/learnkafka/controller/LibraryEventControllerUnitTest.java @@ -1,10 +1,9 @@ package com.learnkafka.controller; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.learnkafka.domain.Book; -import com.learnkafka.domain.LibraryEvent; +import com.learnkafka.domain.LibraryEventRecord; import com.learnkafka.producer.LibraryEventProducer; +import com.learnkafka.util.TestUtil; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; @@ -13,9 +12,7 @@ import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MockMvc; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.when; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; @@ -37,19 +34,11 @@ public class LibraryEventControllerUnitTest { @Test void postLibraryEvent() throws Exception { //given - Book book = Book.builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka using Spring Boot") - .build(); - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord(); - String json = objectMapper.writeValueAsString(libraryEvent); - when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEvent.class))).thenReturn(null); + String json = objectMapper.writeValueAsString(libraryEventRecord); + when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEventRecord.class))).thenReturn(null); //expect mockMvc.perform(post("/v1/libraryevent") @@ -63,21 +52,12 @@ void postLibraryEvent() throws Exception { void postLibraryEvent_4xx() throws Exception { //given - Book book = Book.builder() - .bookId(null) - .bookAuthor(null) - .bookName("Kafka using Spring Boot") - .build(); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecordWithInvalidBook(); - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); - - String json = objectMapper.writeValueAsString(libraryEvent); - when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEvent.class))).thenReturn(null); + String json = objectMapper.writeValueAsString(libraryEventRecord); + when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEventRecord.class))).thenReturn(null); //expect - String expectedErrorMessage = "book.bookAuthor - must not be blank, book.bookId - must not be null"; + String expectedErrorMessage = "book.bookId - must not be null, book.bookName - must not be blank"; mockMvc.perform(post("/v1/libraryevent") .content(json) .contentType(MediaType.APPLICATION_JSON)) @@ -90,18 +70,10 @@ void postLibraryEvent_4xx() throws Exception { void updateLibraryEvent() throws Exception { //given - Book book = new Book().builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka Using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(123) - .book(book) - .build(); - String json = objectMapper.writeValueAsString(libraryEvent); - when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEvent.class))).thenReturn(null); + + + String json = objectMapper.writeValueAsString(TestUtil.libraryEventRecordUpdate()); + when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEventRecord.class))).thenReturn(null); //expect mockMvc.perform( @@ -116,18 +88,9 @@ void updateLibraryEvent() throws Exception { void updateLibraryEvent_withNullLibraryEventId() throws Exception { //given - Book book = new Book().builder() - .bookId(123) - .bookAuthor("Dilip") - .bookName("Kafka Using Spring Boot") - .build(); - - LibraryEvent libraryEvent = LibraryEvent.builder() - .libraryEventId(null) - .book(book) - .build(); - String json = 
objectMapper.writeValueAsString(libraryEvent);
-        when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEvent.class))).thenReturn(null);
+
+        String json = objectMapper.writeValueAsString(TestUtil.libraryEventRecordUpdateWithNullLibraryEventId());
+        when(libraryEventProducer.sendLibraryEvent_Approach2(isA(LibraryEventRecord.class))).thenReturn(null);
 
         //expect
         mockMvc.perform(
diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
index e38c47f..18ea71d 100644
--- a/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
+++ b/library-events-producer/src/test/java/unit/com/learnkafka/producer/LibraryEventProducerUnitTest.java
@@ -2,12 +2,11 @@
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.learnkafka.domain.Book;
-import com.learnkafka.domain.LibraryEvent;
+import com.learnkafka.domain.LibraryEventRecord;
+import com.learnkafka.util.TestUtil;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.Field;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
@@ -16,10 +15,8 @@
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
-import org.springframework.util.concurrent.ListenableFuture;
-import org.springframework.util.concurrent.SettableListenableFuture;
-import scala.Int;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -41,55 +38,34 @@
     @Test
     void sendLibraryEvent_Approach2_failure() throws JsonProcessingException, ExecutionException, InterruptedException {
         //given
-        Book book = Book.builder()
-                .bookId(123)
-                .bookAuthor("Dilip")
-                .bookName("Kafka using Spring Boot")
-                .build();
-
-        LibraryEvent libraryEvent = LibraryEvent.builder()
-                .libraryEventId(null)
-                .book(book)
-                .build();
-        SettableListenableFuture future = new SettableListenableFuture();
-
-        future.setException(new RuntimeException("Exception Calling Kafka"));
-        when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(future);
+
+        when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(CompletableFuture.failedFuture(new RuntimeException("Exception Calling Kafka")));
 
         //when
-        assertThrows(Exception.class, ()->eventProducer.sendLibraryEvent_Approach2(libraryEvent).get());
+        assertThrows(Exception.class, ()->eventProducer.sendLibraryEvent_Approach2(TestUtil.libraryEventRecord()).get());
 
     }
 
     @Test
     void sendLibraryEvent_Approach2_success() throws JsonProcessingException, ExecutionException, InterruptedException {
         //given
-        Book book = Book.builder()
-                .bookId(123)
-                .bookAuthor("Dilip")
-                .bookName("Kafka using Spring Boot")
-                .build();
-
-        LibraryEvent libraryEvent = LibraryEvent.builder()
-                .libraryEventId(null)
-                .book(book)
-                .build();
-        String record = objectMapper.writeValueAsString(libraryEvent);
-        SettableListenableFuture future = new SettableListenableFuture();
-
-        ProducerRecord producerRecord = new ProducerRecord("library-events",
libraryEvent.getLibraryEventId(),record ); + LibraryEventRecord libraryEventRecord = TestUtil.libraryEventRecord(); + String record = objectMapper.writeValueAsString(libraryEventRecord); + + + ProducerRecord producerRecord = new ProducerRecord("library-events", libraryEventRecord.libraryEventId(),record ); RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition("library-events", 1), 1,1,System.currentTimeMillis(), 1, 2); SendResult sendResult = new SendResult(producerRecord,recordMetadata); - future.set(sendResult); + var future = CompletableFuture.supplyAsync(()-> sendResult); when(kafkaTemplate.send(isA(ProducerRecord.class))).thenReturn(future); //when - ListenableFuture> listenableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEvent); + var completableFuture = eventProducer.sendLibraryEvent_Approach2(libraryEventRecord); //then - SendResult sendResult1 = listenableFuture.get(); + SendResult sendResult1 = completableFuture.get(); assert sendResult1.getRecordMetadata().partition()==1; } diff --git a/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java b/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java new file mode 100644 index 0000000..d35a4cd --- /dev/null +++ b/library-events-producer/src/test/java/unit/com/learnkafka/util/TestUtil.java @@ -0,0 +1,62 @@ +package com.learnkafka.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.learnkafka.domain.BookRecord; +import com.learnkafka.domain.LibraryEventRecord; +import com.learnkafka.domain.LibraryEventType; + +public class TestUtil { + + public static BookRecord bookRecord(){ + + return new BookRecord(123, "Dilip","Kafka Using Spring Boot" ); + } + + public static BookRecord bookRecordWithInvalidValues(){ + + return new BookRecord(null, "","Kafka Using Spring Boot" ); + } + + public static LibraryEventRecord libraryEventRecord(){ + + return + new LibraryEventRecord(null, + LibraryEventType.NEW, + bookRecord()); + } + + public static LibraryEventRecord libraryEventRecordUpdate(){ + + return + new LibraryEventRecord(123, + LibraryEventType.UPDATE, + bookRecord()); + } + + public static LibraryEventRecord libraryEventRecordUpdateWithNullLibraryEventId(){ + + return + new LibraryEventRecord(null, + LibraryEventType.UPDATE, + bookRecord()); + } + + public static LibraryEventRecord libraryEventRecordWithInvalidBook(){ + + return + new LibraryEventRecord(null, + LibraryEventType.NEW, + bookRecordWithInvalidValues()); + } + + public static LibraryEventRecord parseLibraryEventRecord(ObjectMapper objectMapper , String json){ + + try { + return objectMapper.readValue(json, LibraryEventRecord.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } +}
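For completeness, here is a quick way to exercise the updated endpoint once the producer application is running. The JSON mirrors the `LibraryEventRecord`/`BookRecord` shape introduced in this diff; the port 8080 is Spring Boot's default and an assumption, since no server port appears anywhere in this patch.

```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PostLibraryEventSketch {

    public static void main(String[] args) throws Exception {
        // JSON shape matches LibraryEventRecord -> BookRecord from this diff
        String json = """
                {
                  "libraryEventId": null,
                  "libraryEventType": "NEW",
                  "book": {
                    "bookId": 123,
                    "bookName": "Kafka Using Spring Boot",
                    "bookAuthor": "Dilip"
                  }
                }
                """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/v1/libraryevent")) // assumed default port
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // LibraryEventsController returns 201 CREATED with the event echoed back
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```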