Skip to content

Commit

Permalink
Merge pull request #226 from Aiven-Open/jeqo/test-header-diff-type
Browse files Browse the repository at this point in the history
refactor: improve error message when header types differ
  • Loading branch information
tvainika authored Feb 27, 2024
2 parents 2fde8c3 + 91debd1 commit 6289287
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private Schema headersSchema(final SinkRecord record) {
headerSchema = h.schema();
} else if (headerSchema.type() != h.schema().type()) {
throw new DataException("Header schema " + h.schema()
+ " is not the same as " + headerSchema);
+ " for '" + h.key() + "' "
+ "is not the same as the already defined map type: " + headerSchema + ". "
+ "To force the same type, consider using StringConverter or similar.");
}
}
return SchemaBuilder.map().values(avroData.fromConnectSchema(headerSchema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ void testSchemaForRecordValueSimpleType(final SinkSchemaBuilder schemaBuilder) {
assertThat(avroSchema.getField("value").schema().getType()).isEqualTo(Type.STRING);
}

private static Stream<Arguments> multipleFieldsWithoutHeadersTestParameters() {
private static Stream<Arguments> schemaBuilders() {
final var fields = List.of(
new OutputField(OutputFieldType.KEY, OutputFieldEncodingType.NONE),
new OutputField(OutputFieldType.VALUE, OutputFieldEncodingType.NONE),
Expand All @@ -255,7 +255,7 @@ private static Stream<Arguments> multipleFieldsWithoutHeadersTestParameters() {
}

@ParameterizedTest
@MethodSource("multipleFieldsWithoutHeadersTestParameters")
@MethodSource("schemaBuilders")
void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaBuilder) {
final Headers headers = new ConnectHeaders();
headers.add("a", "b", Schema.STRING_SCHEMA);
Expand Down Expand Up @@ -288,7 +288,28 @@ void testBuildAivenCustomSchemaForMultipleFields(final SinkSchemaBuilder schemaB
}

@ParameterizedTest
@MethodSource("multipleFieldsWithoutHeadersTestParameters")
@MethodSource("schemaBuilders")
void testBuildAivenCustomSchemaForMultipleFieldsWithDiffTypes(final SinkSchemaBuilder schemaBuilder) {
final Headers headers = new ConnectHeaders();
headers.add("a", "b", Schema.STRING_SCHEMA);
headers.add("c", 1, Schema.INT32_SCHEMA);
final var sinkRecord =
new SinkRecord(
"some-topic", 1,
Schema.STRING_SCHEMA, "some-key",
Schema.STRING_SCHEMA, "some-value",
100L, 1000L,
TimestampType.CREATE_TIME, headers);

assertThatThrownBy(() -> schemaBuilder.buildSchema(sinkRecord))
.isInstanceOf(DataException.class)
.hasMessage("Header schema Schema{INT32} for 'c' "
+ "is not the same as the already defined map type: Schema{STRING}. "
+ "To force the same type, consider using StringConverter or similar.");
}

@ParameterizedTest
@MethodSource("schemaBuilders")
void testBuildSchemaForMultipleFieldsWithoutHeaders(final SinkSchemaBuilder schemaBuilder) {
final var sinkRecord =
new SinkRecord(
Expand Down

0 comments on commit 6289287

Please sign in to comment.