Skip to content

Commit

Permalink
Use Apache Kafka ConfigDef
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 13, 2025
1 parent ad54b62 commit 444e161
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 15 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ implementation group: 'com.bakdata.kafka', name: 'large-message-serde', version:

For other build tools or versions, refer to the [latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/large-message-serde/latest).

Make sure to also add [Confluent Maven Repository](http://packages.confluent.io/maven/) to your build file.

#### Usage

You can use it from your Kafka Streams application like any other Serde
Expand Down Expand Up @@ -228,8 +226,8 @@ We also provide a method for cleaning up all files on the blob storage associate

```java
final Map<String, Object> properties = ...;
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
final LargeMessageStoringClient storer = config.getStorer()
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(this.properties);
final LargeMessageStoringClient storer = this.config.getStorer()
storer.deleteAllFiles("topic");
```

Expand Down
1 change: 0 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ allprojects {

repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
}
}

Expand Down
1 change: 0 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ org.gradle.caching=true
org.gradle.parallel=true
org.gradle.jvmargs=-Xmx2048m
kafkaVersion=3.8.1
confluentVersion=7.8.0
junitVersion=5.11.4
log4jVersion=2.24.3
assertJVersion=3.27.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.config.ConfigDef;
import java.util.Map;
import java.util.stream.Stream;
import lombok.Builder;
Expand Down Expand Up @@ -110,8 +109,7 @@ private LargeMessageStoringClient createStorer(final Map<String, Object> basePro

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.azure.core.util.BinaryData;
import com.azure.storage.blob.BlobContainerClient;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.config.ConfigDef;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -75,8 +74,7 @@ private Map<String, Object> createProperties() {

private LargeMessageRetrievingClient createRetriever() {
final Map<String, Object> properties = this.createProperties();
final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static com.bakdata.kafka.LargeMessageRetrievingClientTest.serializeUri;
import static org.assertj.core.api.Assertions.assertThat;

import io.confluent.common.config.ConfigDef;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -62,8 +61,7 @@ void shouldReadBackedText() {

private LargeMessageRetrievingClient createRetriever() {
final Map<String, String> properties = this.getLargeMessageConfig();
final ConfigDef configDef = AbstractLargeMessageConfig.baseConfigDef();
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(configDef, properties);
final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(properties);
return config.getRetriever();
}

Expand Down

0 comments on commit 444e161

Please sign in to comment.