Skip to content

Commit

Permalink
remove easymock library (apache#12859)
Browse files Browse the repository at this point in the history
  • Loading branch information
sullis authored Apr 9, 2024
1 parent 425eea9 commit c93de37
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 70 deletions.
6 changes: 2 additions & 4 deletions pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<phase.prop>package</phase.prop>
<easymock.version>5.2.0</easymock.version>
<reactive.version>1.0.2</reactive.version>
<localstack-utils.version>0.2.23</localstack-utils.version>
</properties>
Expand Down Expand Up @@ -122,9 +121,8 @@
</dependency>

<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>${easymock.version}</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Map;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.easymock.Capture;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.core.SdkBytes;
Expand All @@ -38,10 +38,8 @@
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -80,7 +78,7 @@ private KinesisConfig getKinesisConfig() {

@BeforeMethod
public void setupTest() {
_kinesisClient = createMock(KinesisClient.class);
_kinesisClient = mock(KinesisClient.class);
_kinesisConfig = getKinesisConfig();
_records = new ArrayList<>(NUM_RECORDS);
for (int i = 0; i < NUM_RECORDS; i++) {
Expand All @@ -93,19 +91,18 @@ public void setupTest() {

@Test
public void testBasicConsumer() {
Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
ArgumentCaptor<GetRecordsRequest> getRecordsRequestCapture = ArgumentCaptor.forClass(GetRecordsRequest.class);
ArgumentCaptor<GetShardIteratorRequest> getShardIteratorRequestCapture =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);

GetRecordsResponse getRecordsResponse =
GetRecordsResponse.builder().nextShardIterator(null).records(_records).build();
GetShardIteratorResponse getShardIteratorResponse =
GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();

expect(_kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
expect(_kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
.anyTimes();

replay(_kinesisClient);
when(_kinesisClient.getRecords(getRecordsRequestCapture.capture())).thenReturn(getRecordsResponse);
when(_kinesisClient.getShardIterator(getShardIteratorRequestCapture.capture())).thenReturn(
getShardIteratorResponse);

KinesisConsumer kinesisConsumer = new KinesisConsumer(_kinesisConfig, _kinesisClient);
KinesisPartitionGroupOffset startOffset = new KinesisPartitionGroupOffset("0", "1");
Expand All @@ -118,26 +115,27 @@ public void testBasicConsumer() {
}

assertFalse(kinesisMessageBatch.isEndOfPartitionGroup());
assertEquals(getRecordsRequestCapture.getValue().shardIterator(), "DUMMY");
assertEquals(getShardIteratorRequestCapture.getValue().shardId(), "0");
}

@Test
public void testBasicConsumerWithChildShard() {
List<ChildShard> shardList = new ArrayList<>();
shardList.add(ChildShard.builder().shardId(PLACEHOLDER).parentShards("0").build());

Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance();
Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance();
ArgumentCaptor<GetRecordsRequest> getRecordsRequestCapture = ArgumentCaptor.forClass(GetRecordsRequest.class);
ArgumentCaptor<GetShardIteratorRequest> getShardIteratorRequestCapture =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);

GetRecordsResponse getRecordsResponse =
GetRecordsResponse.builder().nextShardIterator(null).records(_records).childShards(shardList).build();
GetShardIteratorResponse getShardIteratorResponse =
GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build();

expect(_kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes();
expect(_kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse)
.anyTimes();

replay(_kinesisClient);
when(_kinesisClient.getRecords(getRecordsRequestCapture.capture())).thenReturn(getRecordsResponse);
when(_kinesisClient.getShardIterator(getShardIteratorRequestCapture.capture())).thenReturn(
getShardIteratorResponse);

KinesisConsumer kinesisConsumer = new KinesisConsumer(_kinesisConfig, _kinesisClient);
KinesisPartitionGroupOffset startOffset = new KinesisPartitionGroupOffset("0", "1");
Expand All @@ -149,6 +147,9 @@ public void testBasicConsumerWithChildShard() {
for (int i = 0; i < NUM_RECORDS; i++) {
assertEquals(baToString(kinesisMessageBatch.getStreamMessage(i).getValue()), DUMMY_RECORD_PREFIX + i);
}

assertEquals(getRecordsRequestCapture.getValue().shardIterator(), "DUMMY");
assertEquals(getShardIteratorRequestCapture.getValue().shardId(), "0");
}

public String baToString(byte[] bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

import static org.easymock.EasyMock.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class KinesisStreamMetadataProviderTest {
Expand Down Expand Up @@ -71,9 +71,9 @@ private StreamConfig getStreamConfig() {

@BeforeMethod
public void setupTest() {
_kinesisConnectionHandler = createMock(KinesisConnectionHandler.class);
_streamConsumerFactory = createMock(StreamConsumerFactory.class);
_partitionGroupConsumer = createNiceMock(PartitionGroupConsumer.class);
_kinesisConnectionHandler = mock(KinesisConnectionHandler.class);
_streamConsumerFactory = mock(StreamConsumerFactory.class);
_partitionGroupConsumer = mock(PartitionGroupConsumer.class);
_kinesisStreamMetadataProvider =
new KinesisStreamMetadataProvider(CLIENT_ID, getStreamConfig(), _kinesisConnectionHandler,
_streamConsumerFactory);
Expand All @@ -87,11 +87,11 @@ public void getPartitionsGroupInfoListTest()
Shard shard1 = Shard.builder().shardId(SHARD_ID_1)
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();

expect(_kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
replay(_kinesisConnectionHandler);
when(_kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of(shard0, shard1));

List<PartitionGroupMetadata> result = _kinesisStreamMetadataProvider
.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), new ArrayList<>(), TIMEOUT);
List<PartitionGroupMetadata> result =
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), new ArrayList<>(),
TIMEOUT);

Assert.assertEquals(result.size(), 2);
Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
Expand All @@ -105,32 +105,33 @@ public void getPartitionsGroupInfoEndOfShardTest()

KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset("0", "1");

currentPartitionGroupMeta.add(new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset,
kinesisPartitionGroupOffset, "CONSUMING"));
currentPartitionGroupMeta.add(
new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset, kinesisPartitionGroupOffset,
"CONSUMING"));

Capture<StreamPartitionMsgOffset> checkpointArgs = newCapture(CaptureType.ALL);
Capture<PartitionGroupConsumptionStatus> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
Capture<Integer> intArguments = newCapture(CaptureType.ALL);
Capture<String> stringCapture = newCapture(CaptureType.ALL);
ArgumentCaptor<StreamPartitionMsgOffset> checkpointArgs = ArgumentCaptor.forClass(StreamPartitionMsgOffset.class);
ArgumentCaptor<PartitionGroupConsumptionStatus> partitionGroupMetadataCapture =
ArgumentCaptor.forClass(PartitionGroupConsumptionStatus.class);
ArgumentCaptor<Integer> intArguments = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<String> stringCapture = ArgumentCaptor.forClass(String.class);

Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(
SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();
Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(
SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
expect(_kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
expect(_streamConsumerFactory
.createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
.andReturn(_partitionGroupConsumer).anyTimes();
expect(_partitionGroupConsumer.fetchMessages(capture(checkpointArgs), captureInt(intArguments))).andReturn(
new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, true)).anyTimes();

replay(_kinesisConnectionHandler, _streamConsumerFactory, _partitionGroupConsumer);
Shard shard1 = Shard.builder().shardId(SHARD_ID_1)
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
when(_kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of(shard0, shard1));
when(_streamConsumerFactory.createPartitionGroupConsumer(stringCapture.capture(),
partitionGroupMetadataCapture.capture())).thenReturn(_partitionGroupConsumer);
when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), intArguments.capture())).thenReturn(
new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, true));

List<PartitionGroupMetadata> result = _kinesisStreamMetadataProvider
.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
List<PartitionGroupMetadata> result =
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(),
currentPartitionGroupMeta, TIMEOUT);

Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).getPartitionGroupId(), 1);
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1);
}

