From bd88fc134dbfcf1b0677e005b2e5257bd32e5475 Mon Sep 17 00:00:00 2001 From: phshah95 Date: Sun, 25 Sep 2022 18:31:52 -0400 Subject: [PATCH 1/6] fix optional fields in json --- .../JsonSchemaToConnectSchemaConverter.java | 4 +- .../jsonschema/JsonSchemaConverterTest.java | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java index 2ba681ad..fa27e157 100644 --- a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java +++ b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java @@ -93,7 +93,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema, .anyMatch(schema -> schema instanceof NullSchema); boolean isOptionalUnion = - CombinedSchema.ONE_CRITERION.equals(criterion) && subSchemas.size() == 2 && hasNullSchema; + (CombinedSchema.ONE_CRITERION.equals(criterion) + || CombinedSchema.ANY_CRITERION.equals(criterion)) + && subSchemas.size() == 2 && hasNullSchema; if (isOptionalUnion) { return buildOptionalUnionSchema(subSchemas); } diff --git a/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java b/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java index ca057d37..78e040e7 100644 --- a/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java +++ b/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java @@ -31,6 +31,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.json.DecimalFormat; +import org.everit.json.schema.loader.SchemaLoader; +import org.json.JSONObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -102,6 +104,42 @@ public void testConverter_configure_notNull() { assertNotNull(converter.getJsonSchemaToConnectSchemaConverter()); } + @Test + public void testConverter_optional_fields() { + converter = new JsonSchemaConverter(); + converter.configure(getProperties(), false); + + String jsonSchemaString = "{" + + " \"$schema\": \"http://json-schema.org/draft-07/schema#\"," + + " \"type\": \"object\"," + + " \"properties\": {" + + " \"name\": {" + + " \"type\": [" + + " \"string\"," + + " \"null\"" + + " ]" + + " }" + + " }," + + " \"additionalProperties\": false" + + "}"; + + org.everit.json.schema.Schema jsonSchema = null; + + try { + JSONObject jsonSchemaObject = new JSONObject(jsonSchemaString); + jsonSchema = SchemaLoader.builder() + .schemaJson(jsonSchemaObject) + .build() + .load() + .build(); + } catch (Exception e) { + throw new DataException("Failed to read JSON Schema : " + jsonSchemaString, e); + } + + Schema connectSchema = converter.getJsonSchemaToConnectSchemaConverter().toConnectSchema(jsonSchema); + assertEquals(1, connectSchema.fields().size()); + } + @ParameterizedTest @MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#" + "testSchemaAndValueArgumentsProvider") @@ -139,6 +177,8 @@ public void testConverter_fromConnectData_equalsToConnectData(org.everit.json.sc } } + + @ParameterizedTest @MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#" + "testSchemaAndValueArgumentsProvider") From bc42026cf6be4633d07a37ab2d1f6aebcb083eaf Mon Sep 17 00:00:00 2001 From: phshah95 Date: Thu, 29 Sep 2022 14:49:00 -0400 Subject: [PATCH 2/6] fix issues with caching every field type --- .../jsonschema/JsonSchemaToConnectSchemaConverter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java index fa27e157..6b47782c 100644 --- a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java +++ b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.errors.DataException; import org.everit.json.schema.CombinedSchema; import org.everit.json.schema.NullSchema; +import org.everit.json.schema.ObjectSchema; import org.everit.json.schema.ReferenceSchema; import java.util.Collection; @@ -112,7 +113,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema, populateConnectProperties(builder, jsonSchema, required, connectName); Schema result = builder.build(); - toConnectSchemaCache.put(jsonSchema, result); + if (jsonSchema instanceof ObjectSchema) { + toConnectSchemaCache.put(jsonSchema, result); + } return result; } From 552c861c9e59350da8ee8b8f0a3d0daaa11b3768 Mon Sep 17 00:00:00 2001 From: phshah95 Date: Mon, 28 Nov 2022 16:19:48 -0500 Subject: [PATCH 3/6] fix longs --- .../schemaregistry/deserializers/json/JsonDeserializer.java | 3 ++- .../schemaregistry/serializers/json/JsonSerializer.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java index 31130e5c..ca9abf0d 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java @@ -20,6 +20,7 @@ import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; import com.amazonaws.services.schemaregistry.common.Schema; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; @@ -57,7 +58,7 @@ public class JsonDeserializer implements GlueSchemaRegistryDataFormatDeserialize public JsonDeserializer(GlueSchemaRegistryConfiguration configs) { this.schemaRegistrySerDeConfigs = configs; JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); - this.objectMapper = new ObjectMapper(); + this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN); this.objectMapper.setNodeFactory(jsonNodeFactory); if (configs != null) { if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) { diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java index dd92ae4c..28922d2d 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java @@ -17,6 +17,7 @@ import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer; import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration; import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -52,7 +53,7 @@ public class JsonSerializer implements GlueSchemaRegistryDataFormatSerializer { public JsonSerializer(GlueSchemaRegistryConfiguration configs) { this.schemaRegistrySerDeConfigs = configs; JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); - this.objectMapper = new ObjectMapper(); + this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN); this.objectMapper.setNodeFactory(jsonNodeFactory); if (configs != null) { if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) { From 65ddfbbeac49639a6efbf664ae5af8da2e5fb6ea Mon Sep 17 00:00:00 2001 From: phshah95 Date: Mon, 28 Nov 2022 17:47:00 -0500 Subject: [PATCH 4/6] fix longs 2 --- .../deserializers/json/JsonDeserializer.java | 4 ++-- .../json/JsonDoubleSerializer.java | 23 +++++++++++++++++++ .../serializers/json/JsonLongSerializer.java | 23 +++++++++++++++++++ .../serializers/json/JsonSerializer.java | 5 ++++ 4 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java create mode 100644 serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java index ca9abf0d..cbba39f2 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java @@ -86,8 +86,8 @@ public Object deserialize(@NonNull ByteBuffer buffer, @NonNull Schema schemaObject) { try { String schema = schemaObject.getSchemaDefinition(); - byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer); - +// byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer); + byte[] data = buffer.array(); log.debug("Length of actual message: {}", data.length); Object deserializedObject; diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java new file mode 100644 index 00000000..29142883 --- /dev/null +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java @@ -0,0 +1,23 @@ +package com.amazonaws.services.schemaregistry.serializers.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.math.BigDecimal; + +public class JsonDoubleSerializer extends JsonSerializer { + + public JsonDoubleSerializer() { + super(); + } + + @Override + public void serialize(Double value, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + System.out.println("value = " + value); + jgen.writeNumber(new BigDecimal(value)); + } + +} \ No newline at end of file diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java new file mode 100644 index 00000000..b10b18b9 --- /dev/null +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java @@ -0,0 +1,23 @@ +package com.amazonaws.services.schemaregistry.serializers.json; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.math.BigDecimal; + +public class JsonLongSerializer extends JsonSerializer { + + public JsonLongSerializer() { + super(); + } + + @Override + public void serialize(Long value, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + System.out.println("value = " + value); + jgen.writeNumber(new BigDecimal(value)); + } + +} \ No newline at end of file diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java index 28922d2d..debf1f34 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.kjetland.jackson.jsonSchema.JsonSchemaGenerator; import lombok.Builder; @@ -55,6 +56,10 @@ public JsonSerializer(GlueSchemaRegistryConfiguration configs) { JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN); this.objectMapper.setNodeFactory(jsonNodeFactory); + SimpleModule module = new SimpleModule(); + module.addSerializer(Double.class, new JsonDoubleSerializer()); + module.addSerializer(Long.class, new JsonLongSerializer()); + this.objectMapper.registerModule(module); if (configs != null) { if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) { configs.getJacksonSerializationFeatures() From 04dde7396fe03c1e21e3879f44a60daad1bdcc40 Mon Sep 17 00:00:00 2001 From: phshah95 Date: Mon, 28 Nov 2022 17:56:34 -0500 Subject: [PATCH 5/6] fix longs 3 --- .../schemaregistry/deserializers/json/JsonDeserializer.java | 3 +-- .../schemaregistry/serializers/json/JsonDoubleSerializer.java | 1 - .../schemaregistry/serializers/json/JsonLongSerializer.java | 1 - 3 files changed, 1 insertion(+), 4 deletions(-) diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java index cbba39f2..5a40ffe0 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java @@ -86,8 +86,7 @@ public Object deserialize(@NonNull ByteBuffer buffer, @NonNull Schema schemaObject) { try { String schema = schemaObject.getSchemaDefinition(); -// byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer); - byte[] data = buffer.array(); + byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer); log.debug("Length of actual message: {}", data.length); Object deserializedObject; diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java index 29142883..93a68a68 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java @@ -16,7 +16,6 @@ public JsonDoubleSerializer() { @Override public void serialize(Double value, JsonGenerator jgen, SerializerProvider provider) throws IOException { - System.out.println("value = " + value); jgen.writeNumber(new BigDecimal(value)); } diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java index b10b18b9..b954fb4a 100644 --- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java +++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java @@ -16,7 +16,6 @@ public JsonLongSerializer() { @Override public void serialize(Long value, JsonGenerator jgen, SerializerProvider provider) throws IOException { - System.out.println("value = " + value); jgen.writeNumber(new BigDecimal(value)); } From 9c2f55dea7b59f30c5484d7626ffc91add9d47d6 Mon Sep 17 00:00:00 2001 From: phshah95 Date: Wed, 30 Nov 2022 15:54:27 -0500 Subject: [PATCH 6/6] fix intgers --- .../jsonschema/typeconverters/TypeConverterFactory.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java index 51e0be92..d6189347 100644 --- a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java +++ b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java @@ -27,6 +27,7 @@ import org.everit.json.schema.NumberSchema; import org.everit.json.schema.ObjectSchema; import org.everit.json.schema.StringSchema; +import org.json.JSONObject; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -51,7 +52,10 @@ public TypeConverter get(@NonNull org.everit.json.schema.Schema jsonSchema, typeConverter = get(Schema.Type.BOOLEAN); } else if (jsonSchema instanceof NumberSchema) { // If no connect type passed then assume that connect schema is for FLOAT64 type data - if (connectType == null) { + JSONObject parsedSchema = new JSONObject(jsonSchema.toString()); + if (parsedSchema.get("type") == "integer") { + typeConverter = get(Schema.Type.INT64); + } else if (connectType == null) { typeConverter = get(Schema.Type.valueOf("FLOAT64")); } else { typeConverter = get(Schema.Type.valueOf(connectType.toUpperCase()));