Skip to content

Commit

Permalink
block until element arrived
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Feb 20, 2024
1 parent ba16656 commit 34f7d43
Showing 1 changed file with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class JdbcProjectionTest extends JUnitSuite {

Expand Down Expand Up @@ -209,13 +211,14 @@ private void expectNextUntilErrorMessage(TestSubscriber.Probe<Done> probe, Strin
}

private JdbcHandler<Envelope, PureJdbcSession> concatHandler(StringBuffer str) {
return concatHandler(str, __ -> false);
return concatHandler(str, new CountDownLatch(0), __ -> false);
}

private JdbcHandler<Envelope, PureJdbcSession> concatHandler(
StringBuffer buffer, Predicate<Long> failPredicate) {
StringBuffer buffer, CountDownLatch latch, Predicate<Long> failPredicate) {
return JdbcHandler.fromFunction(
(PureJdbcSession session, Envelope envelope) -> {
latch.countDown();
if (failPredicate.test(envelope.offset)) {
throw new RuntimeException(failMessage(envelope.offset));
} else {
Expand Down Expand Up @@ -275,14 +278,15 @@ public void exactlyOnceShouldRestartFromPreviousOffset() {
ProjectionId projectionId = genRandomProjectionId();

StringBuffer str = new StringBuffer();
CountDownLatch latch = new CountDownLatch(3);

Projection<Envelope> projection =
JdbcProjection.exactlyOnce(
projectionId,
sourceProvider(entityId),
jdbcSessionCreator,
// fail on fourth offset
() -> concatHandler(str, offset -> offset == 4),
() -> concatHandler(str, latch, offset -> offset == 4),
testKit.system());

projectionTestKit.runWithTestSink(
Expand Down Expand Up @@ -326,14 +330,15 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
ProjectionId projectionId = genRandomProjectionId();

StringBuffer str = new StringBuffer();
CountDownLatch latch = new CountDownLatch(3);

Projection<Envelope> projection =
JdbcProjection.atLeastOnce(
projectionId,
sourceProvider(entityId),
jdbcSessionCreator,
// fail on fourth offset
() -> concatHandler(str, offset -> offset == 4),
() -> concatHandler(str, latch, offset -> offset == 4),
testKit.system())
.withSaveOffset(1, Duration.ZERO);

Expand All @@ -346,10 +351,9 @@ public void atLeastOnceShouldRestartFromPreviousOffset() {
*
* See https://github.com/akka/akka-projection/issues/462 for a possible solution.
*/
// because concatHandler won't concat element that offset 4, so this is safe that request
// 3
probe.request(3);
probe.expectNextN(3);
probe.request(2);
probe.expectNextN(2);
assertTrue(latch.await(3, TimeUnit.SECONDS));
assertEquals("abc|def|ghi|", str.toString());
expectNextUntilErrorMessage(probe, failMessage(4));
});
Expand Down

0 comments on commit 34f7d43

Please sign in to comment.