Skip to content

Commit

Permalink
update protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jul 11, 2024
1 parent 08d8307 commit ff7c187
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 52 deletions.
5 changes: 4 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
confluentVersion=7.6.1
kafkaVersion=3.6.1
lombokVersion=1.18.26
lombokVersion=1.18.26
junitVersion=5.10.2
grpcVersion=1.65.0
protoVersion=4.27.2
2 changes: 1 addition & 1 deletion kafka-avro-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

implementation project(':kafka-avro')
implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
Expand Down
2 changes: 1 addition & 1 deletion kafka-avro-union-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

implementation project(':kafka-avro')
implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
Expand Down
4 changes: 2 additions & 2 deletions kafka-avro/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
implementation 'org.apache.avro:avro:1.11.+'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
implementation 'org.apache.avro:avro:1.11.3'
}

test {
Expand Down
2 changes: 1 addition & 1 deletion kafka-json-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
implementation "io.confluent:kafka-json-serializer:${confluentVersion}"
Expand Down
2 changes: 1 addition & 1 deletion kafka-ksqldb-extensions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ java {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

implementation("io.confluent.ksql:ksqldb-udf:${confluentVersion}") {
exclude group: 'io.confluent.observability', module: 'telemetry-client'
Expand Down
2 changes: 1 addition & 1 deletion kafka-native-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ repositories {
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"

Expand Down
42 changes: 42 additions & 0 deletions kafka-protobuf/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
plugins {
id 'java'
id 'java-library'
id 'com.google.protobuf' version '0.9.4'
}

repositories {
mavenCentral()
}

dependencies {
api "io.grpc:grpc-protobuf:${grpcVersion}"
api "io.grpc:grpc-stub:${grpcVersion}"
implementation "com.google.protobuf:protobuf-java:${protoVersion}"
compileOnly 'org.apache.tomcat:annotations-api:6.0.53'

testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
}

test {
useJUnitPlatform()
}

protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protoVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
generateProtoTasks {
all()*.plugins { grpc {} }
}
}

sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ syntax = "proto3";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";
option java_outer_classname = "SuppliersProto";

package suppliers;

service CounterService {
service CountService {
rpc GetCountByCountry (CountRequest) returns (CountReply) {}
}

Expand Down
32 changes: 3 additions & 29 deletions kafka-streams/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
plugins {
id 'java'
id 'application'
id 'com.google.protobuf' version '0.9.2'
}

repositories {
Expand All @@ -11,18 +10,13 @@ repositories {
}
}

String grpcVersion = '1.54.0'
String protoVersion = '3.22.2'

dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation project(':kafka-protobuf')
runtimeOnly "io.grpc:grpc-netty:${grpcVersion}"

implementation project(':kafka-avro')
implementation "org.apache.kafka:kafka-streams:${kafkaVersion}"
implementation "io.confluent:kafka-streams-avro-serde:${confluentVersion}"
compileOnly 'org.apache.tomcat:annotations-api:6.0.53'
runtimeOnly "io.grpc:grpc-netty-shaded:${grpcVersion}"

implementation 'info.picocli:picocli:4.6.1'
implementation 'ch.qos.logback:logback-classic:1.5.6'
Expand All @@ -32,18 +26,7 @@ dependencies {

testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
testImplementation 'org.junit.jupiter:junit-jupiter:5.10.2'
testImplementation "io.grpc:grpc-testing:${grpcVersion}"
}

protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protoVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
generateProtoTasks {
all()*.plugins { grpc {} }
}
testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"
}

application {
Expand All @@ -53,12 +36,3 @@ application {
test {
useJUnitPlatform()
}

sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
8 changes: 4 additions & 4 deletions kafka-streams/src/main/java/kafka/sandbox/cli/Count.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import io.grpc.ManagedChannel;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
import kafka.sandbox.proto.CounterServiceGrpc;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceBlockingStub;
import kafka.sandbox.proto.CountServiceGrpc;
import kafka.sandbox.proto.CountServiceGrpc.CountServiceBlockingStub;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;
Expand All @@ -22,11 +22,11 @@ public class Count implements Callable<Integer> {
private String country;

@Override
public Integer call() throws Exception {
public Integer call() {
ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
.build();

CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
CountServiceBlockingStub blockingStub = CountServiceGrpc.newBlockingStub(channel);
CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
System.out.println(countByCountry.getMessage());

Expand Down
6 changes: 3 additions & 3 deletions kafka-streams/src/main/java/kafka/sandbox/cli/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import kafka.sandbox.avro.Supplier;
import kafka.sandbox.grpc.CounterService;
import kafka.sandbox.grpc.CountService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
Expand Down Expand Up @@ -52,7 +52,7 @@ public Integer call() throws Exception {
// aggregate the new supplier counts by country
KTable<String, Long> aggregated = suppliers
// map the country as key
.map((key, value) -> new KeyValue<>(value.getCountry().toString(), value))
.map((key, value) -> new KeyValue<>(value.getCountry(), value))
.groupByKey()
// aggregate and materialize the store
.count(Materialized.as("SupplierCountByCountry"));
Expand All @@ -73,7 +73,7 @@ public Integer call() throws Exception {

// GRPC Server
Server server = Grpc.newServerBuilderForPort(5050, InsecureServerCredentials.create())
.addService(new CounterService(streams))
.addService(new CountService(streams))
.build();

// attach shutdown handler to catch control-c and creating a latch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
import io.grpc.stub.StreamObserver;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceImplBase;
import kafka.sandbox.proto.CountServiceGrpc.CountServiceImplBase;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CounterService extends CounterServiceImplBase {
public class CountService extends CountServiceImplBase {

private KafkaStreams streams;
private final KafkaStreams streams;

public CounterService(KafkaStreams streams) {
public CountService(KafkaStreams streams) {
this.streams = streams;
}

Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
rootProject.name = "kafka-sandbox"
include("kafka-avro")
include("kafka-protobuf")
include("kafka-avro-clients")
include("kafka-avro-union-clients")
include("kafka-json-clients")
Expand Down

0 comments on commit ff7c187

Please sign in to comment.