@Test
Expand All @@ -142,32 +143,33 @@ public void getPartitionsGroupInfoChildShardsest()
shardToSequenceMap.put("1", "1");
KinesisPartitionGroupOffset kinesisPartitionGroupOffset = new KinesisPartitionGroupOffset("1", "1");

currentPartitionGroupMeta.add(new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset,
kinesisPartitionGroupOffset, "CONSUMING"));
currentPartitionGroupMeta.add(
new PartitionGroupConsumptionStatus(0, 1, kinesisPartitionGroupOffset, kinesisPartitionGroupOffset,
"CONSUMING"));

Capture<StreamPartitionMsgOffset> checkpointArgs = newCapture(CaptureType.ALL);
Capture<PartitionGroupConsumptionStatus> partitionGroupMetadataCapture = newCapture(CaptureType.ALL);
Capture<Integer> intArguments = newCapture(CaptureType.ALL);
Capture<String> stringCapture = newCapture(CaptureType.ALL);
ArgumentCaptor<StreamPartitionMsgOffset> checkpointArgs = ArgumentCaptor.forClass(StreamPartitionMsgOffset.class);
ArgumentCaptor<PartitionGroupConsumptionStatus> partitionGroupMetadataCapture =
ArgumentCaptor.forClass(PartitionGroupConsumptionStatus.class);
ArgumentCaptor<Integer> intArguments = ArgumentCaptor.forClass(Integer.class);
ArgumentCaptor<String> stringCapture = ArgumentCaptor.forClass(String.class);

Shard shard0 = Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange(
SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
Shard shard0 = Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1)
.sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build();
Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(
SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build();

expect(_kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes();
expect(_streamConsumerFactory
.createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture)))
.andReturn(_partitionGroupConsumer).anyTimes();
expect(_partitionGroupConsumer.fetchMessages(capture(checkpointArgs), captureInt(intArguments))).andReturn(
new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, true)).anyTimes();

replay(_kinesisConnectionHandler, _streamConsumerFactory, _partitionGroupConsumer);
when(_kinesisConnectionHandler.getShards()).thenReturn(ImmutableList.of(shard0, shard1));
when(_streamConsumerFactory.createPartitionGroupConsumer(stringCapture.capture(),
partitionGroupMetadataCapture.capture())).thenReturn(_partitionGroupConsumer);
when(_partitionGroupConsumer.fetchMessages(checkpointArgs.capture(), intArguments.capture())).thenReturn(
new KinesisMessageBatch(new ArrayList<>(), kinesisPartitionGroupOffset, true));

List<PartitionGroupMetadata> result = _kinesisStreamMetadataProvider
.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(), currentPartitionGroupMeta, TIMEOUT);
List<PartitionGroupMetadata> result =
_kinesisStreamMetadataProvider.computePartitionGroupMetadata(CLIENT_ID, getStreamConfig(),
currentPartitionGroupMeta, TIMEOUT);

Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).getPartitionGroupId(), 0);
Assert.assertEquals(partitionGroupMetadataCapture.getValue().getSequenceNumber(), 1);
}
}

0 comments on commit c93de37

Please sign in to comment.