diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index 165fc35bad4d5..42c466c2120fe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -141,8 +141,6 @@ private ProcessorContextImpl getStandbyContext() { @Test public void globalKeyValueStoreShouldBeReadOnly() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.globalStore(anyString())).thenReturn(null); @@ -173,8 +171,6 @@ public void globalKeyValueStoreShouldBeReadOnly() { @Test public void globalTimestampedKeyValueStoreShouldBeReadOnly() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.globalStore(anyString())).thenReturn(null); @@ -299,8 +295,6 @@ public void globalSessionStoreShouldBeReadOnly() { @Test public void localKeyValueStoreShouldNotAllowInitOrClose() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.globalStore(anyString())).thenReturn(null); @@ -343,8 +337,6 @@ public void localKeyValueStoreShouldNotAllowInitOrClose() { @Test public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.globalStore(anyString())).thenReturn(null); @@ -521,8 +513,6 @@ public void localSessionStoreShouldNotAllowInitOrClose() { @Test public void shouldNotSendRecordHeadersToChangelogTopic() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION); @@ -553,18 +543,9 @@ public void shouldNotSendRecordHeadersToChangelogTopic() { @Test public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION); - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - final Position position = Position.emptyPosition(); final Headers headers = new RecordHeaders(); headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY); @@ -593,17 +574,6 @@ public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() { @Test public void shouldThrowUnsupportedOperationExceptionOnLogChange() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -613,17 +583,6 @@ public void shouldThrowUnsupportedOperationExceptionOnLogChange() { @Test public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -633,17 +592,6 @@ public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() { @Test public void shouldThrowUnsupportedOperationExceptionOnForward() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -653,17 +601,6 @@ public void shouldThrowUnsupportedOperationExceptionOnForward() { @Test public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -673,17 +610,6 @@ public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() { @Test public void shouldThrowUnsupportedOperationExceptionOnCommit() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -693,17 +619,6 @@ public void shouldThrowUnsupportedOperationExceptionOnCommit() { @Test public void shouldThrowUnsupportedOperationExceptionOnSchedule() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -713,17 +628,6 @@ public void shouldThrowUnsupportedOperationExceptionOnSchedule() { @Test public void shouldThrowUnsupportedOperationExceptionOnTopic() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -733,17 +637,6 @@ public void shouldThrowUnsupportedOperationExceptionOnTopic() { @Test public void shouldThrowUnsupportedOperationExceptionOnPartition() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -753,17 +646,6 @@ public void shouldThrowUnsupportedOperationExceptionOnPartition() { @Test public void shouldThrowUnsupportedOperationExceptionOnOffset() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -773,17 +655,6 @@ public void shouldThrowUnsupportedOperationExceptionOnOffset() { @Test public void shouldThrowUnsupportedOperationExceptionOnTimestamp() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -793,17 +664,6 @@ public void shouldThrowUnsupportedOperationExceptionOnTimestamp() { @Test public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -813,17 +673,6 @@ public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() { @Test public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -833,17 +682,6 @@ public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() { @Test public void shouldThrowUnsupportedOperationExceptionOnRecordContext() { - foreachSetUp(); - - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); - - context = buildProcessorContextImpl(streamsConfig, stateManager); - - final StreamTask task = mock(StreamTask.class); - context.transitionToActive(task, null, null); - - mockProcessorNodeWithLocalKeyValueStore(); - context = getStandbyContext(); assertThrows( UnsupportedOperationException.class, @@ -853,8 +691,6 @@ public void shouldThrowUnsupportedOperationExceptionOnRecordContext() { @Test public void shouldMatchStreamTime() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -870,8 +706,6 @@ public void shouldMatchStreamTime() { @Test public void shouldAddAndGetProcessorKeyValue() { - foreachSetUp(); - when(stateManager.taskType()).thenReturn(TaskType.ACTIVE); context = buildProcessorContextImpl(streamsConfig, stateManager); @@ -891,8 +725,6 @@ public void shouldAddAndGetProcessorKeyValue() { @Test public void shouldSetAndGetProcessorMetaData() { - foreachSetUp(); - context = buildProcessorContextImpl(streamsConfig, stateManager); mockProcessorNodeWithLocalKeyValueStore();