Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <aindriu.lavelle@aiven.io>
  • Loading branch information
aindriu-aiven committed Jan 27, 2025
1 parent cb84ae7 commit 197ae5f
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,8 @@ public class OffsetManager<E extends OffsetManager.OffsetManagerEntry<E>> {
* the context for this instance to use.
*/
public OffsetManager(final SourceTaskContext context) {
this(context, new ConcurrentHashMap<>());
}

/**
* Package private for testing.
*
* @param context
* the context for this instance to use.
* @param offsets
* the offsets
*/
protected OffsetManager(final SourceTaskContext context,
final ConcurrentMap<Map<String, Object>, Map<String, Object>> offsets) {
this.context = context;
this.offsets = offsets;
this.offsets = new ConcurrentHashMap<>();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ static List<String> consumeByteMessages(final String topic, final int expectedMe
String bootstrapServers) {
final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class,
ByteArrayDeserializer.class);
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(90),
final List<byte[]> objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(120),
consumerProperties);
return objects.stream().map(String::new).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ public void setValueData(final SchemaAndValue valueData) {
}

public String getTopic() {
return context.getTopic().isPresent() ? context.getTopic().get() : null;
return context.getTopic().orElse(null);
}

public Integer getPartition() {
return context.getPartition().isPresent() ? context.getPartition().get() : null;
return context.getPartition().orElse(null);
}

public String getObjectKey() {
Expand Down

0 comments on commit 197ae5f

Please sign in to comment.