From 940448c5c39851e36e7aab1b3fdf85839e152d13 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 26 Feb 2024 11:28:01 -0500 Subject: [PATCH 1/2] refactor: rename params source --- .../aiven/kafka/connect/common/output/SchemaBuilderTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java index 8197db5a0..c7f74c028 100644 --- a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java @@ -238,7 +238,7 @@ void testSchemaForRecordValueSimpleType(final SinkSchemaBuilder schemaBuilder) { assertThat(avroSchema.getField("value").schema().getType()).isEqualTo(Type.STRING); } - private static Stream multipleFieldsWithoutHeadersTestParameters() { + private static Stream schemaBuilders() { final var fields = List.of( new OutputField(OutputFieldType.KEY, OutputFieldEncodingType.NONE), new OutputField(OutputFieldType.VALUE, OutputFieldEncodingType.NONE), @@ -255,7 +255,7 @@ private static Stream multipleFieldsWithoutHeadersTestParameters() { } @ParameterizedTest - @MethodSource("multipleFieldsWithoutHeadersTestParameters") + @MethodSource("schemaBuilders") void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaBuilder) { final Headers headers = new ConnectHeaders(); headers.add("a", "b", Schema.STRING_SCHEMA); @@ -289,6 +289,7 @@ void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaB @ParameterizedTest @MethodSource("multipleFieldsWithoutHeadersTestParameters") + @MethodSource("schemaBuilders") void testBuildSchemaForMultipleFieldsWithoutHeaders(final SinkSchemaBuilder schemaBuilder) { final var sinkRecord = new SinkRecord( From 91debd1dec50b0d4d5d447d1c1960ba27c895377 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 26 Feb 2024 11:28:46 -0500 Subject: [PATCH 2/2] refactor: improve error msg when header types differ --- .../common/output/SinkSchemaBuilder.java | 4 +++- .../common/output/SchemaBuilderTest.java | 22 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java b/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java index d971d9bfe..2ae9dd4e4 100644 --- a/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java +++ b/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java @@ -147,7 +147,9 @@ private Schema headersSchema(final SinkRecord record) { headerSchema = h.schema(); } else if (headerSchema.type() != h.schema().type()) { throw new DataException("Header schema " + h.schema() - + " is not the same as " + headerSchema); + + " for '" + h.key() + "' " + + "is not the same as the already defined map type: " + headerSchema + ". " + + "To force the same type, consider using StringConverter or similar."); } } return SchemaBuilder.map().values(avroData.fromConnectSchema(headerSchema)); diff --git a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java index c7f74c028..a55069579 100644 --- a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java @@ -288,7 +288,27 @@ void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaB } @ParameterizedTest - @MethodSource("multipleFieldsWithoutHeadersTestParameters") + @MethodSource("schemaBuilders") + void testBuildAivenCustomSchemaForMultipleFieldsWithDiffTypes(final SinkSchemaBuilder schemaBuilder) { + final Headers headers = new ConnectHeaders(); + headers.add("a", "b", Schema.STRING_SCHEMA); + headers.add("c", 1, Schema.INT32_SCHEMA); + final var sinkRecord = + new SinkRecord( + "some-topic", 1, + Schema.STRING_SCHEMA, "some-key", + Schema.STRING_SCHEMA, "some-value", + 100L, 1000L, + TimestampType.CREATE_TIME, headers); + + assertThatThrownBy(() -> schemaBuilder.buildSchema(sinkRecord)) + .isInstanceOf(DataException.class) + .hasMessage("Header schema Schema{INT32} for 'c' " + + "is not the same as the already defined map type: Schema{STRING}. " + + "To force the same type, consider using StringConverter or similar."); + } + + @ParameterizedTest @MethodSource("schemaBuilders") void testBuildSchemaForMultipleFieldsWithoutHeaders(final SinkSchemaBuilder schemaBuilder) { final var sinkRecord =