Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix optional fields in json #216

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.connect.errors.DataException;
import org.everit.json.schema.CombinedSchema;
import org.everit.json.schema.NullSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.ReferenceSchema;

import java.util.Collection;
Expand Down Expand Up @@ -93,7 +94,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema,
.anyMatch(schema -> schema instanceof NullSchema);

boolean isOptionalUnion =
CombinedSchema.ONE_CRITERION.equals(criterion) && subSchemas.size() == 2 && hasNullSchema;
(CombinedSchema.ONE_CRITERION.equals(criterion)
|| CombinedSchema.ANY_CRITERION.equals(criterion))
&& subSchemas.size() == 2 && hasNullSchema;
if (isOptionalUnion) {
return buildOptionalUnionSchema(subSchemas);
}
Expand All @@ -110,7 +113,9 @@ public Schema toConnectSchema(org.everit.json.schema.Schema jsonSchema,
populateConnectProperties(builder, jsonSchema, required, connectName);

Schema result = builder.build();
toConnectSchemaCache.put(jsonSchema, result);
if (jsonSchema instanceof ObjectSchema) {
toConnectSchemaCache.put(jsonSchema, result);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.everit.json.schema.NumberSchema;
import org.everit.json.schema.ObjectSchema;
import org.everit.json.schema.StringSchema;
import org.json.JSONObject;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -51,7 +52,10 @@ public TypeConverter get(@NonNull org.everit.json.schema.Schema jsonSchema,
typeConverter = get(Schema.Type.BOOLEAN);
} else if (jsonSchema instanceof NumberSchema) {
// If no connect type passed then assume that connect schema is for FLOAT64 type data
if (connectType == null) {
JSONObject parsedSchema = new JSONObject(jsonSchema.toString());
if ("integer".equals(parsedSchema.opt("type"))) {
typeConverter = get(Schema.Type.INT64);
} else if (connectType == null) {
typeConverter = get(Schema.Type.valueOf("FLOAT64"));
} else {
typeConverter = get(Schema.Type.valueOf(connectType.toUpperCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.DecimalFormat;
import org.everit.json.schema.loader.SchemaLoader;
import org.json.JSONObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -102,6 +104,42 @@ public void testConverter_configure_notNull() {
assertNotNull(converter.getJsonSchemaToConnectSchemaConverter());
}

@Test
public void testConverter_optional_fields() {
    converter = new JsonSchemaConverter();
    converter.configure(getProperties(), false);

    // Draft-07 object schema with a single property whose type union is
    // ["string", "null"], i.e. an optional field.
    String schemaDefinition = "{" +
            " \"$schema\": \"http://json-schema.org/draft-07/schema#\"," +
            " \"type\": \"object\"," +
            " \"properties\": {" +
            " \"name\": {" +
            " \"type\": [" +
            " \"string\"," +
            " \"null\"" +
            " ]" +
            " }" +
            " }," +
            " \"additionalProperties\": false" +
            "}";

    org.everit.json.schema.Schema parsedJsonSchema;
    try {
        parsedJsonSchema = SchemaLoader.builder()
                .schemaJson(new JSONObject(schemaDefinition))
                .build()
                .load()
                .build();
    } catch (Exception e) {
        throw new DataException("Failed to read JSON Schema : " + schemaDefinition, e);
    }

    // The optional (nullable) property must still surface as a Connect field.
    Schema connectSchema =
            converter.getJsonSchemaToConnectSchemaConverter().toConnectSchema(parsedJsonSchema);
    assertEquals(1, connectSchema.fields().size());
}

@ParameterizedTest
@MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#"
+ "testSchemaAndValueArgumentsProvider")
Expand Down Expand Up @@ -139,6 +177,8 @@ public void testConverter_fromConnectData_equalsToConnectData(org.everit.json.sc
}
}



@ParameterizedTest
@MethodSource(value = "com.amazonaws.services.schemaregistry.kafkaconnect.jsonschema.TestDataProvider#"
+ "testSchemaAndValueArgumentsProvider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.amazonaws.services.schemaregistry.common.Schema;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
Expand Down Expand Up @@ -57,7 +58,7 @@ public class JsonDeserializer implements GlueSchemaRegistryDataFormatDeserialize
public JsonDeserializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper = new ObjectMapper();
this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
this.objectMapper.setNodeFactory(jsonNodeFactory);
if (configs != null) {
if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) {
Expand Down Expand Up @@ -86,7 +87,6 @@ public Object deserialize(@NonNull ByteBuffer buffer,
try {
String schema = schemaObject.getSchemaDefinition();
byte[] data = DESERIALIZER_DATA_PARSER.getPlainData(buffer);

log.debug("Length of actual message: {}", data.length);

Object deserializedObject;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.amazonaws.services.schemaregistry.serializers.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.math.BigDecimal;

/**
 * Serializes {@link Double} values through {@link BigDecimal} so that, together with
 * {@link JsonGenerator.Feature#WRITE_BIGDECIMAL_AS_PLAIN}, doubles are written as plain
 * decimal notation instead of scientific notation.
 */
public class JsonDoubleSerializer extends JsonSerializer<Double> {

    public JsonDoubleSerializer() {
        super();
    }

    /**
     * Writes the double as a plain decimal number.
     *
     * @param value    the non-null double to serialize (Jackson routes nulls elsewhere)
     * @param jgen     generator receiving the JSON output
     * @param provider serializer provider (unused)
     * @throws IOException if the generator fails to write
     */
    @Override
    public void serialize(Double value, JsonGenerator jgen, SerializerProvider provider)
            throws IOException {
        // BigDecimal.valueOf uses Double.toString, preserving the shortest decimal
        // representation. new BigDecimal(double) would expand the exact binary value
        // (e.g. 0.1 -> 0.1000000000000000055511151231257827021181583404541015625).
        jgen.writeNumber(BigDecimal.valueOf(value));
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.amazonaws.services.schemaregistry.serializers.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.math.BigDecimal;

/**
 * Serializes {@link Long} values through {@link BigDecimal} so that, together with
 * {@link JsonGenerator.Feature#WRITE_BIGDECIMAL_AS_PLAIN}, longs are written in plain
 * decimal notation.
 */
public class JsonLongSerializer extends JsonSerializer<Long> {

    public JsonLongSerializer() {
        super();
    }

    /**
     * Writes the long as a plain decimal number.
     *
     * @param value    the non-null long to serialize (Jackson routes nulls elsewhere)
     * @param jgen     generator receiving the JSON output
     * @param provider serializer provider (unused)
     * @throws IOException if the generator fails to write
     */
    @Override
    public void serialize(Long value, JsonGenerator jgen, SerializerProvider provider)
            throws IOException {
        // Prefer the static factory over the constructor: valueOf caches small values
        // and is the conventional long -> BigDecimal conversion (both are exact).
        jgen.writeNumber(BigDecimal.valueOf(value));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import com.amazonaws.services.schemaregistry.common.GlueSchemaRegistryDataFormatSerializer;
import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.kjetland.jackson.jsonSchema.JsonSchemaGenerator;
import lombok.Builder;
Expand Down Expand Up @@ -52,8 +54,12 @@ public class JsonSerializer implements GlueSchemaRegistryDataFormatSerializer {
public JsonSerializer(GlueSchemaRegistryConfiguration configs) {
this.schemaRegistrySerDeConfigs = configs;
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper = new ObjectMapper();
this.objectMapper = new ObjectMapper().enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN);
this.objectMapper.setNodeFactory(jsonNodeFactory);
SimpleModule module = new SimpleModule();
module.addSerializer(Double.class, new JsonDoubleSerializer());
module.addSerializer(Long.class, new JsonLongSerializer());
this.objectMapper.registerModule(module);
if (configs != null) {
if (!CollectionUtils.isEmpty(configs.getJacksonSerializationFeatures())) {
configs.getJacksonSerializationFeatures()
Expand Down