diff --git a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java
index 2ba681ad..6b47782c 100644
--- a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java
+++ b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaToConnectSchemaConverter.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.connect.errors.DataException;
 import org.everit.json.schema.CombinedSchema;
 import org.everit.json.schema.NullSchema;
+import org.everit.json.schema.ObjectSchema;
 import org.everit.json.schema.ReferenceSchema;
 
 import java.util.Collection;
@@ -93,7 +94,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema,
                     .anyMatch(schema -> schema instanceof NullSchema);
 
             boolean isOptionalUnion =
-                    CombinedSchema.ONE_CRITERION.equals(criterion) && subSchemas.size() == 2 && hasNullSchema;
+                    (CombinedSchema.ONE_CRITERION.equals(criterion)
+                            || CombinedSchema.ANY_CRITERION.equals(criterion))
+                            && subSchemas.size() == 2 && hasNullSchema;
             if (isOptionalUnion) {
                 return buildOptionalUnionSchema(subSchemas);
             }
@@ -110,7 +113,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema,
         populateConnectProperties(builder, jsonSchema, required, connectName);
 
         Schema result = builder.build();
-        toConnectSchemaCache.put(jsonSchema, result);
+        if (jsonSchema instanceof ObjectSchema) {
+            toConnectSchemaCache.put(jsonSchema, result);
+        }
         return result;
     }
diff --git a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java
index 51e0be92..d6189347 100644
--- a/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java
+++ b/jsonschema-kafkaconnect-converter/src/main/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/typeconverters/TypeConverterFactory.java
@@ -27,6 +27,7 @@
 import org.everit.json.schema.NumberSchema;
 import org.everit.json.schema.ObjectSchema;
 import org.everit.json.schema.StringSchema;
+import org.json.JSONObject;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -51,7 +52,10 @@ public TypeConverter get(@NonNull org.everit.json.schema.Schema jsonSchema,
             typeConverter = get(Schema.Type.BOOLEAN);
         } else if (jsonSchema instanceof NumberSchema) {
             // If no connect type passed then assume that connect schema is for FLOAT64 type data
-            if (connectType == null) {
+            JSONObject parsedSchema = new JSONObject(jsonSchema.toString());
+            if ("integer".equals(parsedSchema.optString("type"))) {
+                typeConverter = get(Schema.Type.INT64);
+            } else if (connectType == null) {
                 typeConverter = get(Schema.Type.valueOf("FLOAT64"));
             } else {
                 typeConverter = get(Schema.Type.valueOf(connectType.toUpperCase()));
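Note on the TypeConverterFactory change: the everit loader maps both JSON Schema "number" and "integer" onto NumberSchema, which is why the factory re-parses the schema text and inspects the raw "type" keyword before falling back to FLOAT64. A minimal standalone sketch of that behaviour (the class name is illustrative, not part of the patch):

    import org.everit.json.schema.NumberSchema;
    import org.everit.json.schema.Schema;
    import org.everit.json.schema.loader.SchemaLoader;
    import org.json.JSONObject;

    public class IntegerSchemaSketch {
        public static void main(String[] args) {
            Schema numberSchema = SchemaLoader.load(new JSONObject("{\"type\": \"number\"}"));
            Schema integerSchema = SchemaLoader.load(new JSONObject("{\"type\": \"integer\"}"));

            // Both load as NumberSchema, so instanceof alone cannot pick INT64 over FLOAT64.
            System.out.println(numberSchema instanceof NumberSchema);   // true
            System.out.println(integerSchema instanceof NumberSchema);  // true

            // The same keyword check the factory performs, using equals() rather than ==.
            JSONObject parsed = new JSONObject(integerSchema.toString());
            System.out.println("integer".equals(parsed.optString("type"))); // true
        }
    }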
diff --git a/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java b/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java
index ca057d37..78e040e7 100644
--- a/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java
+++ b/jsonschema-kafkaconnect-converter/src/test/java/com/amazonaws/services/schemaregistry/kafkaconnect/jsonschema/JsonSchemaConverterTest.java
@@ -31,6 +31,8 @@
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.json.DecimalFormat;
+import org.everit.json.schema.loader.SchemaLoader;
+import org.json.JSONObject;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -102,6 +104,42 @@ public void testConverter_configure_notNull() {
         assertNotNull(converter.getJsonSchemaToConnectSchemaConverter());
     }
 
+    @Test
+    public void testConverter_optional_fields() {
+        converter = new JsonSchemaConverter();
+        converter.configure(getProperties(), false);
+
+        String jsonSchemaString = "{"
+                + " \"$schema\": \"http://json-schema.org/draft-07/schema#\","
+                + " \"type\": \"object\","
+                + " \"properties\": {"
+                + " \"name\": {"
+                + " \"type\": ["
+                + " \"string\","
+                + " \"null\""
+                + " ]"
+                + " }"
+                + " },"
+                + " \"additionalProperties\": false"
+                + "}";
+
+        org.everit.json.schema.Schema jsonSchema = null;
+
+        try {
+            JSONObject jsonSchemaObject = new JSONObject(jsonSchemaString);
+            jsonSchema = SchemaLoader.builder()
+                    .schemaJson(jsonSchemaObject)
+                    .build()
+                    .load()
+                    .build();
+        } catch (Exception e) {
+            throw new DataException("Failed to read JSON Schema : " + jsonSchemaString, e);
+        }
+
+        Schema connectSchema = converter.getJsonSchemaToConnectSchemaConverter().toConnectSchema(jsonSchema);
+        assertEquals(1, connectSchema.fields().size());
+    }
+
     @ParameterizedTest
     @MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#"
             + "testSchemaAndValueArgumentsProvider")
@@ -139,6 +177,8 @@ public void testConverter_fromConnectData_equalsToConnectData(org.everit.json.sc
         }
     }
 
+
+
     @ParameterizedTest
     @MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#"
             + "testSchemaAndValueArgumentsProvider")
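The test above covers the type-array spelling of an optional field. The ANY_CRITERION branch added to JsonSchemaToConnectSchemaConverter also covers the explicit anyOf spelling; a possible companion test is sketched below (not part of the patch; it assumes assertTrue is statically imported alongside assertEquals and that buildOptionalUnionSchema marks the surviving sub-schema optional, as it does for the oneOf case):

    @Test
    public void testConverter_optional_fields_anyOf() {
        converter = new JsonSchemaConverter();
        converter.configure(getProperties(), false);

        String jsonSchemaString = "{"
                + " \"$schema\": \"http://json-schema.org/draft-07/schema#\","
                + " \"type\": \"object\","
                + " \"properties\": {"
                + " \"name\": { \"anyOf\": [ { \"type\": \"string\" }, { \"type\": \"null\" } ] }"
                + " },"
                + " \"additionalProperties\": false"
                + "}";

        org.everit.json.schema.Schema jsonSchema = SchemaLoader.builder()
                .schemaJson(new JSONObject(jsonSchemaString))
                .build()
                .load()
                .build();

        Schema connectSchema = converter.getJsonSchemaToConnectSchemaConverter().toConnectSchema(jsonSchema);
        assertEquals(1, connectSchema.fields().size());
        assertTrue(connectSchema.field("name").schema().isOptional());
    }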
diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java
index 31130e5c..5a40ffe0 100644
--- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java
+++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/deserializers/json/JsonDeserializer.java
@@ -20,6 +20,7 @@
 import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
 import com.amazonaws.services.schemaregistry.common.Schema;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
@@ -57,7 +58,7 @@ public class JsonDeserializer implements GlueSchemaRegistryDataFormatDeserialize
     public JsonDeserializer(GlueSchemaRegistryConfiguration configs) {
         this.schemaRegistrySerDeConfigs = configs;
         JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
-        this.objectMapper = new ObjectMapper();
+        this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
         this.objectMapper.setNodeFactory(jsonNodeFactory);
         if (configs != null) {
             if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) {
@@ -86,7 +87,6 @@ public Object deserialize(@NonNull ByteBuffer buffer,
         try {
             String schema = schemaObject.getSchemaDefinition();
             byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer);
-            log.debug("Length of actual message: {}", data.length);
 
             Object deserializedObject;
 
diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java
new file mode 100644
index 00000000..93a68a68
--- /dev/null
+++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonDoubleSerializer.java
@@ -0,0 +1,22 @@
+package com.amazonaws.services.schemaregistry.serializers.json;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+public class JsonDoubleSerializer extends JsonSerializer<Double> {
+
+    public JsonDoubleSerializer() {
+        super();
+    }
+
+    @Override
+    public void serialize(Double value, JsonGenerator jgen, SerializerProvider provider)
+            throws IOException {
+        jgen.writeNumber(BigDecimal.valueOf(value));
+    }
+
+}
diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java
new file mode 100644
index 00000000..b954fb4a
--- /dev/null
+++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonLongSerializer.java
@@ -0,0 +1,22 @@
+package com.amazonaws.services.schemaregistry.serializers.json;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+
+public class JsonLongSerializer extends JsonSerializer<Long> {
+
+    public JsonLongSerializer() {
+        super();
+    }
+
+    @Override
+    public void serialize(Long value, JsonGenerator jgen, SerializerProvider provider)
+            throws IOException {
+        jgen.writeNumber(new BigDecimal(value));
+    }
+
+}
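Both new serializers funnel primitive values through BigDecimal so that the WRITE_BIGDECIMAL_AS_PLAIN feature enabled on the mappers controls the rendered text. The Double path uses BigDecimal.valueOf rather than the BigDecimal(double) constructor because the constructor captures the exact binary expansion of the value, while longs are exact either way. A standalone illustration (class name is ours, not from the patch):

    import java.math.BigDecimal;

    public class BigDecimalConversionSketch {
        public static void main(String[] args) {
            // The double constructor keeps the exact binary value of 0.1 ...
            System.out.println(new BigDecimal(0.1));
            // 0.1000000000000000055511151231257827021181583404541015625

            // ... while valueOf goes through Double.toString and keeps the short form.
            System.out.println(BigDecimal.valueOf(0.1));
            // 0.1

            // Longs are exact either way; 2^53 + 1 survives intact, which a double could not represent.
            System.out.println(new BigDecimal(9007199254740993L));     // 9007199254740993
            System.out.println(BigDecimal.valueOf(9007199254740993L)); // 9007199254740993
        }
    }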
diff --git a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java
index dd92ae4c..debf1f34 100644
--- a/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java
+++ b/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/json/JsonSerializer.java
@@ -17,9 +17,11 @@
 import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer;
 import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
 import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.kjetland.jackson.jsonSchema.JsonSchemaGenerator;
 import lombok.Builder;
@@ -52,8 +54,12 @@ public class JsonSerializer implements GlueSchemaRegistryDataFormatSerializer {
     public JsonSerializer(GlueSchemaRegistryConfiguration configs) {
         this.schemaRegistrySerDeConfigs = configs;
         JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
-        this.objectMapper = new ObjectMapper();
+        this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
         this.objectMapper.setNodeFactory(jsonNodeFactory);
+        SimpleModule module = new SimpleModule();
+        module.addSerializer(Double.class, new JsonDoubleSerializer());
+        module.addSerializer(Long.class, new JsonLongSerializer());
+        this.objectMapper.registerModule(module);
         if (configs != null) {
             if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) {
                 configs.getJacksonSerializationFeatures()
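With the wiring above, the serializer's ObjectMapper emits large or fractional numbers in plain rather than scientific notation. A hedged usage sketch of the same configuration outside JsonSerializer (class name and sample values are illustrative; exact rendering may vary with the Jackson version in use):

    import com.amazonaws.services.schemaregistry.serializers.json.JsonDoubleSerializer;
    import com.amazonaws.services.schemaregistry.serializers.json.JsonLongSerializer;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.module.SimpleModule;

    public class PlainNumberOutputSketch {
        public static void main(String[] args) throws Exception {
            // Mirror of the constructor wiring in JsonSerializer above.
            ObjectMapper mapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
            SimpleModule module = new SimpleModule();
            module.addSerializer(Double.class, new JsonDoubleSerializer());
            module.addSerializer(Long.class, new JsonLongSerializer());
            mapper.registerModule(module);

            // Default double handling would print 1.0E10; the BigDecimal path keeps it plain.
            System.out.println(mapper.writeValueAsString(Double.valueOf(1.0E10)));  // 10000000000
            System.out.println(mapper.writeValueAsString(Long.valueOf(1234567890123456789L)));
        }
    }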