Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for predefined consumers of CDC topics #119

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
*/
boolean initialScan() default false;

/**
* Initial consumers of the changefeed
*/
Consumer[] consumers() default {};

enum Mode {
/**
* Only the key component of the modified row
Expand Down Expand Up @@ -71,4 +76,22 @@ enum Mode {
enum Format {
JSON
}

@interface Consumer {
String name();

Codec[] codecs() default {};

String readFrom() default "1970-01-01T00:00:00Z";

boolean important() default false;

enum Codec {
RAW,
GZIP,
LZOP,
ZSTD,
CUSTOM
}
}
}
31 changes: 30 additions & 1 deletion databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -245,13 +247,23 @@ private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schem
var retentionPeriod = Duration.parse(changefeed.retentionPeriod());
Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()),
"RetentionPeriod value defined for %s must be positive", getType());
List<Changefeed.Consumer> consumers = Arrays.stream(changefeed.consumers())
.map(consumer -> new Changefeed.Consumer(
consumer.name(),
List.of(consumer.codecs()),
Instant.parse(consumer.readFrom()),
consumer.important()
))
.toList();

return new Changefeed(
changefeed.name(),
changefeed.mode(),
changefeed.format(),
changefeed.virtualTimestamps(),
retentionPeriod,
changefeed.initialScan()
changefeed.initialScan(),
consumers
);
}

Expand Down Expand Up @@ -813,5 +825,22 @@ public static class Changefeed {
Duration retentionPeriod;

boolean initialScan;

@NonNull
List<Consumer> consumers;

@Value
public static class Consumer {
@NonNull
String name;

@NonNull
List<tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec> codecs;

@NonNull
Instant readFrom;

boolean important;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -25,6 +27,7 @@ public void testChangefeedDefaultsEntity() {
assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod()).isEqualTo(Duration.ofHours(24));
assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestamps()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).isInitialScan()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).getConsumers()).isEmpty();
}

@Test
Expand All @@ -37,6 +40,38 @@ public void testConflictingChangefeedNameEntity() {
assertThatThrownBy(() -> schemaOf(ConflictingChangefeedNameEntity.class));
}

@Test
public void testPredefinedConsumersChangefeedEntity() {
var entitySchema = schemaOf(PredefinedConsumersChangefeedEntity.class);

Schema.Changefeed expectedChangefeed = new Schema.Changefeed(
"feed1",
Changefeed.Mode.NEW_IMAGE,
Changefeed.Format.JSON,
false,
Duration.ofHours(24),
false,
List.of(
new Schema.Changefeed.Consumer(
"consumer1",
List.of(),
Instant.EPOCH,
false
),
new Schema.Changefeed.Consumer(
"consumer2",
List.of(Changefeed.Consumer.Codec.RAW),
Instant.parse("2020-01-01T00:00:00Z"),
true
)
)
);

assertThat(entitySchema.getChangefeeds())
.singleElement()
.isEqualTo(expectedChangefeed);
}

private static <T> Schema<T> schemaOf(Class<T> entityType) {
return new TestSchema<>(entityType);
}
Expand Down Expand Up @@ -74,4 +109,19 @@ private static class ConflictingChangefeedNameEntity {
int field1;
int field2;
}

@Value
@Changefeed(name = "feed1", consumers = {
@Changefeed.Consumer(name = "consumer1"),
@Changefeed.Consumer(
name = "consumer2",
readFrom = "2020-01-01T00:00:00Z",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
})
private static class PredefinedConsumersChangefeedEntity {
int field1;
int field2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@
format = JSON,
virtualTimestamps = true,
retentionPeriod = "PT1H",
initialScan = false /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
initialScan = false, /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
consumers = {
@Changefeed.Consumer(name = "test-consumer1"),
@Changefeed.Consumer(
name = "test-consumer2",
readFrom = "2025-01-21T08:01:25+00:00",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
}
)
@Changefeed(name = "test-changefeed2")
public class ChangefeedEntity implements Entity<ChangefeedEntity> {
Expand Down
2 changes: 2 additions & 0 deletions repository-ydb-v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ java_library(
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
],
)

Expand Down Expand Up @@ -65,5 +66,6 @@ java_test_suite(
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
],
)
10 changes: 10 additions & 0 deletions repository-ydb-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-table</artifactId>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-topic</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.yoj.repository.ydb.client;

import com.google.common.collect.Sets;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -26,6 +27,11 @@
import tech.ydb.table.settings.PartitioningSettings;
import tech.ydb.table.settings.TtlSettings;
import tech.ydb.table.values.Type;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec;
import tech.ydb.yoj.databind.schema.Schema;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.exception.CreateTableException;
Expand All @@ -38,11 +44,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static lombok.AccessLevel.PRIVATE;
import static tech.ydb.core.StatusCode.SCHEME_ERROR;
Expand All @@ -53,12 +62,14 @@ public class YdbSchemaOperations {

private final SessionManager sessionManager;
private final SchemeClient schemeClient;
private final TopicClient topicClient;
private String tablespace;

public YdbSchemaOperations(String tablespace, @NonNull SessionManager sessionManager, GrpcTransport transport) {
this.tablespace = YdbPaths.canonicalTablespace(tablespace);
this.sessionManager = sessionManager;
this.schemeClient = SchemeClient.newClient(transport).build();
this.topicClient = TopicClient.newClient(transport).build();
}

public void setTablespace(String tablespace) {
Expand All @@ -81,7 +92,7 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
columns.forEach(c -> {
ValueProtos.Type.PrimitiveTypeId yqlType = YqlPrimitiveType.of(c).getYqlType();
int yqlTypeNumber = yqlType.getNumber();
ValueProtos.Type.PrimitiveTypeId primitiveTypeId = Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
.filter(id -> id.getNumber() == yqlTypeNumber)
.findFirst()
.orElseThrow(() -> new CreateTableException(String.format("Can't create table '%s'%n"
Expand Down Expand Up @@ -149,6 +160,46 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't alter table %s: %s", name, status));
}

if (changefeed.getConsumers().isEmpty()) {
continue;
}

nvamelichev marked this conversation as resolved.
Show resolved Hide resolved
String changeFeedTopicPath = YdbPaths.join(tablespace + name, changefeed.getName());
Result<TopicDescription> result = topicClient.describeTopic(changeFeedTopicPath).join();
if (result.getStatus().getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't describe CDC topic %s: %s", changeFeedTopicPath, result.getStatus()));
}

Set<String> existingConsumerNames = result.getValue().getConsumers().stream()
.map(Consumer::getName)
.collect(toSet());

Map<String, Schema.Changefeed.Consumer> specifiedConsumers = changefeed.getConsumers().stream()
.collect(toMap(Schema.Changefeed.Consumer::getName, Function.identity()));

Set<String> addedConsumers = Sets.difference(specifiedConsumers.keySet(), existingConsumerNames);

AlterTopicSettings.Builder addConsumersRequest = AlterTopicSettings.newBuilder();
for (String addedConsumer : addedConsumers) {
Schema.Changefeed.Consumer consumer = specifiedConsumers.get(addedConsumer);
Consumer.Builder consumerConfiguration = Consumer.newBuilder()
.setName(consumer.getName())
.setImportant(consumer.isImportant())
.setReadFrom(consumer.getReadFrom());

for (Codec consumerCodec : consumer.getCodecs()) {
consumerConfiguration.addSupportedCodec(
tech.ydb.topic.description.Codec.valueOf(consumerCodec.name())
);
}

addConsumersRequest.addAddConsumer(consumerConfiguration.build());
}
status = topicClient.alterTopic(changeFeedTopicPath, addConsumersRequest.build()).join();
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't alter CDC topic %s: %s", changeFeedTopicPath, status));
}
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;
import tech.ydb.proto.scheme.v1.SchemeServiceGrpc;
import tech.ydb.proto.table.v1.TableServiceGrpc;
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
import tech.ydb.table.Session;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.TableClient;
Expand Down Expand Up @@ -151,6 +152,7 @@ private YdbConfig getProxyServerConfig() {
.addService(new ProxyYdbTableService(channel))
.addService(proxyDiscoveryService)
.addService(new DelegateSchemeServiceImplBase(SchemeServiceGrpc.newStub(channel)))
.addService(new DelegateTopicServiceImplBase(TopicServiceGrpc.newStub(channel)))
.build();
proxyServer.start();
Runtime.getRuntime().addShutdownHook(new Thread(proxyServer::shutdown));
Expand Down Expand Up @@ -1023,6 +1025,12 @@ private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.Sch
final SchemeServiceGrpc.SchemeServiceStub schemeServiceStub;
}

@AllArgsConstructor
private static class DelegateTopicServiceImplBase extends TopicServiceGrpc.TopicServiceImplBase {
@Delegate
final TopicServiceGrpc.TopicServiceStub topicServiceStub;
}

private static class ProxyDiscoveryService extends DiscoveryServiceGrpc.DiscoveryServiceImplBase {
@Delegate(excludes = ProxyDiscoveryService.OverriddenMethod.class)
DiscoveryServiceGrpc.DiscoveryServiceStub stub;
Expand Down
Loading