From 9c5dae010a9cf4325a26d73404f10f0617ba41b7 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 15 Jun 2020 16:46:20 +0200 Subject: [PATCH 01/12] Use lambdas for Avro data conversion --- .../producer/rest/AvroDataMapperFactory.java | 176 +++++------------- 1 file changed, 49 insertions(+), 127 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java index e7a84623..6b046813 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java @@ -103,19 +103,9 @@ public AvroDataMapper createMapper(Schema from, Schema to, final Object defaultV } catch (SchemaValidationException ex) { if (defaultVal != null) { if (defaultVal == NULL_VALUE) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return null; - } - }; + return obj -> null; } else { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return defaultVal; - } - }; + return obj -> defaultVal; } } else { throw ex; @@ -142,12 +132,7 @@ private static AvroDataMapper mapEnum(Schema from, final Schema to, Object defau "Cannot map enum from non-string or enum type")); } if (containsAll) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return new GenericData.EnumSymbol(to, obj.toString()); - } - }; + return obj -> new GenericData.EnumSymbol(to, obj.toString()); } else { String defaultString = (String) defaultVal; if (defaultString == null && to.hasEnumSymbol("UNKNOWN")) { @@ -158,26 +143,18 @@ public Object convertAvro(Object obj) { "Cannot map enum symbols without default value")); } else { final GenericEnumSymbol symbol = new GenericData.EnumSymbol(to, defaultString); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - String value = obj.toString(); - if (to.hasEnumSymbol(value)) { - return new GenericData.EnumSymbol(to, value); - } else { - return symbol; - } + return obj -> { + String value = obj.toString(); + if (to.hasEnumSymbol(value)) { + return new GenericData.EnumSymbol(to, value); + } else { + return symbol; } }; } } } else if (from.getType() == Schema.Type.ENUM && to.getType() == Schema.Type.STRING) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return obj.toString(); - } - }; + return Object::toString; } else { throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map unknown type with enum.")); @@ -244,40 +221,15 @@ public Number stringToNumber(String obj) { } else { switch (to.getType()) { case INT: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).intValue(); - } - }; + return obj -> ((Number) obj).intValue(); case LONG: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).longValue(); - } - }; + return obj -> ((Number) obj).longValue(); case DOUBLE: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return Double.valueOf(obj.toString()); - } - }; + return obj -> Double.valueOf(obj.toString()); case FLOAT: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return ((Number) obj).floatValue(); - } - }; + return obj -> ((Number) obj).floatValue(); case STRING: - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - return obj.toString(); - } - }; + return Object::toString; default: throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map numeric type with non-numeric type")); @@ -318,14 +270,11 @@ private AvroDataMapper mapUnion(Schema from, Schema to, Object defaultVal) if (defaultVal != null) { final Object actualDefault = getDefaultValue(defaultVal, to); final AvroDataMapper subMapper = createMapper(resolvedFrom, to, defaultVal); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - if (obj == null) { - return actualDefault; - } else { - return subMapper.convertAvro(obj); - } + return obj -> { + if (obj == null) { + return actualDefault; + } else { + return subMapper.convertAvro(obj); } }; } else { @@ -335,14 +284,11 @@ public Object convertAvro(Object obj) { } else { Schema toNonNull = nonNullUnionSchema(to); final AvroDataMapper unionMapper = createMapper(resolvedFrom, toNonNull, defaultVal); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - if (obj == null) { - return null; - } else { - return unionMapper.convertAvro(obj); - } + return obj -> { + if (obj == null) { + return null; + } else { + return unionMapper.convertAvro(obj); } }; } @@ -357,16 +303,13 @@ private AvroDataMapper mapArray(Schema from, Schema to) } final AvroDataMapper subMapper = createMapper(from.getElementType(), to.getElementType(), null); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - List array = (List) obj; - List toArray = new ArrayList<>(array.size()); - for (Object val : array) { - toArray.add(subMapper.convertAvro(val)); - } - return toArray; + return obj -> { + List array = (List) obj; + List toArray = new ArrayList<>(array.size()); + for (Object val : array) { + toArray.add(subMapper.convertAvro(val)); } + return toArray; }; } @@ -378,17 +321,14 @@ private AvroDataMapper mapMap(Schema from, Schema to) throws SchemaValidationExc } final AvroDataMapper subMapper = createMapper(from.getValueType(), to.getValueType(), null); - return new AvroDataMapper() { - @Override - public Object convertAvro(Object obj) { - @SuppressWarnings("unchecked") - Map map = (Map) obj; - Map toMap = new HashMap<>(map.size() * 4 / 3 + 1); - for (Map.Entry entry : map.entrySet()) { - toMap.put(entry.getKey().toString(), subMapper.convertAvro(entry.getValue())); - } - return toMap; + return obj -> { + @SuppressWarnings("unchecked") + Map map = (Map) obj; + Map toMap = new HashMap<>(map.size() * 4 / 3 + 1); + for (Map.Entry entry : map.entrySet()) { + toMap.put(entry.getKey().toString(), subMapper.convertAvro(entry.getValue())); } + return toMap; }; } @@ -399,46 +339,28 @@ private AvroDataMapper mapBytes(Schema from, final Schema to, final Object defau || (from.getType() == Type.FIXED && from.getFixedSize() == to.getFixedSize()))) { return IDENTITY_MAPPER; } else if (from.getType() == Type.FIXED && to.getType() == Schema.Type.BYTES) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return ByteBuffer.wrap(((Fixed)object).bytes()); - } - }; + return object -> ByteBuffer.wrap(((Fixed)object).bytes()); } else if (from.getType() == Type.BYTES && to.getType() == Type.FIXED) { if (defaultVal == null) { throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map bytes to fixed without default value")); } - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - byte[] bytes = ((ByteBuffer) object).array(); - if (bytes.length == to.getFixedSize()) { - return GenericData.get().createFixed(null, bytes, to); - } else { - return GenericData.get().createFixed(null, (byte[]) defaultVal, to); - } + return object -> { + byte[] bytes = ((ByteBuffer) object).array(); + if (bytes.length == to.getFixedSize()) { + return GenericData.get().createFixed(null, bytes, to); + } else { + return GenericData.get().createFixed(null, (byte[]) defaultVal, to); } }; } else if (to.getType() == Type.STRING) { final Encoder encoder = Base64.getEncoder(); if (from.getType() == Type.FIXED) { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return new String(encoder.encode(((Fixed) object).bytes()), - StandardCharsets.UTF_8); - } - }; + return object -> new String(encoder.encode(((Fixed) object).bytes()), + StandardCharsets.UTF_8); } else { - return new AvroDataMapper() { - @Override - public Object convertAvro(Object object) { - return new String(encoder.encode(((ByteBuffer) object).array()), - StandardCharsets.UTF_8); - } - }; + return object -> new String(encoder.encode(((ByteBuffer) object).array()), + StandardCharsets.UTF_8); } } else { throw new SchemaValidationException(to, from, From 1fac302ad661c64cd7c8ae24524956b7f9506faf Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 15 Jun 2020 16:48:15 +0200 Subject: [PATCH 02/12] Added base functionality to schema retriever --- .../producer/rest/SchemaRetriever.java | 107 ++++++++++++++---- 1 file changed, 88 insertions(+), 19 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index ef5e5ce7..f5ff4532 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -17,7 +17,9 @@ package org.radarbase.producer.rest; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -30,6 +32,7 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericContainer; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.radarbase.config.ServerConfig; @@ -60,11 +63,18 @@ public class SchemaRetriever { PRIMITIVE_SCHEMAS.put(byte[].class, Schema.create(Type.BYTES)); } - private final ConcurrentMap cache = new ConcurrentHashMap<>(); + private final ConcurrentMap> cache = + new ConcurrentHashMap<>(); + private final ConcurrentMap> idCache = + new ConcurrentHashMap<>(); + private final ConcurrentMap>> versionCache = + new ConcurrentHashMap<>(); private final RestClient restClient; + private final long cacheValidity; - private SchemaRetriever(RestClient client) { + private SchemaRetriever(RestClient client, long cacheValidity) { restClient = client; + this.cacheValidity = cacheValidity; } /** @@ -76,9 +86,23 @@ public SchemaRetriever(ServerConfig config, long connectionTimeout) { this(RestClient.global() .server(Objects.requireNonNull(config)) .timeout(connectionTimeout, TimeUnit.SECONDS) - .build()); + .build(), MAX_VALIDITY); } + /** + * Schema retriever for a Confluent Schema Registry. + * @param config schema registry configuration. + * @param connectionTimeout timeout in seconds. + * @param cacheValidity timeout in seconds for considering a schema stale. + */ + public SchemaRetriever(ServerConfig config, long connectionTimeout, long cacheValidity) { + this(RestClient.global() + .server(Objects.requireNonNull(config)) + .timeout(connectionTimeout, TimeUnit.SECONDS) + .build(), cacheValidity); + } + + /** The subject in the Avro Schema Registry, given a Kafka topic. */ protected static String subject(String topic, boolean ofValue) { return topic + (ofValue ? "-value" : "-key"); @@ -96,28 +120,41 @@ protected ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int versio } else { pathBuilder.append("latest"); } - Request request = restClient.requestBuilder(pathBuilder.toString()) - .addHeader("Accept", "application/json") - .build(); - String response = restClient.requestString(request); - JSONObject node = new JSONObject(response); + JSONObject node = requestJson(pathBuilder.toString()); int newVersion = version < 1 ? node.getInt("version") : version; int schemaId = node.getInt("id"); Schema schema = parseSchema(node.getString("schema")); return new ParsedSchemaMetadata(schemaId, newVersion, schema); } + private JSONObject requestJson(String path) throws IOException { + Request request = restClient.requestBuilder(path) + .addHeader("Accept", "application/json") + .build(); + + String response = restClient.requestString(request); + return new JSONObject(response); + } + + /** Retrieve schema metadata from server. */ + protected ParsedSchemaMetadata retrieveSchemaById(int id) + throws JSONException, IOException { + JSONObject node = requestJson("/schemas/ids/" + id); + Schema schema = parseSchema(node.getString("schema")); + return new ParsedSchemaMetadata(id, null, schema); + } + /** Get schema metadata. Cached schema metadata will be used if present. */ public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) throws JSONException, IOException { String subject = subject(topic, ofValue); - TimedSchemaMetadata value = cache.get(subject); + TimedValue value = cache.get(subject); if (value == null || value.isExpired()) { - value = new TimedSchemaMetadata(retrieveSchemaMetadata(subject, version)); + value = new TimedValue<>(retrieveSchemaMetadata(subject, version), cacheValidity); cache.put(subject, value); } - return value.metadata; + return value.value; } /** Parse a schema from string. */ @@ -144,7 +181,7 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat int schemaId = node.getInt("id"); metadata.setId(schemaId); } - cache.put(subject, new TimedSchemaMetadata(metadata)); + cache.put(subject, new TimedValue<>(metadata, cacheValidity)); } /** @@ -184,6 +221,38 @@ public void writeTo(BufferedSink sink) throws IOException { } } + /** Get a schema by its ID. */ + public ParsedSchemaMetadata getById(int id) throws IOException { + TimedValue value = idCache.get(id); + if (value == null || value.isExpired()) { + value = new TimedValue<>(retrieveSchemaById(id), cacheValidity); + idCache.put(id, value); + } + return value.value; + } + + /** Get all schema versions in a subject. */ + public List getVersions(String subject) throws IOException { + TimedValue> value = versionCache.get(subject); + + if (value == null || value.isExpired()) { + Request request = restClient.requestBuilder("/subjects/" + subject + "/versions") + .addHeader("Accept", "application/json") + .build(); + + String response = restClient.requestString(request); + JSONArray node = new JSONArray(response); + + List versions = new ArrayList<>(node.length()); + for (int i = 0; i < node.length(); i++) { + versions.add(node.getInt(i)); + } + value = new TimedValue<>(versions, cacheValidity); + versionCache.put(subject, value); + } + return value.value; + } + /** * Get the schema of a generic object. This supports null, primitive types, String, and * {@link org.apache.avro.generic.GenericContainer}. @@ -206,13 +275,13 @@ public static Schema getSchema(Object object) { + "Pass null, a primitive CONTENT_TYPE or a GenericContainer."); } - private static class TimedSchemaMetadata { - private final ParsedSchemaMetadata metadata; + private static class TimedValue { + private final T value; private final long expiry; - TimedSchemaMetadata(ParsedSchemaMetadata metadata) { - expiry = System.currentTimeMillis() + MAX_VALIDITY * 1000L; - this.metadata = Objects.requireNonNull(metadata); + TimedValue(T value, long maxValidity) { + expiry = System.currentTimeMillis() + maxValidity * 1000L; + this.value = Objects.requireNonNull(value); } boolean isExpired() { @@ -227,12 +296,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return metadata.equals(((TimedSchemaMetadata)o).metadata); + return value.equals(((TimedValue)o).value); } @Override public int hashCode() { - return metadata.hashCode(); + return value.hashCode(); } } } From d2e7181897e2b8d58b030c6f8118c30fd305c220 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 15 Jun 2020 16:49:01 +0200 Subject: [PATCH 03/12] Bump dev version --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 05c480b0..ae0034e5 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ subprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.12.3' + version = '0.12.4-SNAPSHOT' group = 'org.radarbase' ext.githubRepoName = 'RADAR-base/radar-commons' From 7ca4b97559b470d12b30ddb2aada401148e8a168 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Mon, 15 Jun 2020 16:59:39 +0200 Subject: [PATCH 04/12] Cache schemas based on version --- .../org/radarbase/producer/rest/SchemaRetriever.java | 10 ++++++---- .../radarbase/producer/rest/SchemaRetrieverTest.java | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index f5ff4532..48efba67 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -149,10 +149,11 @@ protected ParsedSchemaMetadata retrieveSchemaById(int id) public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) throws JSONException, IOException { String subject = subject(topic, ofValue); - TimedValue value = cache.get(subject); + String key = subject + '#' + version; + TimedValue value = cache.get(key); if (value == null || value.isExpired()) { value = new TimedValue<>(retrieveSchemaMetadata(subject, version), cacheValidity); - cache.put(subject, value); + cache.put(key, value); } return value.value; } @@ -170,7 +171,6 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat throws JSONException, IOException { String subject = subject(topic, ofValue); if (metadata.getId() == null) { - Request request = restClient.requestBuilder("/subjects/" + subject + "/versions") .addHeader("Accept", "application/json") .post(new SchemaRequestBody(metadata.getSchema())) @@ -181,7 +181,9 @@ public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadat int schemaId = node.getInt("id"); metadata.setId(schemaId); } - cache.put(subject, new TimedValue<>(metadata, cacheValidity)); + int version = metadata.getVersion() != null ? metadata.getVersion() : -1; + String key = subject + '#' + version; + cache.put(key, new TimedValue<>(metadata, cacheValidity)); } /** diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java index a56abae0..6a8ee63b 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java @@ -89,7 +89,7 @@ public void getSchemaMetadata() throws Exception { assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); // Already queried schema is cached and does not need another request - ParsedSchemaMetadata metadata2 = retriever.getSchemaMetadata("bla", true, -1); + ParsedSchemaMetadata metadata2 = retriever.getSchemaMetadata("bla", true, 2); assertEquals(Integer.valueOf(10), metadata2.getId()); assertEquals(Integer.valueOf(2), metadata2.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata2.getSchema()); From 46ed698abc301a5cc2e11043f371eec02b949790 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 16 Jun 2020 16:27:44 +0200 Subject: [PATCH 05/12] Expanded schema retriever functionality and caching behaviour --- .../producer/rest/SchemaRestClient.java | 117 ++++++++ .../producer/rest/SchemaRetriever.java | 254 +++++++----------- .../java/org/radarbase/util/TimedInt.java | 31 +++ .../java/org/radarbase/util/TimedValue.java | 33 +++ .../producer/rest/SchemaRestClientTest.java | 80 ++++++ .../producer/rest/SchemaRetrieverTest.java | 35 +-- 6 files changed, 370 insertions(+), 180 deletions(-) create mode 100644 radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java create mode 100644 radar-commons/src/main/java/org/radarbase/util/TimedInt.java create mode 100644 radar-commons/src/main/java/org/radarbase/util/TimedValue.java create mode 100644 radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java new file mode 100644 index 00000000..ae1a3343 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRestClient.java @@ -0,0 +1,117 @@ +package org.radarbase.producer.rest; + +import java.io.IOException; +import okhttp3.MediaType; +import okhttp3.Request; +import okhttp3.RequestBody; +import okio.BufferedSink; +import org.apache.avro.Schema; +import org.json.JSONException; +import org.json.JSONObject; +import org.radarbase.util.Strings; + +/** REST client for Confluent schema registry. */ +public class SchemaRestClient { + private final RestClient client; + + public SchemaRestClient(RestClient client) { + this.client = client; + } + + /** Retrieve schema metadata from server. */ + public ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int version) + throws JSONException, IOException { + boolean isLatest = version <= 0; + + StringBuilder pathBuilder = new StringBuilder(50) + .append("/subjects/") + .append(subject) + .append("/versions/"); + + if (isLatest) { + pathBuilder.append("latest"); + } else { + pathBuilder.append(version); + } + + JSONObject node = requestJson(pathBuilder.toString()); + int newVersion = isLatest ? node.getInt("version") : version; + int schemaId = node.getInt("id"); + Schema schema = parseSchema(node.getString("schema")); + return new ParsedSchemaMetadata(schemaId, newVersion, schema); + } + + private JSONObject requestJson(String path) throws IOException { + Request request = client.requestBuilder(path) + .addHeader("Accept", "application/json") + .build(); + + String response = client.requestString(request); + return new JSONObject(response); + } + + + /** Parse a schema from string. */ + public Schema parseSchema(String schemaString) { + Schema.Parser parser = new Schema.Parser(); + return parser.parse(schemaString); + } + + /** Add a schema to a subject. */ + public int addSchema(String subject, Schema schema) throws IOException { + Request request = client.requestBuilder("/subjects/" + subject + "/versions") + .addHeader("Accept", "application/json") + .post(new SchemaRequestBody(schema)) + .build(); + + String response = client.requestString(request); + JSONObject node = new JSONObject(response); + return node.getInt("id"); + } + + /** Request metadata for a schema on a subject. */ + public ParsedSchemaMetadata requestMetadata(String subject, Schema schema) + throws IOException { + Request request = client.requestBuilder("/subjects/" + subject) + .addHeader("Accept", "application/json") + .post(new SchemaRequestBody(schema)) + .build(); + + String response = client.requestString(request); + JSONObject node = new JSONObject(response); + + return new ParsedSchemaMetadata(node.getInt("id"), + node.getInt("version"), schema); + } + + /** Retrieve schema metadata from server. */ + public Schema retrieveSchemaById(int id) + throws JSONException, IOException { + JSONObject node = requestJson("/schemas/ids/" + id); + return parseSchema(node.getString("schema")); + } + + private static class SchemaRequestBody extends RequestBody { + private static final byte[] SCHEMA = Strings.utf8("{\"schema\":"); + private static final MediaType CONTENT_TYPE = MediaType.parse( + "application/vnd.schemaregistry.v1+json; charset=utf-8"); + + private final Schema schema; + + private SchemaRequestBody(Schema schema) { + this.schema = schema; + } + + @Override + public MediaType contentType() { + return CONTENT_TYPE; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + sink.write(SCHEMA); + sink.writeUtf8(JSONObject.quote(schema.toString())); + sink.writeByte('}'); + } + } +} diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index 48efba67..bec22ef3 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -17,26 +17,20 @@ package org.radarbase.producer.rest; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import okhttp3.MediaType; -import okhttp3.Request; -import okhttp3.RequestBody; -import okio.BufferedSink; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericContainer; -import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; import org.radarbase.config.ServerConfig; -import org.radarbase.util.Strings; +import org.radarbase.util.TimedInt; +import org.radarbase.util.TimedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,11 +40,8 @@ */ public class SchemaRetriever { private static final Logger logger = LoggerFactory.getLogger(SchemaRetriever.class); - private static final MediaType CONTENT_TYPE = MediaType.parse( - "application/vnd.schemaregistry.v1+json; charset=utf-8"); private static final Schema NULL_SCHEMA = Schema.create(Type.NULL); private static final Map, Schema> PRIMITIVE_SCHEMAS = new HashMap<>(); - private static final byte[] SCHEMA = Strings.utf8("{\"schema\":"); private static final long MAX_VALIDITY = 86400L; static { @@ -63,20 +54,24 @@ public class SchemaRetriever { PRIMITIVE_SCHEMAS.put(byte[].class, Schema.create(Type.BYTES)); } - private final ConcurrentMap> cache = + private final ConcurrentMap> idCache = new ConcurrentHashMap<>(); - private final ConcurrentMap> idCache = + private final ConcurrentMap schemaCache = new ConcurrentHashMap<>(); + private final ConcurrentMap> subjectVersionCache = new ConcurrentHashMap<>(); - private final ConcurrentMap>> versionCache = - new ConcurrentHashMap<>(); - private final RestClient restClient; + + private final SchemaRestClient restClient; private final long cacheValidity; - private SchemaRetriever(RestClient client, long cacheValidity) { - restClient = client; + public SchemaRetriever(RestClient client, long cacheValidity) { + restClient = new SchemaRestClient(client); this.cacheValidity = cacheValidity; } + public SchemaRetriever(RestClient client) { + this(client, MAX_VALIDITY); + } + /** * Schema retriever for a Confluent Schema Registry. * @param config schema registry configuration. @@ -86,7 +81,7 @@ public SchemaRetriever(ServerConfig config, long connectionTimeout) { this(RestClient.global() .server(Objects.requireNonNull(config)) .timeout(connectionTimeout, TimeUnit.SECONDS) - .build(), MAX_VALIDITY); + .build()); } /** @@ -108,82 +103,40 @@ protected static String subject(String topic, boolean ofValue) { return topic + (ofValue ? "-value" : "-key"); } - /** Retrieve schema metadata from server. */ - protected ParsedSchemaMetadata retrieveSchemaMetadata(String subject, int version) - throws JSONException, IOException { - StringBuilder pathBuilder = new StringBuilder(50) - .append("/subjects/") - .append(subject) - .append("/versions/"); - if (version > 0) { - pathBuilder.append(version); - } else { - pathBuilder.append("latest"); - } - - JSONObject node = requestJson(pathBuilder.toString()); - int newVersion = version < 1 ? node.getInt("version") : version; - int schemaId = node.getInt("id"); - Schema schema = parseSchema(node.getString("schema")); - return new ParsedSchemaMetadata(schemaId, newVersion, schema); - } - - private JSONObject requestJson(String path) throws IOException { - Request request = restClient.requestBuilder(path) - .addHeader("Accept", "application/json") - .build(); - - String response = restClient.requestString(request); - return new JSONObject(response); - } - - /** Retrieve schema metadata from server. */ - protected ParsedSchemaMetadata retrieveSchemaById(int id) - throws JSONException, IOException { - JSONObject node = requestJson("/schemas/ids/" + id); - Schema schema = parseSchema(node.getString("schema")); - return new ParsedSchemaMetadata(id, null, schema); - } - /** Get schema metadata. Cached schema metadata will be used if present. */ public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) throws JSONException, IOException { String subject = subject(topic, ofValue); - String key = subject + '#' + version; - TimedValue value = cache.get(key); - if (value == null || value.isExpired()) { - value = new TimedValue<>(retrieveSchemaMetadata(subject, version), cacheValidity); - cache.put(key, value); + ConcurrentMap versionMap = computeIfAbsent(subjectVersionCache, subject, + new ConcurrentHashMap<>()); + TimedInt id = versionMap.get(Math.max(version, 0)); + if (id == null || id.isExpired()) { + ParsedSchemaMetadata metadata = restClient.retrieveSchemaMetadata(subject, version); + cacheMetadata(metadata, subject, version <= 0); + return metadata; + } else { + Schema schema = getById(id.value); + ParsedSchemaMetadata metadata = getCachedMetadata(subject, id.value, version, schema); + + return metadata != null ? metadata : getMetadata(topic, ofValue, schema, version <= 0); } - return value.value; } - /** Parse a schema from string. */ - protected Schema parseSchema(String schemaString) { - Schema.Parser parser = new Schema.Parser(); - return parser.parse(schemaString); + private V computeIfAbsent(ConcurrentMap original, K key, V newValue) { + V existingValue = original.putIfAbsent(key, newValue); + return existingValue != null ? existingValue : newValue; } /** * Add schema metadata to the retriever. This implementation only adds it to the cache. + * @return schema ID */ - public void addSchemaMetadata(String topic, boolean ofValue, ParsedSchemaMetadata metadata) + public int addSchema(String topic, boolean ofValue, Schema schema) throws JSONException, IOException { String subject = subject(topic, ofValue); - if (metadata.getId() == null) { - Request request = restClient.requestBuilder("/subjects/" + subject + "/versions") - .addHeader("Accept", "application/json") - .post(new SchemaRequestBody(metadata.getSchema())) - .build(); - - String response = restClient.requestString(request); - JSONObject node = new JSONObject(response); - int schemaId = node.getInt("id"); - metadata.setId(schemaId); - } - int version = metadata.getVersion() != null ? metadata.getVersion() : -1; - String key = subject + '#' + version; - cache.put(key, new TimedValue<>(metadata, cacheValidity)); + int id = restClient.addSchema(subject, schema); + cacheMetadata(new ParsedSchemaMetadata(id, null, schema), subject, false); + return id; } /** @@ -197,62 +150,91 @@ public ParsedSchemaMetadata getOrSetSchemaMetadata(String topic, boolean ofValue return getSchemaMetadata(topic, ofValue, version); } catch (IOException ex) { logger.warn("Schema for {} value was not yet added to the schema registry.", topic); - ParsedSchemaMetadata metadata = new ParsedSchemaMetadata(null, null, schema); - addSchemaMetadata(topic, ofValue, metadata); - return metadata; - } - } - - private static class SchemaRequestBody extends RequestBody { - private final Schema schema; - - private SchemaRequestBody(Schema schema) { - this.schema = schema; - } - - @Override - public MediaType contentType() { - return CONTENT_TYPE; - } - - @Override - public void writeTo(BufferedSink sink) throws IOException { - sink.write(SCHEMA); - sink.writeUtf8(JSONObject.quote(schema.toString())); - sink.writeByte('}'); + addSchema(topic, ofValue, schema); + return getMetadata(topic, ofValue, schema, version <= 0); } } /** Get a schema by its ID. */ - public ParsedSchemaMetadata getById(int id) throws IOException { - TimedValue value = idCache.get(id); + public Schema getById(int id) throws IOException { + TimedValue value = idCache.get(id); if (value == null || value.isExpired()) { - value = new TimedValue<>(retrieveSchemaById(id), cacheValidity); + value = new TimedValue<>(restClient.retrieveSchemaById(id), cacheValidity); idCache.put(id, value); + schemaCache.put(value.value, new TimedInt(id, cacheValidity)); } return value.value; } + /** Gets a schema by ID and check that it is present in the given topic. */ + public ParsedSchemaMetadata getBySubjectAndId(String topic, boolean ofValue, int id) + throws IOException { + Schema schema = getById(id); + String subject = subject(topic, ofValue); + ParsedSchemaMetadata metadata = getCachedMetadata(subject, id, null, schema); + return metadata != null ? metadata : getMetadata(topic, ofValue, schema); + } + /** Get all schema versions in a subject. */ - public List getVersions(String subject) throws IOException { - TimedValue> value = versionCache.get(subject); + public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema) + throws IOException { + return getMetadata(topic, ofValue, schema, false); + } - if (value == null || value.isExpired()) { - Request request = restClient.requestBuilder("/subjects/" + subject + "/versions") - .addHeader("Accept", "application/json") - .build(); - String response = restClient.requestString(request); - JSONArray node = new JSONArray(response); + /** Get all schema versions in a subject. */ + public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema, + boolean ofLatestVersion) throws IOException { + TimedInt id = schemaCache.get(schema); + String subject = subject(topic, ofValue); - List versions = new ArrayList<>(node.length()); - for (int i = 0; i < node.length(); i++) { - versions.add(node.getInt(i)); + if (id != null && !id.isExpired()) { + ParsedSchemaMetadata metadata = getCachedMetadata(subject, id.value, null, schema); + if (metadata != null) { + return metadata; } - value = new TimedValue<>(versions, cacheValidity); - versionCache.put(subject, value); } - return value.value; + + ParsedSchemaMetadata metadata = restClient.requestMetadata(subject, schema); + cacheMetadata(metadata, subject, ofLatestVersion); + return metadata; + } + + + protected ParsedSchemaMetadata getCachedMetadata(String subject, int id, + Integer reportedVersion, Schema schema) { + Integer version = reportedVersion; + if (version == null || version <= 0) { + ConcurrentMap versions = subjectVersionCache.get(subject); + if (versions != null) { + for (Map.Entry entry : versions.entrySet()) { + if (!entry.getValue().isExpired() && entry.getKey() != 0 + && entry.getValue().value == id) { + version = entry.getKey(); + break; + } + } + } + if (version == null || version < 1) { + return null; + } + } + return new ParsedSchemaMetadata(id, version, schema); + } + + protected void cacheMetadata(ParsedSchemaMetadata metadata, String subject, boolean latest) { + TimedInt id = new TimedInt(metadata.getId(), cacheValidity); + schemaCache.put(metadata.getSchema(), id); + if (subject != null && metadata.getVersion() != null) { + ConcurrentMap versionCache = computeIfAbsent(subjectVersionCache, + subject, new ConcurrentHashMap<>()); + + versionCache.put(metadata.getVersion(), id); + if (latest) { + versionCache.put(0, id); + } + } + idCache.put(metadata.getId(), new TimedValue<>(metadata.getSchema(), cacheValidity)); } /** @@ -276,34 +258,4 @@ public static Schema getSchema(Object object) { + object.getClass() + " can not be schematized. " + "Pass null, a primitive CONTENT_TYPE or a GenericContainer."); } - - private static class TimedValue { - private final T value; - private final long expiry; - - TimedValue(T value, long maxValidity) { - expiry = System.currentTimeMillis() + maxValidity * 1000L; - this.value = Objects.requireNonNull(value); - } - - boolean isExpired() { - return expiry < System.currentTimeMillis(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - return value.equals(((TimedValue)o).value); - } - - @Override - public int hashCode() { - return value.hashCode(); - } - } } diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedInt.java b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java new file mode 100644 index 00000000..7bd0d0b7 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java @@ -0,0 +1,31 @@ +package org.radarbase.util; + +public class TimedInt { + public final int value; + private final long expiry; + + public TimedInt(int value, long validity) { + expiry = System.currentTimeMillis() + validity * 1000L; + this.value = value; + } + + public boolean isExpired() { + return expiry < System.currentTimeMillis(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return value == ((TimedInt)o).value; + } + + @Override + public int hashCode() { + return value; + } +} diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedValue.java b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java new file mode 100644 index 00000000..9f5e50f9 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java @@ -0,0 +1,33 @@ +package org.radarbase.util; + +import java.util.Objects; + +public class TimedValue { + public final T value; + private final long expiry; + + public TimedValue(T value, long validity) { + expiry = System.currentTimeMillis() + validity * 1000L; + this.value = Objects.requireNonNull(value); + } + + public boolean isExpired() { + return expiry < System.currentTimeMillis(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return value.equals(((TimedValue)o).value); + } + + @Override + public int hashCode() { + return value.hashCode(); + } +} diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java new file mode 100644 index 00000000..fd10e838 --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRestClientTest.java @@ -0,0 +1,80 @@ +/* + * Copyright 2017 The Hyve and King's College London + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.radarbase.producer.rest; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import okhttp3.mockwebserver.RecordedRequest; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.radarbase.config.ServerConfig; + +public class SchemaRestClientTest { + private MockWebServer server; + private SchemaRestClient retriever; + + @Before + public void setUp() { + server = new MockWebServer(); + ServerConfig config = new ServerConfig(); + config.setProtocol("http"); + config.setHost(server.getHostName()); + config.setPort(server.getPort()); + config.setPath("base"); + retriever = new SchemaRestClient(RestClient.global() + .server(Objects.requireNonNull(config)) + .timeout(1L, TimeUnit.SECONDS) + .build()); + } + + @After + public void tearDown() throws IOException { + server.close(); + } + + @Test + public void retrieveSchemaMetadata() throws Exception { + server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); + ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", -1); + assertEquals(Integer.valueOf(10), metadata.getId()); + assertEquals(Integer.valueOf(2), metadata.getVersion()); + assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + assertEquals("/base/subjects/bla-value/versions/latest", server.takeRequest().getPath()); + } + + + @Test + public void retrieveSchemaMetadataVersion() throws Exception { + server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); + ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", 2); + assertEquals(Integer.valueOf(10), metadata.getId()); + assertEquals(Integer.valueOf(2), metadata.getVersion()); + assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); + } +} diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java index 6a8ee63b..026a8cfb 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java @@ -58,27 +58,6 @@ public void subject() { assertEquals("bla-key", SchemaRetriever.subject("bla", false)); } - @Test - public void retrieveSchemaMetadata() throws Exception { - server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", -1); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Integer.valueOf(2), metadata.getVersion()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals("/base/subjects/bla-value/versions/latest", server.takeRequest().getPath()); - } - - - @Test - public void retrieveSchemaMetadataVersion() throws Exception { - server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.retrieveSchemaMetadata("bla-value", 2); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Integer.valueOf(2), metadata.getVersion()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); - } - @Test public void getSchemaMetadata() throws Exception { server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); @@ -103,11 +82,9 @@ public void getSchemaMetadata() throws Exception { @Test public void addSchemaMetadata() throws Exception { - ParsedSchemaMetadata metadata = new ParsedSchemaMetadata(null, null, Schema.create(Schema.Type.STRING)); server.enqueue(new MockResponse().setBody("{\"id\":10}")); - retriever.addSchemaMetadata("bla", true, metadata); - assertEquals(Integer.valueOf(10), metadata.getId()); - assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); + int id = retriever.addSchema("bla", true, Schema.create(Schema.Type.STRING)); + assertEquals(10, id); assertEquals(1, server.getRequestCount()); RecordedRequest request = server.takeRequest(); @@ -117,10 +94,9 @@ public void addSchemaMetadata() throws Exception { new Field("a", Schema.create(Schema.Type.INT), "that a", 10)); Schema record = Schema.createRecord("C", "that C", "org.radarcns", false, schemaFields); - metadata = new ParsedSchemaMetadata(null, null, record); server.enqueue(new MockResponse().setBody("{\"id\":11}")); - retriever.addSchemaMetadata("bla", true, metadata); - assertEquals(Integer.valueOf(11), metadata.getId()); + id = retriever.addSchema("bla", true, record); + assertEquals(11, id); request = server.takeRequest(); assertEquals("{\"schema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"C\\\",\\\"namespace\\\":\\\"org.radarcns\\\",\\\"doc\\\":\\\"that C\\\",\\\"fields\\\":[{\\\"name\\\":\\\"a\\\",\\\"type\\\":\\\"int\\\",\\\"doc\\\":\\\"that a\\\",\\\"default\\\":10}]}\"}", request.getBody().readUtf8()); } @@ -129,11 +105,12 @@ public void addSchemaMetadata() throws Exception { public void getOrSetSchemaMetadataSet() throws Exception { server.enqueue(new MockResponse().setResponseCode(404)); server.enqueue(new MockResponse().setBody("{\"id\":10}")); + server.enqueue(new MockResponse().setBody("{\"id\":10, \"version\": 2}")); ParsedSchemaMetadata metadata = retriever.getOrSetSchemaMetadata("bla", true, Schema.create(Schema.Type.STRING), -1); assertEquals(Integer.valueOf(10), metadata.getId()); assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); - assertEquals(2, server.getRequestCount()); + assertEquals(3, server.getRequestCount()); server.takeRequest(); RecordedRequest request = server.takeRequest(); assertEquals("{\"schema\":\"\\\"string\\\"\"}", request.getBody().readUtf8()); From 700750281a465cdd2bea0a8e943a45379f8cb016 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 09:25:36 +0200 Subject: [PATCH 06/12] Allow SchemaRetriever caches to be cleared --- .../producer/rest/SchemaRetriever.java | 31 +++++++++++++++++++ .../java/org/radarbase/util/TimedInt.java | 7 +++-- .../java/org/radarbase/util/TimedValue.java | 7 +++-- .../org/radarbase/util/TimedVariable.java | 5 +++ 4 files changed, 46 insertions(+), 4 deletions(-) create mode 100644 radar-commons/src/main/java/org/radarbase/util/TimedVariable.java diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index bec22ef3..a46697bf 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -18,7 +18,9 @@ import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,6 +33,7 @@ import org.radarbase.config.ServerConfig; import org.radarbase.util.TimedInt; import org.radarbase.util.TimedValue; +import org.radarbase.util.TimedVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,6 +240,34 @@ protected void cacheMetadata(ParsedSchemaMetadata metadata, String subject, bool idCache.put(metadata.getId(), new TimedValue<>(metadata.getSchema(), cacheValidity)); } + /** + * Remove expired entries from cache. + */ + public void pruneCache() { + prune(schemaCache); + prune(idCache); + for (ConcurrentMap versionMap : subjectVersionCache.values()) { + prune(versionMap); + } + } + + private void prune(Map map) { + for (Entry entry : map.entrySet()) { + if (entry.getValue().isExpired()) { + map.remove(entry.getKey(), entry.getValue()); + } + } + } + + /** + * Remove all entries from cache. + */ + public void clearCache() { + subjectVersionCache.clear(); + idCache.clear(); + schemaCache.clear(); + } + /** * Get the schema of a generic object. This supports null, primitive types, String, and * {@link org.apache.avro.generic.GenericContainer}. diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedInt.java b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java index 7bd0d0b7..108bbc54 100644 --- a/radar-commons/src/main/java/org/radarbase/util/TimedInt.java +++ b/radar-commons/src/main/java/org/radarbase/util/TimedInt.java @@ -1,6 +1,6 @@ package org.radarbase.util; -public class TimedInt { +public class TimedInt implements TimedVariable { public final int value; private final long expiry; @@ -9,6 +9,7 @@ public TimedInt(int value, long validity) { this.value = value; } + @Override public boolean isExpired() { return expiry < System.currentTimeMillis(); } @@ -21,7 +22,9 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return value == ((TimedInt)o).value; + TimedInt other = (TimedInt)o; + return value == other.value + && expiry == other.expiry; } @Override diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedValue.java b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java index 9f5e50f9..cf186055 100644 --- a/radar-commons/src/main/java/org/radarbase/util/TimedValue.java +++ b/radar-commons/src/main/java/org/radarbase/util/TimedValue.java @@ -2,7 +2,7 @@ import java.util.Objects; -public class TimedValue { +public class TimedValue implements TimedVariable { public final T value; private final long expiry; @@ -11,6 +11,7 @@ public TimedValue(T value, long validity) { this.value = Objects.requireNonNull(value); } + @Override public boolean isExpired() { return expiry < System.currentTimeMillis(); } @@ -23,7 +24,9 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) { return false; } - return value.equals(((TimedValue)o).value); + TimedValue other = (TimedValue)o; + return value.equals(other.value) + && expiry == other.expiry; } @Override diff --git a/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java b/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java new file mode 100644 index 00000000..c4affa99 --- /dev/null +++ b/radar-commons/src/main/java/org/radarbase/util/TimedVariable.java @@ -0,0 +1,5 @@ +package org.radarbase.util; + +public interface TimedVariable { + boolean isExpired(); +} From 44c7c3e3a908186801a92934dc364e2bfa65c721 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 09:39:47 +0200 Subject: [PATCH 07/12] Renaming and documenting some SchemaRetriever values --- .../producer/rest/SchemaRetriever.java | 84 ++++++++++--------- .../producer/rest/SchemaRetrieverTest.java | 6 +- 2 files changed, 48 insertions(+), 42 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index a46697bf..ec6754cd 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -100,36 +99,6 @@ public SchemaRetriever(ServerConfig config, long connectionTimeout, long cacheVa .build(), cacheValidity); } - - /** The subject in the Avro Schema Registry, given a Kafka topic. */ - protected static String subject(String topic, boolean ofValue) { - return topic + (ofValue ? "-value" : "-key"); - } - - /** Get schema metadata. Cached schema metadata will be used if present. */ - public ParsedSchemaMetadata getSchemaMetadata(String topic, boolean ofValue, int version) - throws JSONException, IOException { - String subject = subject(topic, ofValue); - ConcurrentMap versionMap = computeIfAbsent(subjectVersionCache, subject, - new ConcurrentHashMap<>()); - TimedInt id = versionMap.get(Math.max(version, 0)); - if (id == null || id.isExpired()) { - ParsedSchemaMetadata metadata = restClient.retrieveSchemaMetadata(subject, version); - cacheMetadata(metadata, subject, version <= 0); - return metadata; - } else { - Schema schema = getById(id.value); - ParsedSchemaMetadata metadata = getCachedMetadata(subject, id.value, version, schema); - - return metadata != null ? metadata : getMetadata(topic, ofValue, schema, version <= 0); - } - } - - private V computeIfAbsent(ConcurrentMap original, K key, V newValue) { - V existingValue = original.putIfAbsent(key, newValue); - return existingValue != null ? existingValue : newValue; - } - /** * Add schema metadata to the retriever. This implementation only adds it to the cache. * @return schema ID @@ -138,7 +107,7 @@ public int addSchema(String topic, boolean ofValue, Schema schema) throws JSONException, IOException { String subject = subject(topic, ofValue); int id = restClient.addSchema(subject, schema); - cacheMetadata(new ParsedSchemaMetadata(id, null, schema), subject, false); + cache(new ParsedSchemaMetadata(id, null, schema), subject, false); return id; } @@ -150,7 +119,7 @@ public int addSchema(String topic, boolean ofValue, Schema schema) public ParsedSchemaMetadata getOrSetSchemaMetadata(String topic, boolean ofValue, Schema schema, int version) throws JSONException, IOException { try { - return getSchemaMetadata(topic, ofValue, version); + return getBySubjectAndVersion(topic, ofValue, version); } catch (IOException ex) { logger.warn("Schema for {} value was not yet added to the schema registry.", topic); addSchema(topic, ofValue, schema); @@ -174,10 +143,29 @@ public ParsedSchemaMetadata getBySubjectAndId(String topic, boolean ofValue, int throws IOException { Schema schema = getById(id); String subject = subject(topic, ofValue); - ParsedSchemaMetadata metadata = getCachedMetadata(subject, id, null, schema); + ParsedSchemaMetadata metadata = getCachedVersion(subject, id, null, schema); return metadata != null ? metadata : getMetadata(topic, ofValue, schema); } + /** Get schema metadata. Cached schema metadata will be used if present. */ + public ParsedSchemaMetadata getBySubjectAndVersion(String topic, boolean ofValue, int version) + throws JSONException, IOException { + String subject = subject(topic, ofValue); + ConcurrentMap versionMap = computeIfAbsent(subjectVersionCache, subject, + new ConcurrentHashMap<>()); + TimedInt id = versionMap.get(Math.max(version, 0)); + if (id == null || id.isExpired()) { + ParsedSchemaMetadata metadata = restClient.retrieveSchemaMetadata(subject, version); + cache(metadata, subject, version <= 0); + return metadata; + } else { + Schema schema = getById(id.value); + ParsedSchemaMetadata metadata = getCachedVersion(subject, id.value, version, schema); + + return metadata != null ? metadata : getMetadata(topic, ofValue, schema, version <= 0); + } + } + /** Get all schema versions in a subject. */ public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema) throws IOException { @@ -185,26 +173,34 @@ public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema sc } - /** Get all schema versions in a subject. */ + /** Get the metadata of a specific schema in a topic. */ public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema schema, boolean ofLatestVersion) throws IOException { TimedInt id = schemaCache.get(schema); String subject = subject(topic, ofValue); if (id != null && !id.isExpired()) { - ParsedSchemaMetadata metadata = getCachedMetadata(subject, id.value, null, schema); + ParsedSchemaMetadata metadata = getCachedVersion(subject, id.value, null, schema); if (metadata != null) { return metadata; } } ParsedSchemaMetadata metadata = restClient.requestMetadata(subject, schema); - cacheMetadata(metadata, subject, ofLatestVersion); + cache(metadata, subject, ofLatestVersion); return metadata; } - protected ParsedSchemaMetadata getCachedMetadata(String subject, int id, + /** + * Get cached metadata. + * @param subject schema registry subject + * @param id schema ID. + * @param reportedVersion version requested by the client. Null if no version was requested. This version will be used if the actual version was not cached. + * @param schema schema to use. + * @return metadata if present. Returns null if no metadata is cached or if no version is cached and the reportedVersion is null. + */ + protected ParsedSchemaMetadata getCachedVersion(String subject, int id, Integer reportedVersion, Schema schema) { Integer version = reportedVersion; if (version == null || version <= 0) { @@ -225,7 +221,7 @@ protected ParsedSchemaMetadata getCachedMetadata(String subject, int id, return new ParsedSchemaMetadata(id, version, schema); } - protected void cacheMetadata(ParsedSchemaMetadata metadata, String subject, boolean latest) { + protected void cache(ParsedSchemaMetadata metadata, String subject, boolean latest) { TimedInt id = new TimedInt(metadata.getId(), cacheValidity); schemaCache.put(metadata.getSchema(), id); if (subject != null && metadata.getVersion() != null) { @@ -268,6 +264,16 @@ public void clearCache() { schemaCache.clear(); } + /** The subject in the Avro Schema Registry, given a Kafka topic. */ + protected static String subject(String topic, boolean ofValue) { + return topic + (ofValue ? "-value" : "-key"); + } + + private static V computeIfAbsent(ConcurrentMap original, K key, V newValue) { + V existingValue = original.putIfAbsent(key, newValue); + return existingValue != null ? existingValue : newValue; + } + /** * Get the schema of a generic object. This supports null, primitive types, String, and * {@link org.apache.avro.generic.GenericContainer}. diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java index 026a8cfb..2042611e 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java @@ -61,14 +61,14 @@ public void subject() { @Test public void getSchemaMetadata() throws Exception { server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.getSchemaMetadata("bla", true, 2); + ParsedSchemaMetadata metadata = retriever.getByVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata.getId()); assertEquals(Integer.valueOf(2), metadata.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); // Already queried schema is cached and does not need another request - ParsedSchemaMetadata metadata2 = retriever.getSchemaMetadata("bla", true, 2); + ParsedSchemaMetadata metadata2 = retriever.getByVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata2.getId()); assertEquals(Integer.valueOf(2), metadata2.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata2.getSchema()); @@ -77,7 +77,7 @@ public void getSchemaMetadata() throws Exception { // Not yet queried schema needs a new request, so if the server does not respond, an // IOException is thrown. server.enqueue(new MockResponse().setResponseCode(500)); - assertThrows(IOException.class, () -> retriever.getSchemaMetadata("bla", false, 2)); + assertThrows(IOException.class, () -> retriever.getByVersion("bla", false, 2)); } @Test From 8b695780e190fcb92edca6f1078db2c1dbc53b99 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 09:49:37 +0200 Subject: [PATCH 08/12] Fix code style --- .../producer/rest/SchemaRetriever.java | 76 ++++++------------- .../producer/rest/SchemaRetrieverTest.java | 6 +- 2 files changed, 25 insertions(+), 57 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java index ec6754cd..8b609397 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/SchemaRetriever.java @@ -17,7 +17,6 @@ package org.radarbase.producer.rest; import java.io.IOException; -import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -25,8 +24,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericContainer; import org.json.JSONException; import org.json.JSONObject; import org.radarbase.config.ServerConfig; @@ -42,20 +39,8 @@ */ public class SchemaRetriever { private static final Logger logger = LoggerFactory.getLogger(SchemaRetriever.class); - private static final Schema NULL_SCHEMA = Schema.create(Type.NULL); - private static final Map, Schema> PRIMITIVE_SCHEMAS = new HashMap<>(); private static final long MAX_VALIDITY = 86400L; - static { - PRIMITIVE_SCHEMAS.put(Long.class, Schema.create(Type.LONG)); - PRIMITIVE_SCHEMAS.put(Integer.class, Schema.create(Type.INT)); - PRIMITIVE_SCHEMAS.put(Float.class, Schema.create(Type.FLOAT)); - PRIMITIVE_SCHEMAS.put(Double.class, Schema.create(Type.DOUBLE)); - PRIMITIVE_SCHEMAS.put(String.class, Schema.create(Type.STRING)); - PRIMITIVE_SCHEMAS.put(Boolean.class, Schema.create(Type.BOOLEAN)); - PRIMITIVE_SCHEMAS.put(byte[].class, Schema.create(Type.BYTES)); - } - private final ConcurrentMap> idCache = new ConcurrentHashMap<>(); private final ConcurrentMap schemaCache = new ConcurrentHashMap<>(); @@ -120,10 +105,14 @@ public ParsedSchemaMetadata getOrSetSchemaMetadata(String topic, boolean ofValue int version) throws JSONException, IOException { try { return getBySubjectAndVersion(topic, ofValue, version); - } catch (IOException ex) { - logger.warn("Schema for {} value was not yet added to the schema registry.", topic); - addSchema(topic, ofValue, schema); - return getMetadata(topic, ofValue, schema, version <= 0); + } catch (RestException ex) { + if (ex.getStatusCode() == 404) { + logger.warn("Schema for {} value was not yet added to the schema registry.", topic); + addSchema(topic, ofValue, schema); + return getMetadata(topic, ofValue, schema, version <= 0); + } else { + throw ex; + } } } @@ -161,7 +150,6 @@ public ParsedSchemaMetadata getBySubjectAndVersion(String topic, boolean ofValue } else { Schema schema = getById(id.value); ParsedSchemaMetadata metadata = getCachedVersion(subject, id.value, version, schema); - return metadata != null ? metadata : getMetadata(topic, ofValue, schema, version <= 0); } } @@ -196,9 +184,11 @@ public ParsedSchemaMetadata getMetadata(String topic, boolean ofValue, Schema sc * Get cached metadata. * @param subject schema registry subject * @param id schema ID. - * @param reportedVersion version requested by the client. Null if no version was requested. This version will be used if the actual version was not cached. + * @param reportedVersion version requested by the client. Null if no version was requested. + * This version will be used if the actual version was not cached. * @param schema schema to use. - * @return metadata if present. Returns null if no metadata is cached or if no version is cached and the reportedVersion is null. + * @return metadata if present. Returns null if no metadata is cached or if no version is cached + * and the reportedVersion is null. */ protected ParsedSchemaMetadata getCachedVersion(String subject, int id, Integer reportedVersion, Schema schema) { @@ -214,7 +204,7 @@ protected ParsedSchemaMetadata getCachedVersion(String subject, int id, } } } - if (version == null || version < 1) { + if (version == null || version <= 0) { return null; } } @@ -224,7 +214,7 @@ protected ParsedSchemaMetadata getCachedVersion(String subject, int id, protected void cache(ParsedSchemaMetadata metadata, String subject, boolean latest) { TimedInt id = new TimedInt(metadata.getId(), cacheValidity); schemaCache.put(metadata.getSchema(), id); - if (subject != null && metadata.getVersion() != null) { + if (metadata.getVersion() != null) { ConcurrentMap versionCache = computeIfAbsent(subjectVersionCache, subject, new ConcurrentHashMap<>()); @@ -247,14 +237,6 @@ public void pruneCache() { } } - private void prune(Map map) { - for (Entry entry : map.entrySet()) { - if (entry.getValue().isExpired()) { - map.remove(entry.getKey(), entry.getValue()); - } - } - } - /** * Remove all entries from cache. */ @@ -269,30 +251,16 @@ protected static String subject(String topic, boolean ofValue) { return topic + (ofValue ? "-value" : "-key"); } + private static void prune(Map map) { + for (Entry entry : map.entrySet()) { + if (entry.getValue().isExpired()) { + map.remove(entry.getKey(), entry.getValue()); + } + } + } + private static V computeIfAbsent(ConcurrentMap original, K key, V newValue) { V existingValue = original.putIfAbsent(key, newValue); return existingValue != null ? existingValue : newValue; } - - /** - * Get the schema of a generic object. This supports null, primitive types, String, and - * {@link org.apache.avro.generic.GenericContainer}. - * @param object object of recognized CONTENT_TYPE - * @throws IllegalArgumentException if passed object is not a recognized CONTENT_TYPE - */ - public static Schema getSchema(Object object) { - if (object == null) { - return NULL_SCHEMA; - } - Schema schema = PRIMITIVE_SCHEMAS.get(object.getClass()); - if (schema != null) { - return schema; - } - if (object instanceof GenericContainer) { - return ((GenericContainer)object).getSchema(); - } - throw new IllegalArgumentException("Passed object " + object + " of class " - + object.getClass() + " can not be schematized. " - + "Pass null, a primitive CONTENT_TYPE or a GenericContainer."); - } } diff --git a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java index 2042611e..d78f9211 100644 --- a/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java +++ b/radar-commons/src/test/java/org/radarbase/producer/rest/SchemaRetrieverTest.java @@ -61,14 +61,14 @@ public void subject() { @Test public void getSchemaMetadata() throws Exception { server.enqueue(new MockResponse().setBody("{\"id\":10,\"version\":2,\"schema\":\"\\\"string\\\"\"}")); - ParsedSchemaMetadata metadata = retriever.getByVersion("bla", true, 2); + ParsedSchemaMetadata metadata = retriever.getBySubjectAndVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata.getId()); assertEquals(Integer.valueOf(2), metadata.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata.getSchema()); assertEquals("/base/subjects/bla-value/versions/2", server.takeRequest().getPath()); // Already queried schema is cached and does not need another request - ParsedSchemaMetadata metadata2 = retriever.getByVersion("bla", true, 2); + ParsedSchemaMetadata metadata2 = retriever.getBySubjectAndVersion("bla", true, 2); assertEquals(Integer.valueOf(10), metadata2.getId()); assertEquals(Integer.valueOf(2), metadata2.getVersion()); assertEquals(Schema.create(Schema.Type.STRING), metadata2.getSchema()); @@ -77,7 +77,7 @@ public void getSchemaMetadata() throws Exception { // Not yet queried schema needs a new request, so if the server does not respond, an // IOException is thrown. server.enqueue(new MockResponse().setResponseCode(500)); - assertThrows(IOException.class, () -> retriever.getByVersion("bla", false, 2)); + assertThrows(IOException.class, () -> retriever.getBySubjectAndVersion("bla", false, 2)); } @Test From 6e1934e60eb5b076c0c50b2c8c146bdafd32d070 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 12:12:05 +0200 Subject: [PATCH 09/12] Fix IDEA warnings --- .../producer/rest/AvroDataMapperFactory.java | 11 +-- .../producer/rest/BinaryRecordRequest.java | 2 +- .../producer/rest/TopicRequestBody.java | 4 +- .../java/org/radarbase/topic/AvroTopic.java | 2 +- .../main/java/org/radarbase/util/Base64.java | 75 +++++++------------ .../java/org/radarbase/util/RestUtils.java | 8 +- .../main/java/org/radarbase/util/Strings.java | 3 +- .../java/org/radarbase/util/Base64Test.java | 25 +++++++ 8 files changed, 63 insertions(+), 67 deletions(-) create mode 100644 radar-commons/src/test/java/org/radarbase/util/Base64Test.java diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java index 6b046813..51b68772 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/AvroDataMapperFactory.java @@ -3,7 +3,6 @@ import static org.apache.avro.JsonProperties.NULL_VALUE; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -142,7 +141,7 @@ private static AvroDataMapper mapEnum(Schema from, final Schema to, Object defau throw new SchemaValidationException(to, from, new IllegalArgumentException( "Cannot map enum symbols without default value")); } else { - final GenericEnumSymbol symbol = new GenericData.EnumSymbol(to, defaultString); + GenericEnumSymbol symbol = new GenericData.EnumSymbol(to, defaultString); return obj -> { String value = obj.toString(); if (to.hasEnumSymbol(value)) { @@ -304,7 +303,7 @@ private AvroDataMapper mapArray(Schema from, Schema to) final AvroDataMapper subMapper = createMapper(from.getElementType(), to.getElementType(), null); return obj -> { - List array = (List) obj; + List array = (List) obj; List toArray = new ArrayList<>(array.size()); for (Object val : array) { toArray.add(subMapper.convertAvro(val)); @@ -356,11 +355,9 @@ private AvroDataMapper mapBytes(Schema from, final Schema to, final Object defau } else if (to.getType() == Type.STRING) { final Encoder encoder = Base64.getEncoder(); if (from.getType() == Type.FIXED) { - return object -> new String(encoder.encode(((Fixed) object).bytes()), - StandardCharsets.UTF_8); + return object -> encoder.encode(((Fixed) object).bytes()); } else { - return object -> new String(encoder.encode(((ByteBuffer) object).array()), - StandardCharsets.UTF_8); + return object -> encoder.encode(((ByteBuffer) object).array()); } } else { throw new SchemaValidationException(to, from, diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java index 89b53ccc..4fe2a5c1 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/BinaryRecordRequest.java @@ -41,7 +41,7 @@ public class BinaryRecordRequest implements RecordRequest { private RecordData records; private BinaryEncoder binaryEncoder; private final AvroWriter valueEncoder; - private int sourceIdPos; + private final int sourceIdPos; /** * Binary record request for given topic. diff --git a/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java b/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java index 43ed4fcd..353afb9c 100644 --- a/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java +++ b/radar-commons/src/main/java/org/radarbase/producer/rest/TopicRequestBody.java @@ -26,10 +26,10 @@ * TopicRequestData in a RequestBody. */ class TopicRequestBody extends RequestBody { - protected final RecordRequest data; + protected final RecordRequest data; private final MediaType mediaType; - TopicRequestBody(RecordRequest requestData, MediaType mediaType) { + TopicRequestBody(RecordRequest requestData, MediaType mediaType) { this.data = requestData; this.mediaType = mediaType; } diff --git a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java index 351098fa..2b63875e 100644 --- a/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java +++ b/radar-commons/src/main/java/org/radarbase/topic/AvroTopic.java @@ -145,7 +145,7 @@ public boolean equals(Object o) { return false; } - AvroTopic topic = (AvroTopic) o; + AvroTopic topic = (AvroTopic) o; return keyClass == topic.getKeyClass() && valueClass == topic.getValueClass(); } diff --git a/radar-commons/src/main/java/org/radarbase/util/Base64.java b/radar-commons/src/main/java/org/radarbase/util/Base64.java index 60e6df86..be1a87f5 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Base64.java +++ b/radar-commons/src/main/java/org/radarbase/util/Base64.java @@ -25,8 +25,6 @@ package org.radarbase.util; -import java.util.Arrays; - /** * This class consists exclusively of static methods for obtaining * encoders and decoders for the Base64 encoding scheme. The @@ -84,7 +82,7 @@ public static class Encoder { * index values into their "Base64 Alphabet" equivalents as specified * in "Table 1: The Base64 Alphabet" of RFC 2045 (and RFC 4648). */ - private static final byte[] BASE_64_BYTE = { + private static final char[] BASE_64_CHAR = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', @@ -97,10 +95,6 @@ public static class Encoder { private Encoder() { } - private int outLength(int srclen) { - return 4 * ((srclen + 2) / 3); - } - /** * Encodes all bytes from the specified byte array into a newly-allocated * byte array using the {@link Base64} encoding scheme. The returned byte @@ -111,51 +105,36 @@ private int outLength(int srclen) { * @return A newly-allocated byte array containing the resulting * encoded bytes. */ - public byte[] encode(byte[] src) { - int len = outLength(src.length); // dst array size - byte[] dst = new byte[len]; - int ret = encode0(src, src.length, dst); - if (ret != dst.length) { - return Arrays.copyOf(dst, ret); - } - return dst; - } - - private int encode0(byte[] src, int end, byte[] dst) { - int sp = 0; - int slen = end / 3 * 3; - int dp = 0; - while (sp < slen) { - int sl0 = Math.min(sp + slen, slen); - int dp0 = dp; - for (int sp0 = sp; sp0 < sl0; sp0 += 3) { - int bits = (src[sp0] & 0xff) << 16 - | (src[sp0 + 1] & 0xff) << 8 - | (src[sp0 + 2] & 0xff); - dst[dp0++] = BASE_64_BYTE[(bits >>> 18) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[(bits >>> 12) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[(bits >>> 6) & 0x3f]; - dst[dp0++] = BASE_64_BYTE[bits & 0x3f]; - } - int dlen = (sl0 - sp) / 3 * 4; - dp += dlen; - sp = sl0; + public String encode(byte[] src) { + int dstLen = 4 * ((src.length + 2) / 3); + char[] dst = new char[dstLen]; + int srcLen = src.length / 3 * 3; + int dstP = 0; + for (int srcP = 0; srcP < srcLen; srcP += 3) { + int bits = (src[srcP] & 0xff) << 16 + | (src[srcP + 1] & 0xff) << 8 + | (src[srcP + 2] & 0xff); + dst[dstP++] = BASE_64_CHAR[(bits >>> 18) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[(bits >>> 12) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[(bits >>> 6) & 0x3f]; + dst[dstP++] = BASE_64_CHAR[bits & 0x3f]; } - if (sp < end) { // 1 or 2 leftover bytes - int b0 = src[sp++] & 0xff; - dst[dp++] = BASE_64_BYTE[b0 >> 2]; - if (sp == end) { - dst[dp++] = BASE_64_BYTE[(b0 << 4) & 0x3f]; - dst[dp++] = '='; - dst[dp++] = '='; + if (srcLen < src.length) { // 1 or 2 leftover bytes + int srcP = srcLen; + int b0 = src[srcP++] & 0xff; + dst[dstP++] = BASE_64_CHAR[b0 >> 2]; + if (srcP == src.length) { + dst[dstP++] = BASE_64_CHAR[(b0 << 4) & 0x3f]; + dst[dstP++] = '='; } else { - int b1 = src[sp] & 0xff; - dst[dp++] = BASE_64_BYTE[(b0 << 4) & 0x3f | (b1 >> 4)]; - dst[dp++] = BASE_64_BYTE[(b1 << 2) & 0x3f]; - dst[dp++] = '='; + int b1 = src[srcP] & 0xff; + dst[dstP++] = BASE_64_CHAR[(b0 << 4) & 0x3f | (b1 >> 4)]; + dst[dstP++] = BASE_64_CHAR[(b1 << 2) & 0x3f]; } + dst[dstP] = '='; } - return dp; + + return new String(dst); } } } diff --git a/radar-commons/src/main/java/org/radarbase/util/RestUtils.java b/radar-commons/src/main/java/org/radarbase/util/RestUtils.java index 71b63702..6b90181a 100644 --- a/radar-commons/src/main/java/org/radarbase/util/RestUtils.java +++ b/radar-commons/src/main/java/org/radarbase/util/RestUtils.java @@ -23,7 +23,6 @@ import java.util.Arrays; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; @@ -40,12 +39,7 @@ public final class RestUtils { /** OkHttp3 default hostname verifier. */ public static final HostnameVerifier DEFAULT_HOSTNAME_VERIFIER = OkHostnameVerifier.INSTANCE; /** OkHttp3 hostname verifier for unsafe connections. */ - public static final HostnameVerifier UNSAFE_HOSTNAME_VERIFIER = new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; + public static final HostnameVerifier UNSAFE_HOSTNAME_VERIFIER = (hostname, session) -> true; /** Unsafe OkHttp3 trust manager that trusts all certificates. */ public static final TrustManager[] UNSAFE_TRUST_MANAGER = { diff --git a/radar-commons/src/main/java/org/radarbase/util/Strings.java b/radar-commons/src/main/java/org/radarbase/util/Strings.java index 72229cfe..e613eb8f 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Strings.java +++ b/radar-commons/src/main/java/org/radarbase/util/Strings.java @@ -17,6 +17,7 @@ package org.radarbase.util; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Iterator; import java.util.regex.Pattern; @@ -26,7 +27,7 @@ */ @SuppressWarnings("PMD.ClassNamingConventions") public final class Strings { - private static final Charset UTF_8 = Charset.forName("UTF-8"); + private static final Charset UTF_8 = StandardCharsets.UTF_8; private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray(); private Strings() { diff --git a/radar-commons/src/test/java/org/radarbase/util/Base64Test.java b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java new file mode 100644 index 00000000..cf0bfb0f --- /dev/null +++ b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java @@ -0,0 +1,25 @@ +package org.radarbase.util; + +import static org.junit.Assert.*; + +import java.util.concurrent.ThreadLocalRandom; +import kotlin.text.Charsets; +import org.junit.Test; +import org.radarbase.util.Base64.Encoder; + +public class Base64Test { + @Test + public void encoderTest() { + Encoder encoder = Base64.getEncoder(); + java.util.Base64.Encoder javaEncoder = java.util.Base64.getEncoder(); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = 0; i < 2_000; i++) { + byte[] src = new byte[i]; + random.nextBytes(src); + String actual = encoder.encode(src); + String expected = new String(javaEncoder.encode(src), Charsets.UTF_8); + assertEquals(expected, actual); + } + } +} From ba5dbabcfe34e72373c4618cf42f01265ad7c114 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 14:09:19 +0200 Subject: [PATCH 10/12] Small speed increase Base64 computation --- .../src/main/java/org/radarbase/util/Base64.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/util/Base64.java b/radar-commons/src/main/java/org/radarbase/util/Base64.java index be1a87f5..7f0945a9 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Base64.java +++ b/radar-commons/src/main/java/org/radarbase/util/Base64.java @@ -82,7 +82,7 @@ public static class Encoder { * index values into their "Base64 Alphabet" equivalents as specified * in "Table 1: The Base64 Alphabet" of RFC 2045 (and RFC 4648). */ - private static final char[] BASE_64_CHAR = { + private static final byte[] BASE_64_CHAR = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', @@ -106,11 +106,12 @@ private Encoder() { * encoded bytes. */ public String encode(byte[] src) { - int dstLen = 4 * ((src.length + 2) / 3); - char[] dst = new char[dstLen]; - int srcLen = src.length / 3 * 3; + int srcLen = src.length; + byte[] dst = new byte[4 * ((srcLen + 2) / 3)]; + int fullDataLen = srcLen / 3 * 3; int dstP = 0; - for (int srcP = 0; srcP < srcLen; srcP += 3) { + int srcP = 0; + for (; srcP < fullDataLen; srcP += 3) { int bits = (src[srcP] & 0xff) << 16 | (src[srcP + 1] & 0xff) << 8 | (src[srcP + 2] & 0xff); @@ -119,11 +120,10 @@ public String encode(byte[] src) { dst[dstP++] = BASE_64_CHAR[(bits >>> 6) & 0x3f]; dst[dstP++] = BASE_64_CHAR[bits & 0x3f]; } - if (srcLen < src.length) { // 1 or 2 leftover bytes - int srcP = srcLen; + if (srcP < srcLen) { // 1 or 2 leftover bytes int b0 = src[srcP++] & 0xff; dst[dstP++] = BASE_64_CHAR[b0 >> 2]; - if (srcP == src.length) { + if (srcP == srcLen) { dst[dstP++] = BASE_64_CHAR[(b0 << 4) & 0x3f]; dst[dstP++] = '='; } else { From 70e51d259f3d83342cfb72137aa67dc2ebd74881 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 08:57:36 +0200 Subject: [PATCH 11/12] Bump version --- README.md | 10 +++++----- build.gradle | 2 +- .../serializers/AbstractKafkaAvroDeserializer.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 6521172b..ff5813e7 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } dependencies { - implementation group: 'org.radarbase', name: 'radar-commons', version: '0.12.3' + implementation group: 'org.radarbase', name: 'radar-commons', version: '0.13.0' } ``` @@ -69,7 +69,7 @@ repositories { } dependencies { - implementation group: 'org.radarbase', name: 'radar-commons-server', version: '0.12.3' + implementation group: 'org.radarbase', name: 'radar-commons-server', version: '0.13.0' } ``` @@ -83,7 +83,7 @@ repositories { } dependencies { - testImplementation group: 'org.radarbase', name: 'radar-commons-testing', version: '0.12.3' + testImplementation group: 'org.radarbase', name: 'radar-commons-testing', version: '0.13.0' } ``` @@ -96,7 +96,7 @@ repositories { } dependencies { - runtimeOnly group: 'org.radarbase', name: 'radar-commons-unsafe', version: '0.12.3' + runtimeOnly group: 'org.radarbase', name: 'radar-commons-unsafe', version: '0.13.0' } ``` @@ -121,7 +121,7 @@ configurations.all { } dependencies { - compile group: 'org.radarbase', name: 'radar-commons', version: '0.12.4-SNAPSHOT' + compile group: 'org.radarbase', name: 'radar-commons', version: '0.13.1-SNAPSHOT' } ``` diff --git a/build.gradle b/build.gradle index ae0034e5..58b8a60b 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ subprojects { // Configuration // //---------------------------------------------------------------------------// - version = '0.12.4-SNAPSHOT' + version = '0.13.0' group = 'org.radarbase' ext.githubRepoName = 'RADAR-base/radar-commons' diff --git a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java index 6f847a60..4a74187c 100644 --- a/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java +++ b/radar-commons-unsafe/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java @@ -188,7 +188,7 @@ protected DatumReader getDatumReader(Schema writerSchema, Schema readerSchema } /** - * Normalizes the reader schema, puts the resolved schema into the cache. + * Normalizes the reader schema, puts the resolved schema into the cache. *
  • *
      if the reader schema is provided, use the provided one
    *
      if the reader schema is cached for the writer schema full name, use the cached value
    From 8136c94b8ef619ca81845f8d1c7b8544aa40c282 Mon Sep 17 00:00:00 2001 From: Joris Borgdorff Date: Tue, 23 Jun 2020 13:40:05 +0200 Subject: [PATCH 12/12] Slightly faster Base64 implementation --- radar-commons/src/main/java/org/radarbase/util/Base64.java | 4 +++- .../src/test/java/org/radarbase/util/Base64Test.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/radar-commons/src/main/java/org/radarbase/util/Base64.java b/radar-commons/src/main/java/org/radarbase/util/Base64.java index 7f0945a9..62d630b1 100644 --- a/radar-commons/src/main/java/org/radarbase/util/Base64.java +++ b/radar-commons/src/main/java/org/radarbase/util/Base64.java @@ -25,6 +25,8 @@ package org.radarbase.util; +import static java.nio.charset.StandardCharsets.UTF_8; // Since Android API 19 + /** * This class consists exclusively of static methods for obtaining * encoders and decoders for the Base64 encoding scheme. The @@ -134,7 +136,7 @@ public String encode(byte[] src) { dst[dstP] = '='; } - return new String(dst); + return new String(dst, UTF_8); } } } diff --git a/radar-commons/src/test/java/org/radarbase/util/Base64Test.java b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java index cf0bfb0f..a22f6bf9 100644 --- a/radar-commons/src/test/java/org/radarbase/util/Base64Test.java +++ b/radar-commons/src/test/java/org/radarbase/util/Base64Test.java @@ -14,7 +14,7 @@ public void encoderTest() { java.util.Base64.Encoder javaEncoder = java.util.Base64.getEncoder(); ThreadLocalRandom random = ThreadLocalRandom.current(); - for (int i = 0; i < 2_000; i++) { + for (int i = 0; i < 2_000; i += 7) { byte[] src = new byte[i]; random.nextBytes(src); String actual = encoder.encode(src);