-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Glue Schema Registry schema replication converter #352
base: master
Are you sure you want to change the base?
Changes from 38 commits
a724b16
f26a158
7c365e2
0ef7d03
b562512
e4c45b3
f095ead
2f30def
03f9328
f5f0788
07afed0
0c93896
d86bfcd
fc55cb6
e4d4003
a23c8a9
13db222
925ebdd
66a847d
0aa94f1
f296fd0
3bd91a0
569d4b3
6f2bcc4
8c84686
110b792
dc9c91e
f0c0210
d4211a4
efc21ea
837fc97
d68448a
c4d2a40
8cec19e
5a492dd
d948e2f
e1c515c
7698435
6b7f1aa
e0e46ed
c2841f7
4e80dd6
49c04c8
91b3757
b1a81f7
1adc2b3
cd4a900
8a335aa
3928403
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Copyright 2020 Amazon.com, Inc. or its affiliates. | ||
* Licensed under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.amazonaws.services.schemaregistry.common; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Value; | ||
import software.amazon.awssdk.services.glue.model.Compatibility; | ||
|
||
/** | ||
* Schema entity represents a schema and it's properties stored in Glue Schema Registry. | ||
*/ | ||
@AllArgsConstructor | ||
@Value | ||
public class SchemaV2 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we re-use the existing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. Not needed with the new code. |
||
/** | ||
* Schema Definition contains the string representation of schema version stored during registration. | ||
*/ | ||
private String schemaDefinition; | ||
|
||
/** | ||
* Data Format represents the string notation of data format used during registration of schea. Ex: Avro, JSON etc. | ||
*/ | ||
private String dataFormat; | ||
|
||
/** | ||
* Schema Name represents name of the schema under which the schema version was registered. | ||
*/ | ||
private String schemaName; | ||
|
||
/** | ||
* Compatibility mode refers to the compatibility settings of the schema. | ||
*/ | ||
private Compatibility compatibilityMode; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,11 +46,20 @@ public class GlueSchemaRegistryConfiguration { | |
private AWSSchemaRegistryConstants.COMPRESSION compressionType = AWSSchemaRegistryConstants.COMPRESSION.NONE; | ||
private String endPoint; | ||
private String region; | ||
// TODO: Remove configs that are not useful non replication use-cases | ||
// https://github.com/awslabs/aws-glue-schema-registry/issues/292 | ||
private String sourceEndPoint; | ||
private String sourceRegion; | ||
private String targetEndPoint; | ||
private String targetRegion; | ||
private long timeToLiveMillis = 24 * 60 * 60 * 1000L; | ||
private int cacheSize = 200; | ||
private AvroRecordType avroRecordType; | ||
private ProtobufMessageType protobufMessageType; | ||
private String registryName; | ||
private String sourceRegistryName; | ||
private String targetRegistryName; | ||
private int replicateSchemaVersionCount; | ||
private Compatibility compatibilitySetting; | ||
private String description; | ||
private boolean schemaAutoRegistrationEnabled = false; | ||
|
@@ -89,8 +98,15 @@ private void buildConfigs(Map<String, ?> configs) { | |
|
||
private void buildSchemaRegistryConfigs(Map<String, ?> configs) { | ||
validateAndSetAWSRegion(configs); | ||
validateAndSetAWSSourceRegion(configs); | ||
validateAndSetAWSTargetRegion(configs); | ||
validateAndSetAWSEndpoint(configs); | ||
validateAndSetAWSSourceEndpoint(configs); | ||
validateAndSetAWSTargetEndpoint(configs); | ||
validateAndSetRegistryName(configs); | ||
validateAndSetSourceRegistryName(configs); | ||
validateAndSetTargetRegistryName(configs); | ||
validateAndSetReplicateSchemaVersionCount(configs); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we limit these configurations to just the connector as they don't concern the core GSR library? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. Moved this to converter module |
||
validateAndSetDescription(configs); | ||
validateAndSetAvroRecordType(configs); | ||
validateAndSetProtobufMessageType(configs); | ||
|
@@ -164,6 +180,20 @@ private void validateAndSetAWSRegion(Map<String, ?> configs) { | |
} | ||
} | ||
|
||
private void validateAndSetAWSSourceRegion(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_SOURCE_REGION)) { | ||
this.sourceRegion = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_SOURCE_REGION)); | ||
} | ||
} | ||
|
||
private void validateAndSetAWSTargetRegion(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_TARGET_REGION)) { | ||
this.targetRegion = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_TARGET_REGION)); | ||
} else { | ||
this.targetRegion = this.region; | ||
} | ||
} | ||
|
||
private void validateAndSetCompatibility(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.COMPATIBILITY_SETTING)) { | ||
this.compatibilitySetting = Compatibility.fromValue( | ||
|
@@ -190,21 +220,57 @@ private void validateAndSetRegistryName(Map<String, ?> configs) { | |
} | ||
} | ||
|
||
private void validateAndSetSourceRegistryName(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.SOURCE_REGISTRY_NAME)) { | ||
this.sourceRegistryName = String.valueOf(configs.get(AWSSchemaRegistryConstants.SOURCE_REGISTRY_NAME)); | ||
} | ||
} | ||
|
||
private void validateAndSetTargetRegistryName(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.TARGET_REGISTRY_NAME)) { | ||
this.targetRegistryName = String.valueOf(configs.get(AWSSchemaRegistryConstants.TARGET_REGISTRY_NAME)); | ||
} else { | ||
this.targetRegistryName = this.registryName; | ||
} | ||
} | ||
|
||
private void validateAndSetReplicateSchemaVersionCount(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.REPLICATE_SCHEMA_VERSION_COUNT)) { | ||
this.replicateSchemaVersionCount = Integer.valueOf(configs.get(AWSSchemaRegistryConstants.REPLICATE_SCHEMA_VERSION_COUNT).toString()); | ||
} else { | ||
this.replicateSchemaVersionCount = AWSSchemaRegistryConstants.DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT; | ||
} | ||
} | ||
|
||
private void validateAndSetAWSEndpoint(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_ENDPOINT)) { | ||
this.endPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_ENDPOINT)); | ||
} | ||
} | ||
|
||
private void validateAndSetAWSSourceEndpoint(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_SOURCE_ENDPOINT)) { | ||
this.sourceEndPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_SOURCE_ENDPOINT)); | ||
} | ||
} | ||
|
||
private void validateAndSetAWSTargetEndpoint(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.AWS_TARGET_ENDPOINT)) { | ||
this.targetEndPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_TARGET_ENDPOINT)); | ||
} else { | ||
this.targetEndPoint = String.valueOf(configs.get(AWSSchemaRegistryConstants.AWS_ENDPOINT)); | ||
} | ||
} | ||
|
||
private void validateAndSetProxyUrl(Map<String, ?> configs) { | ||
if (isPresent(configs, AWSSchemaRegistryConstants.PROXY_URL)) { | ||
String value = (String) configs.get(AWSSchemaRegistryConstants.PROXY_URL); | ||
try { | ||
this.proxyUrl = URI.create(value); | ||
} catch (IllegalArgumentException e) { | ||
String message = String.format("Proxy URL property is not a valid URL: %s", value); | ||
throw new AWSSchemaRegistryException(message, e); | ||
} | ||
String value = (String) configs.get(AWSSchemaRegistryConstants.PROXY_URL); | ||
try { | ||
this.proxyUrl = URI.create(value); | ||
} catch (IllegalArgumentException e) { | ||
String message = String.format("Proxy URL property is not a valid URL: %s", value); | ||
throw new AWSSchemaRegistryException(message, e); | ||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
|
||
package com.amazonaws.services.schemaregistry.utils; | ||
|
||
import software.amazon.awssdk.services.glue.endpoints.internal.Value; | ||
import software.amazon.awssdk.services.glue.model.Compatibility; | ||
|
||
public final class AWSSchemaRegistryConstants { | ||
|
@@ -30,6 +31,30 @@ public final class AWSSchemaRegistryConstants { | |
* AWS region to use while initializing the client for service. | ||
*/ | ||
public static final String AWS_REGION = "region"; | ||
/** | ||
* AWS source endpoint to use while initializing the client for service. | ||
*/ | ||
public static final String AWS_SOURCE_ENDPOINT = "source.endpoint"; | ||
/** | ||
* AWS source region to use while initializing the client for service. | ||
*/ | ||
public static final String AWS_SOURCE_REGION = "source.region"; | ||
/** | ||
* AWS target endpoint to use while initializing the client for service. | ||
*/ | ||
public static final String AWS_TARGET_ENDPOINT = "target.endpoint"; | ||
/** | ||
* AWS target region to use while initializing the client for service. | ||
*/ | ||
public static final String AWS_TARGET_REGION = "target.region"; | ||
/** | ||
* Number of schema versions to replicate from source to target | ||
*/ | ||
public static final String REPLICATE_SCHEMA_VERSION_COUNT = "replicateSchemaVersionCount"; | ||
/** | ||
* Default number of schema versions to replicate from source to target | ||
*/ | ||
public static final Integer DEFAULT_REPLICATE_SCHEMA_VERSION_COUNT = 10; | ||
/** | ||
* Header Version Byte. | ||
*/ | ||
|
@@ -104,6 +129,14 @@ public final class AWSSchemaRegistryConstants { | |
* Default registry name if not passed by the client. | ||
*/ | ||
public static final String DEFAULT_REGISTRY_NAME = "default-registry"; | ||
/** | ||
* Source Registry Name. | ||
*/ | ||
public static final String SOURCE_REGISTRY_NAME = "source.registry.name"; | ||
/** | ||
* Target Registry Name. | ||
*/ | ||
public static final String TARGET_REGISTRY_NAME = "target.registry.name"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. Converter specific code must be in converter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed. Moved this to converter module |
||
/** | ||
* Compatibility setting, will be helpful at the time of schema evolution. | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above on imports.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restored back to original with no changes