
Commit

removed unused methods
Claude committed Jan 10, 2025
1 parent 3e948ba commit 81d46a4
Showing 7 changed files with 6 additions and 90 deletions.
@@ -163,7 +163,7 @@ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRe
*
* @return {@code true} if the connector is not stopped and the timer has not expired.
*/
protected boolean stillPolling() {
protected final boolean stillPolling() {
final boolean result = !connectorStopped.get() && !timer.isExpired();
logger.debug("Still polling: {}", result);
return result;
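For context, stillPolling() is the guard a polling loop checks before fetching more records: it returns true only while the connector is running and the poll timer has not expired. A minimal sketch of such a loop inside a task subclass (extractRecord() and maxBatchSize are hypothetical placeholders, not part of this change):

// Hypothetical poll loop gated by stillPolling(); extractRecord() and
// maxBatchSize are illustrative only, not part of this commit.
private List<SourceRecord> collectRecords(final int maxBatchSize) {
    final List<SourceRecord> results = new ArrayList<>();
    while (stillPolling() && results.size() < maxBatchSize) {
        final SourceRecord record = extractRecord(); // hypothetical single-record fetch
        if (record != null) {
            results.add(record);
        }
    }
    return results;
}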
@@ -242,16 +242,6 @@ default String getString(final String key) {
*/
int getPartition();

/**
* Gets the number of records to skip to get to this record. This is the same as the zero-based index of this
* record if all records were in an array.
*
* @return The number of records to skip to get to this record.
*/
default long skipRecords() {
return 0;
}

/**
* Increments the record count.
*/
@@ -138,8 +138,8 @@ public AbstractSourceTask.AbortTrigger getAbortTrigger() {
assertThat(backoff.estimatedDelay()).isEqualTo(expected);
backoff.delay();
expected *= 2;
assertThat(abortTrigger).isFalse();
}
assertThat(backoff.estimatedDelay()).isEqualTo(maxDelay);
assertThat(abortTrigger).isFalse();
}
}
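The loop above asserts an exponential backoff: each call to delay() doubles the estimated delay until it is capped at maxDelay, and the abort trigger is never fired. A minimal sketch of that policy (this class is illustrative, not the project's actual Backoff implementation):

// Illustrative capped exponential backoff matching the doubling behaviour
// the test above asserts.
final class SimpleBackoff {
    private final long maxDelayMs;
    private long currentDelayMs;

    SimpleBackoff(final long initialDelayMs, final long maxDelayMs) {
        this.currentDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    long estimatedDelay() {
        return Math.min(currentDelayMs, maxDelayMs);
    }

    void delay() throws InterruptedException {
        Thread.sleep(estimatedDelay());
        currentDelayMs = Math.min(currentDelayMs * 2, maxDelayMs); // double, capped at maxDelayMs
    }
}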
@@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -34,15 +33,14 @@

final class OffsetManagerTest {

private SourceTaskContext sourceTaskContext;

private OffsetStorageReader offsetStorageReader;

private OffsetManager<TestingOffsetManagerEntry> offsetManager;

@BeforeEach
void setup() {
offsetStorageReader = mock(OffsetStorageReader.class);
sourceTaskContext = mock(SourceTaskContext.class);
final SourceTaskContext sourceTaskContext = mock(SourceTaskContext.class);
when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader);
offsetManager = new OffsetManager<>(sourceTaskContext);
}
@@ -76,54 +74,6 @@ void testNewEntryWithoutDataFromContext() {
assertThat(result).isNotPresent();
}

@Test
void testUpdateCurrentEntry() {
final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "thing");

final ConcurrentHashMap<Map<String, Object>, Map<String, Object>> offsets = new ConcurrentHashMap<>();
offsets.put(offsetEntry.getManagerKey().getPartitionMap(), offsetEntry.getProperties());

offsetManager = new OffsetManager<>(sourceTaskContext, offsets);
offsetEntry.setProperty("MyProperty", "WOW");

offsetManager.updateCurrentOffsets(offsetEntry);

final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(offsetEntry.getManagerKey(),
TestingOffsetManagerEntry::new);
assertThat(result).isPresent();
assertThat(result.get().getProperty("MyProperty")).isEqualTo("WOW");
assertThat(result.get().getProperties()).isEqualTo(offsetEntry.getProperties());
}

@Test
void testUpdateNonExistentEntry() {
final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "0");
offsetEntry.setProperty("Random-property", "random value");
offsetManager.updateCurrentOffsets(offsetEntry);

final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(offsetEntry.getManagerKey(),
offsetEntry::fromProperties);
assertThat(result).isPresent();
assertThat(result.get().getProperties()).isEqualTo(offsetEntry.getProperties());
}

@Test
void updateCurrentOffsetsDataNotLost() {
final TestingOffsetManagerEntry offsetEntry = new TestingOffsetManagerEntry("bucket", "topic1", "0");
offsetEntry.setProperty("test", "WOW");
offsetManager.updateCurrentOffsets(offsetEntry);

final TestingOffsetManagerEntry offsetEntry2 = new TestingOffsetManagerEntry("bucket", "topic1", "0");
offsetEntry2.setProperty("test2", "a thing");
offsetManager.updateCurrentOffsets(offsetEntry2);

final Optional<TestingOffsetManagerEntry> result = offsetManager.getEntry(offsetEntry.getManagerKey(),
offsetEntry::fromProperties);
assertThat(result).isPresent();
assertThat(result.get().getProperty("test")).isEqualTo("WOW");
assertThat(result.get().getProperty("test2")).isEqualTo("a thing");
}

@SuppressWarnings("PMD.TestClassWithoutTestCases") // TODO figure out why this fails.
public static class TestingOffsetManagerEntry // NOPMD the above suppress warnings does not work.
implements
@@ -267,6 +267,7 @@ static Map<String, Object> consumeOffsetMessages(KafkaConsumer<byte[], byte[]> c
});
final List<Object> key = OBJECT_MAPPER.readValue(record.key(), new TypeReference<>() { // NOPMD
});
// key.get(0) will return the name of the connector the commit is from.
final Map<String, Object> keyDetails = (Map<String, Object>) key.get(1);
messages.put((String) keyDetails.get(OBJECT_KEY), offsetRec.get(RECORD_COUNT));
}
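The comment added above documents the layout of an offset-topic record key: a JSON array whose first element is the connector name and whose second element is the source-partition map (which holds the object key). A minimal sketch of reading both parts with Jackson (the helper class and method names are illustrative):

import java.io.IOException;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative parser for an offset-topic key of the form [connectorName, partitionMap].
final class OffsetKeyParser {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    static String connectorName(final byte[] key) throws IOException {
        final List<Object> parsed = OBJECT_MAPPER.readValue(key, new TypeReference<>() {
        });
        return (String) parsed.get(0); // element 0: connector name
    }

    @SuppressWarnings("unchecked")
    static Map<String, Object> partitionDetails(final byte[] key) throws IOException {
        final List<Object> parsed = OBJECT_MAPPER.readValue(key, new TypeReference<>() {
        });
        return (Map<String, Object>) parsed.get(1); // element 1: source partition details
    }
}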
@@ -111,7 +111,6 @@ public boolean hasNext() {
public SourceRecord next() {
final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
final S3OffsetManagerEntry entry = s3SourceRecord.getOffsetManagerEntry();
offsetManager.updateCurrentOffsets(entry);
return RecordProcessor.createSourceRecord(s3SourceRecord, s3SourceConfig, awsv2SourceClient, entry);
}
};
@@ -57,6 +57,7 @@ public void setUp() {
sourceTaskContext = mock(SourceTaskContext.class);
when(sourceTaskContext.offsetStorageReader()).thenReturn(offsetStorageReader);
offsetManager = new OffsetManager<>(sourceTaskContext);

}

private Map<String, Object> createPartitionMap() {
@@ -104,31 +105,6 @@ void testGetEntry() {
verify(sourceTaskContext, times(1)).offsetStorageReader();
}

@Test
void testUpdate() {
final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION);
assertThat(entry.getRecordCount()).isEqualTo(0L);
assertThat(entry.getProperty("random_entry")).isNull();

offsetManager.updateCurrentOffsets(entry);

entry.setProperty("random_entry", 5L);
entry.incrementRecordCount();
assertThat(entry.getRecordCount()).isEqualTo(1L);

offsetManager.updateCurrentOffsets(entry);

final Optional<S3OffsetManagerEntry> entry2 = offsetManager.getEntry(entry.getManagerKey(),
entry::fromProperties);
assertThat(entry2).isPresent();
assertThat(entry2.get().getPartition()).isEqualTo(PARTITION);
assertThat(entry2.get().getRecordCount()).isEqualTo(1L);
assertThat(entry2.get().getTopic()).isEqualTo(TOPIC);
assertThat(entry2.get().getBucket()).isEqualTo(TEST_BUCKET);
assertThat(entry2.get().getProperty("random_entry")).isEqualTo(5L);
verify(sourceTaskContext, times(0)).offsetStorageReader();
}

@Test
void testFromProperties() {
final S3OffsetManagerEntry entry = new S3OffsetManagerEntry(TEST_BUCKET, OBJECT_KEY, TOPIC, PARTITION);
