Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 8, 2024
1 parent a3078c3 commit ac2ccbc
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
* </li>
* </ul>
*
* @param endpointConfig endpoint to run app on
* @return Kafka configuration
*/
public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpointConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ private static Map<String, Object> createKafkaProperties(final KafkaEndpointConf
* </li>
* </ul>
*
* @param endpointConfig endpoint to run app on
* @return Kafka configuration
*/
public Map<String, Object> getKafkaProperties(final KafkaEndpointConfig endpointConfig) {
Expand Down Expand Up @@ -174,11 +175,11 @@ public void close() {
}

private void setupApp(final Map<String, Object> kafkaProperties) {
final StreamsAppSetupConfiguration configuration = StreamsAppSetupConfiguration.builder()
final StreamsAppSetupConfiguration setupConfiguration = StreamsAppSetupConfiguration.builder()
.kafkaProperties(kafkaProperties)
.topics(this.getTopics())
.build();
this.app.setup(configuration);
this.app.setup(setupConfiguration);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ default void deleted(final String topic) {
*/
@FunctionalInterface
interface TopicHookFactory {
/**
* Create a new {@code TopicHook}
* @param kafkaConfig Kafka configuration for creating {@code TopicHook}
* @return {@code TopicHook}
*/
TopicHook create(Map<String, Object> kafkaConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import lombok.NonNull;
import lombok.Value;

/**
* Configuration for setting up a {@link ProducerApp}
* @see ProducerApp#setup(ProducerAppSetupConfiguration)
*/
@Builder
@Value
public class ProducerAppSetupConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import lombok.NonNull;
import lombok.Value;

/**
* Configuration for setting up a {@link StreamsApp}
* @see StreamsApp#setup(StreamsAppSetupConfiguration)
*/
@Builder
@Value
public class StreamsAppSetupConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,21 @@ public final class ImprovedAdminClient implements Closeable {
private final SchemaRegistryClient schemaRegistryClient;
private final @NonNull Duration timeout;

/**
* Create a new admin client with default timeout
* @param properties Kafka configuration
* @return admin client
*/
public static ImprovedAdminClient create(@NonNull final Map<String, Object> properties) {
return create(properties, ADMIN_TIMEOUT);
}

/**
* Create a new admin client
* @param properties Kafka configuration
* @param timeout timeout when performing admin operations
* @return admin client
*/
public static ImprovedAdminClient create(@NonNull final Map<String, Object> properties,
@NonNull final Duration timeout) {
Preconditions.checkNotNull(properties.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Test;

public class StreamsExecutionOptionsTest {
class StreamsExecutionOptionsTest {

@Test
void shouldLeaveGroup() {
Expand Down

0 comments on commit ac2ccbc

Please sign in to comment.