Skip to content

Commit

Permalink
refactor: improve error msg when header types differ
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Feb 26, 2024
1 parent 940448c commit f0b94ea
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private Schema headersSchema(final SinkRecord record) {
headerSchema = h.schema();
} else if (headerSchema.type() != h.schema().type()) {
throw new DataException("Header schema " + h.schema()
+ " is not the same as " + headerSchema);
+ " for '" + h.key() + "' "
+ "is not the same as the already defined map type: " + headerSchema + ". "
+ "To force the same type, consider using StringConverter or similar.");
}
}
return SchemaBuilder.map().values(avroData.fromConnectSchema(headerSchema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,26 @@ void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaB
}

@ParameterizedTest
@MethodSource("multipleFieldsWithoutHeadersTestParameters")
@MethodSource("schemaBuilders")
void testBuildAivenCustomSchemaForMultipleFieldsWithDiffTypes(final SinkSchemaBuilder schemaBuilder) {
final Headers headers = new ConnectHeaders();
headers.add("a", "b", Schema.STRING_SCHEMA);
headers.add("c", 1, Schema.INT32_SCHEMA);
final var sinkRecord =
new SinkRecord(
"some-topic", 1,
Schema.STRING_SCHEMA, "some-key",
Schema.STRING_SCHEMA, "some-value",
100L, 1000L,
TimestampType.CREATE_TIME, headers);

assertThatThrownBy(() -> schemaBuilder.buildSchema(sinkRecord))
.isInstanceOf(DataException.class)
.hasMessage("Header schema Schema{INT32} for 'c' is not the same as the already defined map type: Schema{STRING}. "
+ "To force the same type, consider using StringConverter or similar.");
}

@ParameterizedTest
@MethodSource("schemaBuilders")
void testBuildSchemaForMultipleFieldsWithoutHeaders(final SinkSchemaBuilder schemaBuilder) {
final var sinkRecord =
Expand Down

0 comments on commit f0b94ea

Please sign in to comment.