From f0b94ea74b1466c2421c15e7a4293f5155ba9a96 Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Mon, 26 Feb 2024 11:28:46 -0500 Subject: [PATCH] refactor: improve error msg when header types differ --- .../common/output/SinkSchemaBuilder.java | 4 +++- .../common/output/SchemaBuilderTest.java | 21 ++++++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java b/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java index d971d9bfe..2ae9dd4e4 100644 --- a/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java +++ b/src/main/java/io/aiven/kafka/connect/common/output/SinkSchemaBuilder.java @@ -147,7 +147,9 @@ private Schema headersSchema(final SinkRecord record) { headerSchema = h.schema(); } else if (headerSchema.type() != h.schema().type()) { throw new DataException("Header schema " + h.schema() - + " is not the same as " + headerSchema); + + " for '" + h.key() + "' " + + "is not the same as the already defined map type: " + headerSchema + ". " + + "To force the same type, consider using StringConverter or similar."); } } return SchemaBuilder.map().values(avroData.fromConnectSchema(headerSchema)); diff --git a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java index c7f74c028..cee4178a0 100644 --- a/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java +++ b/src/test/java/io/aiven/kafka/connect/common/output/SchemaBuilderTest.java @@ -288,7 +288,26 @@ void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaB } @ParameterizedTest - @MethodSource("multipleFieldsWithoutHeadersTestParameters") + @MethodSource("schemaBuilders") + void testBuildAivenCustomSchemaForMultipleFieldsWithDiffTypes(final SinkSchemaBuilder schemaBuilder) { + final Headers headers = new ConnectHeaders(); + headers.add("a", "b", Schema.STRING_SCHEMA); + headers.add("c", 1, Schema.INT32_SCHEMA); + final var sinkRecord = + new SinkRecord( + "some-topic", 1, + Schema.STRING_SCHEMA, "some-key", + Schema.STRING_SCHEMA, "some-value", + 100L, 1000L, + TimestampType.CREATE_TIME, headers); + + assertThatThrownBy(() -> schemaBuilder.buildSchema(sinkRecord)) + .isInstanceOf(DataException.class) + .hasMessage("Header schema Schema{INT32} for 'c' is not the same as the already defined map type: Schema{STRING}. " + + "To force the same type, consider using StringConverter or similar."); + } + + @ParameterizedTest @MethodSource("schemaBuilders") void testBuildSchemaForMultipleFieldsWithoutHeaders(final SinkSchemaBuilder schemaBuilder) { final var sinkRecord =