Glue Schema Registry schema replication converter #352

Open
wants to merge 49 commits into
base: master
Changes from 38 commits
Commits
a724b16
Created MM2 Converter and Added Unit Tests
Jul 26, 2023
f26a158
Modified Converter Based on First CR Feedback
Jul 26, 2023
7c365e2
Modified Converter Based on First CR Feedback
Jul 26, 2023
0ef7d03
Added TODOs
Aug 18, 2023
b562512
Merge pull request #283 from JC-CJ2222/new_cross-region-replication-m…
YangSan0622 Sep 11, 2023
e4c45b3
Created MM2 Converter and Added Unit Tests
Jul 26, 2023
f095ead
Modified Converter Based on First CR Feedback
Jul 26, 2023
2f30def
Modified Converter Based on First CR Feedback
Jul 26, 2023
03f9328
Added TODOs
Aug 18, 2023
f5f0788
Change version in pom
Jun 12, 2024
07afed0
Added 2nd kafka and ZK in docker compose service file
Jun 13, 2024
0c93896
Added mm2 connector files for standalone run
Jun 13, 2024
d86bfcd
Added code for scenario where message doesn't have schema in it.
Jun 14, 2024
fc55cb6
Merge remote-tracking branch 'upstream/schema-replication-converter' …
Jun 14, 2024
e4d4003
Merge code
Jun 14, 2024
a23c8a9
Updated mm2 configs
Jun 14, 2024
13db222
Unit test to show when data doesn't have schema, it still gets replic…
Jun 14, 2024
925ebdd
Fixed some issues in the compose file and added mm2
Jun 15, 2024
66a847d
Dockerfile and mm2 config for mm2 container
Jun 15, 2024
0aa94f1
Added integration tests for schema replication
Jun 18, 2024
f296fd0
Added integration tests for schema replication
Jun 18, 2024
3bd91a0
loading aws credentials from host to docker
Jun 18, 2024
569d4b3
Configuring default aws profile
Jun 18, 2024
6f2bcc4
Renamed class to AWSGlueCrossRegionSchemaReplicationConverter
Jul 3, 2024
8c84686
Increased the sleep time
Jul 3, 2024
110b792
Source schema compatibility mode is persisted in target schema
Jul 8, 2024
dc9c91e
MM2 will pickup topics prefixed with SchemaReplicationTests only
Jul 9, 2024
f0c0210
mongo JAR file deleted as its created during integration tests
Jul 9, 2024
d4211a4
TESTS: Fixed Tests for SchemaReplicationConverter after adding _V2 ve…
Jul 9, 2024
efc21ea
TESTS: Added tests for Serializer-Deserializer module after adding _V…
Jul 9, 2024
837fc97
TESTS: Added tests for schema-registry-common module after adding _V2…
Jul 9, 2024
d68448a
Removed TODO comment for issue #294 as its already working as expected
Jul 9, 2024
c4d2a40
Issue Fix: Order of schema versions in the source are honoured in the…
Jul 11, 2024
8cec19e
Added tests for GlueSchemaRegistryConfiguration and AWSGlueCrossRegio…
Jul 11, 2024
5a492dd
Added tests for SchemaByDefinitionFetcher changes
Jul 11, 2024
d948e2f
Added tests for AWSSchemaRegistryClient changes
Jul 12, 2024
e1c515c
Added an integration test with the schema replication converter and f…
Jul 12, 2024
7698435
Deleted wget-log as its unnecessary
Jul 12, 2024
6b7f1aa
Code Cleanup: Moved the logic of createSchemaAndRegisterAllSchemaVers…
Aug 20, 2024
e0e46ed
Code cleanup and added cache for schema replication converter for ini…
Aug 21, 2024
c2841f7
Code Cleanup: Moved schema replication related GSR configurations to …
Aug 21, 2024
4e80dd6
Code Cleanup: Moved schema replication related GSR configurations to …
Aug 21, 2024
49c04c8
Code Cleanup: Moved schema replication related constants to the conve…
Aug 21, 2024
91b3757
Code refactorings to improve performance of schema replication
Sep 6, 2024
b1a81f7
Added unit test for schema replication and removed schema-replication…
Dec 16, 2024
1adc2b3
Removed redundant code
Dec 16, 2024
cd4a900
Code Refactorings to cleanup AWSSchemaRegistryClient
Dec 17, 2024
8a335aa
mockito library updated
Dec 17, 2024
3928403
Fixed checkstyle issues
Dec 17, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
**/*.project
**/*.classpath
**/*.factorypath
- **/dependency-reduced-pom.xml
+ **/dependency-reduced-pom.xml
+ /integration-tests/*

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import software.amazon.awssdk.services.glue.model.Compatibility;

/**
* Encapsulates general inputs for serializer
Expand All @@ -38,8 +39,11 @@ public class AWSSerializerInput {
@Getter
private String transportName;

@Getter
private Compatibility compatibility;

@Builder
- public AWSSerializerInput(String schemaDefinition, String schemaName, String dataFormat, String transportName) {
+ public AWSSerializerInput(String schemaDefinition, String schemaName, String dataFormat, String transportName, Compatibility compatibility) {
this.schemaDefinition = schemaDefinition;

if (transportName != null) {
Expand All @@ -55,5 +59,11 @@ public AWSSerializerInput(String schemaDefinition, String schemaName, String dat
}

this.dataFormat = dataFormat;

if (compatibility != null) {
this.compatibility = compatibility;
} else {
this.compatibility = Compatibility.BACKWARD;
}
}
}
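
The defaulting rule added to the constructor above can be sketched in isolation. This is a minimal illustration, not the library's class: `SerializerInputSketch` and its `CompatibilityMode` enum are hypothetical stand-ins (the real code uses the SDK's `Compatibility` type), chosen so the example runs without dependencies.

```java
final class SerializerInputSketch {
    /** Hypothetical stand-in for the SDK's Compatibility enum (subset of values). */
    enum CompatibilityMode { BACKWARD, FORWARD, FULL, NONE }

    private final CompatibilityMode compatibility;

    SerializerInputSketch(CompatibilityMode compatibility) {
        // Mirrors the defaulting rule in the diff: a null argument becomes BACKWARD.
        this.compatibility = compatibility != null ? compatibility : CompatibilityMode.BACKWARD;
    }

    CompatibilityMode getCompatibility() {
        return compatibility;
    }

    public static void main(String[] args) {
        // Omitted compatibility falls back to the default.
        System.out.println(new SerializerInputSketch(null).getCompatibility());
        // An explicit value is kept as given.
        System.out.println(new SerializerInputSketch(CompatibilityMode.FULL).getCompatibility());
    }
}
```

Callers that never pass a compatibility mode therefore keep working and get `BACKWARD`.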
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import software.amazon.awssdk.services.glue.model.Compatibility;

- import java.util.Map;
- import java.util.UUID;
+ import java.util.*;
Contributor: Same as above on imports.

Author: Restored back to original with no changes.

import java.util.concurrent.TimeUnit;

/**
Expand All @@ -29,6 +29,10 @@ public class SchemaByDefinitionFetcher {
@VisibleForTesting
protected final LoadingCache<Schema, UUID> schemaDefinitionToVersionCache;

@NonNull
@VisibleForTesting
protected final LoadingCache<SchemaV2, UUID> schemaDefinitionToVersionCacheV2;

public SchemaByDefinitionFetcher(
final AWSSchemaRegistryClient awsSchemaRegistryClient,
final GlueSchemaRegistryConfiguration glueSchemaRegistryConfiguration) {
Expand All @@ -39,6 +43,11 @@ public SchemaByDefinitionFetcher(
.maximumSize(glueSchemaRegistryConfiguration.getCacheSize())
.refreshAfterWrite(glueSchemaRegistryConfiguration.getTimeToLiveMillis(), TimeUnit.MILLISECONDS)
.build(new SchemaDefinitionToVersionCache());

this.schemaDefinitionToVersionCacheV2 = CacheBuilder.newBuilder()
.maximumSize(glueSchemaRegistryConfiguration.getCacheSize())
.refreshAfterWrite(glueSchemaRegistryConfiguration.getTimeToLiveMillis(), TimeUnit.MILLISECONDS)
.build(new SchemaDefinitionToVersionCacheV2());
}

/**
Expand Down Expand Up @@ -103,6 +112,80 @@ public UUID getORRegisterSchemaVersionId(
return schemaVersionId;
}

/**
* Get the schema version ID using the following steps:
* <p>
* 1) If the schema version exists in the registry, get its ID from the registry.
* 2) If the schema version does not exist in the registry and auto registration is enabled:
* 2.1) if the schema exists but the version does not, register the schema version;
* 2.2) if the schema does not exist, create the schema and register the schema version.
*
* @param schemaDefinition Schema Definition
* @param schemaName Schema Name
* @param dataFormat Data Format
* @param compatibility Compatibility mode of the schema
* @param metadata metadata for schema version
* @return Schema Version ID
* @throws AWSSchemaRegistryException on any error while fetching the schema version ID
*/
@SneakyThrows
public UUID getORRegisterSchemaVersionIdV2(
@NonNull String schemaDefinition,
@NonNull String schemaName,
@NonNull String dataFormat,
@NonNull Compatibility compatibility,
@NonNull Map<String, String> metadata) throws AWSSchemaRegistryException {
UUID schemaVersionId;
Map<SchemaV2, UUID> schemaWithVersionId;
final SchemaV2 schema = new SchemaV2(schemaDefinition, dataFormat, schemaName, compatibility);

try {
return schemaDefinitionToVersionCacheV2.get(schema);
} catch (Exception ex) {
Throwable schemaRegistryException = ex.getCause();
String exceptionCauseMessage = schemaRegistryException.getCause().getMessage();

if (exceptionCauseMessage.contains(AWSSchemaRegistryConstants.SCHEMA_VERSION_NOT_FOUND_MSG)) {
if (!glueSchemaRegistryConfiguration.isSchemaAutoRegistrationEnabled()) {
throw new AWSSchemaRegistryException(AWSSchemaRegistryConstants.AUTO_REGISTRATION_IS_DISABLED_MSG,
schemaRegistryException);
}
schemaVersionId =
awsSchemaRegistryClient.registerSchemaVersion(schemaDefinition, schemaName, dataFormat, metadata);
schemaDefinitionToVersionCacheV2.put(schema, schemaVersionId);
} else if (exceptionCauseMessage.contains(AWSSchemaRegistryConstants.SCHEMA_NOT_FOUND_MSG)) {
if (!glueSchemaRegistryConfiguration.isSchemaAutoRegistrationEnabled()) {
throw new AWSSchemaRegistryException(AWSSchemaRegistryConstants.AUTO_REGISTRATION_IS_DISABLED_MSG,
schemaRegistryException);
}

// When the schema does not exist in the target, create it there and
// register all existing schema versions from the source schema in the target, in the same order.
schemaWithVersionId =
awsSchemaRegistryClient.createSchemaV2(schemaName, dataFormat, schemaDefinition, compatibility, metadata);

//Cache all the schema versions for a Glue Schema Registry schema
schemaWithVersionId.entrySet()
.stream()
.forEach(item -> {
schemaDefinitionToVersionCacheV2.put(item.getKey(), item.getValue());
});
} else {
String msg =
String.format(
"Exception occurred while fetching or registering schema definition = %s, schema name = %s ",
schemaDefinition, schemaName);
throw new AWSSchemaRegistryException(msg, schemaRegistryException);
}
}
schemaVersionId = schemaDefinitionToVersionCacheV2.get(schema);
return schemaVersionId;
}

@RequiredArgsConstructor
private class SchemaDefinitionToVersionCache extends CacheLoader<Schema, UUID> {
@Override
Expand All @@ -111,4 +194,13 @@ public UUID load(Schema schema) {
schema.getSchemaDefinition(), schema.getSchemaName(), schema.getDataFormat());
}
}

@RequiredArgsConstructor
private class SchemaDefinitionToVersionCacheV2 extends CacheLoader<SchemaV2, UUID> {
@Override
public UUID load(SchemaV2 schema) {
return awsSchemaRegistryClient.getSchemaVersionIdByDefinition(
schema.getSchemaDefinition(), schema.getSchemaName(), schema.getDataFormat());
}
}
}
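
The get-or-register flow described in the Javadoc of `getORRegisterSchemaVersionIdV2` can be sketched without the Glue client. The following is only an illustration under stated assumptions: `GetOrRegisterSketch` is a hypothetical class, and its in-memory map stands in for the registry; it does not model the cache or the replication of existing source versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class GetOrRegisterSketch {
    /** Hypothetical in-memory registry: schema name -> (definition -> version ID). */
    private final Map<String, Map<String, UUID>> registry = new HashMap<>();
    private final boolean autoRegistrationEnabled;

    GetOrRegisterSketch(boolean autoRegistrationEnabled) {
        this.autoRegistrationEnabled = autoRegistrationEnabled;
    }

    UUID getOrRegister(String schemaName, String definition) {
        Map<String, UUID> versions = registry.get(schemaName);
        // Step 1: the version already exists, so return its ID.
        if (versions != null && versions.containsKey(definition)) {
            return versions.get(definition);
        }
        // Step 2: a missing version can only be added when auto registration is on.
        if (!autoRegistrationEnabled) {
            throw new IllegalStateException("auto registration is disabled");
        }
        if (versions == null) {
            // Step 2.2: the schema itself is missing, so create it first.
            versions = new HashMap<>();
            registry.put(schemaName, versions);
        }
        // Step 2.1 (and the end of 2.2): register the new version.
        UUID id = UUID.randomUUID();
        versions.put(definition, id);
        return id;
    }

    public static void main(String[] args) {
        GetOrRegisterSketch sketch = new GetOrRegisterSketch(true);
        UUID first = sketch.getOrRegister("orders", "{\"v\":1}");
        // Asking again for the same definition returns the same ID.
        System.out.println(first.equals(sketch.getOrRegister("orders", "{\"v\":1}")));
    }
}
```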
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazonaws.services.schemaregistry.common;

import lombok.AllArgsConstructor;
import lombok.Value;
import software.amazon.awssdk.services.glue.model.Compatibility;

/**
* Schema entity represents a schema and its properties stored in Glue Schema Registry.
*/
@AllArgsConstructor
@Value
public class SchemaV2 {
Contributor: Can we re-use the existing Schema class?

Author: Removed. Not needed with the new code.

/**
* Schema Definition contains the string representation of schema version stored during registration.
*/
private String schemaDefinition;

/**
* Data Format represents the string notation of the data format used during registration of the schema. Ex: Avro, JSON etc.
*/
private String dataFormat;

/**
* Schema Name represents name of the schema under which the schema version was registered.
*/
private String schemaName;

/**
* Compatibility mode refers to the compatibility settings of the schema.
*/
private Compatibility compatibilityMode;
}
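
Lombok's `@Value` generates `equals` and `hashCode` over all fields, so when `SchemaV2` is used as a cache key the compatibility mode is part of the key. The sketch below shows the consequence with a hand-written class; `SchemaKeySketch` is hypothetical, and it uses a `String` in place of the SDK's `Compatibility` enum so that it runs without dependencies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

final class SchemaKeySketch {
    private final String schemaDefinition;
    private final String dataFormat;
    private final String schemaName;
    private final String compatibilityMode; // String stand-in for the SDK enum

    SchemaKeySketch(String schemaDefinition, String dataFormat, String schemaName, String compatibilityMode) {
        this.schemaDefinition = schemaDefinition;
        this.dataFormat = dataFormat;
        this.schemaName = schemaName;
        this.compatibilityMode = compatibilityMode;
    }

    // Equality over all four fields, as @Value would generate it.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SchemaKeySketch)) {
            return false;
        }
        SchemaKeySketch other = (SchemaKeySketch) o;
        return Objects.equals(schemaDefinition, other.schemaDefinition)
                && Objects.equals(dataFormat, other.dataFormat)
                && Objects.equals(schemaName, other.schemaName)
                && Objects.equals(compatibilityMode, other.compatibilityMode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(schemaDefinition, dataFormat, schemaName, compatibilityMode);
    }

    public static void main(String[] args) {
        Map<SchemaKeySketch, UUID> cache = new HashMap<>();
        cache.put(new SchemaKeySketch("{}", "AVRO", "orders", "BACKWARD"), UUID.randomUUID());
        // Same four fields: the entry is found.
        System.out.println(cache.containsKey(new SchemaKeySketch("{}", "AVRO", "orders", "BACKWARD")));
        // Only the compatibility mode differs: this is a different key.
        System.out.println(cache.containsKey(new SchemaKeySketch("{}", "AVRO", "orders", "FULL")));
    }
}
```

A lookup that passes a different compatibility mode for the same definition would miss the cache, which is worth keeping in mind when comparing this key type with the existing `Schema` class.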
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,20 @@ public class GlueSchemaRegistryConfiguration {
private AWSSchemaRegistryConstants.COMPRESSION compressionType = AWSSchemaRegistryConstants.COMPRESSION.NONE;
private String endPoint;
private String region;
// TODO: Remove configs that are not useful for non-replication use cases
// https://github.com/awslabs/aws-glue-schema-registry/issues/292
private String sourceEndPoint;
private String sourceRegion;
private String targetEndPoint;
private String targetRegion;
private long timeToLiveMillis = 24 * 60 * 60 * 1000L;
private int cacheSize = 200;
private AvroRecordType avroRecordType;
private ProtobufMessageType protobufMessageType;
private String registryName;
private String sourceRegistryName;
private String targetRegistryName;
private int replicateSchemaVersionCount;
private Compatibility compatibilitySetting;
private String description;
private boolean schemaAutoRegistrationEnabled = false;
Expand Down Expand Up @@ -89,8 +98,15 @@ private void buildConfigs(Map<String, ?> configs) {

private void buildSchemaRegistryConfigs(Map<String, ?> configs) {
validateAndSetAWSRegion(configs);
validateAndSetAWSSourceRegion(configs);
validateAndSetAWSTargetRegion(configs);
validateAndSetAWSEndpoint(configs);
validateAndSetAWSSourceEndpoint(configs);
validateAndSetAWSTargetEndpoint(configs);
validateAndSetRegistryName(configs);
validateAndSetSourceRegistryName(configs);
validateAndSetTargetRegistryName(configs);
validateAndSetReplicateSchemaVersionCount(configs);
Contributor: Can we limit these configurations to just the connector as they don't concern the core GSR library?

Author: Removed. Moved this to converter module.

validateAndSetDescription(configs);
validateAndSetAvroRecordType(configs);
validateAndSetProtobufMessageType(configs);
Expand Down Expand Up @@ -164,6 +180,20 @@ private void validateAndSetAWSRegion(Map<String, ?> configs) {
}
}

private void validateAndSetAWSSourceRegion(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_SOURCE_REGION)) {
this.sourceRegion = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_SOURCE_REGION));
}
}

private void validateAndSetAWSTargetRegion(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_TARGET_REGION)) {
this.targetRegion = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_TARGET_REGION));
} else {
this.targetRegion = this.region;
}
}

private void validateAndSetCompatibility(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.COMPATIBILITY_SETTING)) {
this.compatibilitySetting = Compatibility.fromValue(
Expand All @@ -190,21 +220,57 @@ private void validateAndSetRegistryName(Map<String, ?> configs) {
}
}

private void validateAndSetSourceRegistryName(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.SOURCE_REGISTRY_NAME)) {
this.sourceRegistryName = String.valueOf(configs.get(AWSSchemaRegistryConstants.SOURCE_REGISTRY_NAME));
}
}

private void validateAndSetTargetRegistryName(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.TARGET_REGISTRY_NAME)) {
this.targetRegistryName = String.valueOf(configs.get(AWSSchemaRegistryConstants.TARGET_REGISTRY_NAME));
} else {
this.targetRegistryName = this.registryName;
}
}

private void validateAndSetReplicateSchemaVersionCount(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.REPLICATE_SCHEMA_VERSION_COUNT)) {
this.replicateSchemaVersionCount = Integer.valueOf(configs.get(AWSSchemaRegistryConstants.REPLICATE_SCHEMA_VERSION_COUNT).toString());
} else {
this.replicateSchemaVersionCount = AWSSchemaRegistryConstants.DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT;
}
}

private void validateAndSetAWSEndpoint(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_ENDPOINT)) {
this.endPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_ENDPOINT));
}
}

private void validateAndSetAWSSourceEndpoint(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_SOURCE_ENDPOINT)) {
this.sourceEndPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_SOURCE_ENDPOINT));
}
}

private void validateAndSetAWSTargetEndpoint(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_TARGET_ENDPOINT)) {
this.targetEndPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_TARGET_ENDPOINT));
} else {
// Fall back to the already-resolved endpoint; String.valueOf(null) would store the text "null".
this.targetEndPoint = this.endPoint;
}
}

private void validateAndSetProxyUrl(Map<String, ?> configs) {
if (isPresent(configs, AWSSchemaRegistryConstants.PROXY_URL)) {
- String value = (String) configs.get(AWSSchemaRegistryConstants.PROXY_URL);
- try {
- this.proxyUrl = URI.create(value);
- } catch (IllegalArgumentException e) {
- String message = String.format("Proxy URL property is not a valid URL: %s", value);
- throw new AWSSchemaRegistryException(message, e);
- }
+ String value = (String) configs.get(AWSSchemaRegistryConstants.PROXY_URL);
+ try {
+ this.proxyUrl = URI.create(value);
+ } catch (IllegalArgumentException e) {
+ String message = String.format("Proxy URL property is not a valid URL: %s", value);
+ throw new AWSSchemaRegistryException(message, e);
+ }
}
}
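
The target-specific setters above fall back to the general setting when no target value is configured. A minimal sketch of that resolution rule follows; `TargetFallbackSketch` and `resolve` are hypothetical names, and a plain null check is assumed in place of the class's own `isPresent` helper. The keys `region` and `target.region` are the ones defined in the constants of this PR.

```java
import java.util.HashMap;
import java.util.Map;

final class TargetFallbackSketch {
    /** Returns the target-specific value if set, otherwise the base value, otherwise null. */
    static String resolve(Map<String, ?> configs, String targetKey, String baseKey) {
        Object target = configs.get(targetKey);
        if (target != null) {
            return String.valueOf(target);
        }
        Object base = configs.get(baseKey);
        // Return null for a missing base value: String.valueOf on a null Object yields the text "null".
        return base != null ? String.valueOf(base) : null;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("region", "us-east-1");
        // No target.region configured, so the general region is used.
        System.out.println(resolve(configs, "target.region", "region"));

        configs.put("target.region", "eu-west-1");
        // An explicit target.region wins over the general region.
        System.out.println(resolve(configs, "target.region", "region"));
    }
}
```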

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amazonaws.services.schemaregistry.utils;

import software.amazon.awssdk.services.glue.endpoints.internal.Value;
import software.amazon.awssdk.services.glue.model.Compatibility;

public final class AWSSchemaRegistryConstants {
Expand All @@ -30,6 +31,30 @@ public final class AWSSchemaRegistryConstants {
* AWS region to use while initializing the client for service.
*/
public static final String AWS_REGION = "region";
/**
* AWS source endpoint to use while initializing the client for service.
*/
public static final String AWS_SOURCE_ENDPOINT = "source.endpoint";
/**
* AWS source region to use while initializing the client for service.
*/
public static final String AWS_SOURCE_REGION = "source.region";
/**
* AWS target endpoint to use while initializing the client for service.
*/
public static final String AWS_TARGET_ENDPOINT = "target.endpoint";
/**
* AWS target region to use while initializing the client for service.
*/
public static final String AWS_TARGET_REGION = "target.region";
/**
* Number of schema versions to replicate from source to target
*/
public static final String REPLICATE_SCHEMA_VERSION_COUNT = "replicateSchemaVersionCount";
/**
* Default number of schema versions to replicate from source to target
*/
public static final Integer DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT = 10;
/**
* Header Version Byte.
*/
Expand Down Expand Up @@ -104,6 +129,14 @@ public final class AWSSchemaRegistryConstants {
* Default registry name if not passed by the client.
*/
public static final String DEFAULT_REGISTRY_NAME = "default-registry";
/**
* Source Registry Name.
*/
public static final String SOURCE_REGISTRY_NAME = "source.registry.name";
/**
* Target Registry Name.
*/
public static final String TARGET_REGISTRY_NAME = "target.registry.name";
Contributor: Same as above. Converter specific code must be in converter.

Author: Removed. Moved this to converter module.

/**
* Compatibility setting, will be helpful at the time of schema evolution.
*/
Expand Down
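
As a rough illustration of how the new `replicateSchemaVersionCount` key and its default of 10 interact, the sketch below reads the value from a configuration map. `ReplicationCountSketch` and `versionCount` are hypothetical names; the key and default are copied from the constants in this diff, and a plain null check is assumed for the presence test.

```java
import java.util.HashMap;
import java.util.Map;

final class ReplicationCountSketch {
    // Key and default copied from the constants added in this PR.
    static final String REPLICATE_SCHEMA_VERSION_COUNT = "replicateSchemaVersionCount";
    static final int DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT = 10;

    /** Reads the configured count, or the default when the key is absent. */
    static int versionCount(Map<String, ?> configs) {
        Object raw = configs.get(REPLICATE_SCHEMA_VERSION_COUNT);
        if (raw == null) {
            return DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT;
        }
        // Accepts both Integer and String values, as connector properties are often strings.
        return Integer.parseInt(raw.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(versionCount(configs)); // key absent, default applies

        configs.put(REPLICATE_SCHEMA_VERSION_COUNT, "25");
        System.out.println(versionCount(configs)); // configured value is parsed
    }
}
```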