Skip to content

Commit eb7e451

Browse files
authored
Fix to call the duplicator's child stream subscriber methods by the correct executor (#5783)
Motivation: Subscriber methods must be called by the executor that is specified when subscribing to a `StreamMessage`. However, the methods of the duplicator's child stream subscriber are currently being called from the duplicator's executor, which is incorrect. Modifications: - Updated the implementation to ensure that the duplicator's child stream subscriber methods are called by the correct executor as specified during subscription. Result: - The duplicator's child stream subscriber methods are now called by the correct executor.
1 parent 0db0628 commit eb7e451

File tree

3 files changed

+187
-77
lines changed

3 files changed

+187
-77
lines changed

core/src/main/java/com/linecorp/armeria/common/stream/DefaultStreamMessageDuplicator.java

+83-73
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,7 @@ void subscribe(DownstreamSubscription<T> subscription) {
285285

286286
private void doSubscribe(DownstreamSubscription<T> subscription) {
287287
if (state == State.ABORTED) {
288-
final EventExecutor executor = subscription.executor;
289-
if (executor.inEventLoop()) {
290-
failLateProcessorSubscriber(subscription);
291-
} else {
292-
executor.execute(() -> failLateProcessorSubscriber(subscription));
293-
}
288+
subscription.failLateProcessorSubscriber();
294289
return;
295290
}
296291

@@ -301,63 +296,11 @@ private void doSubscribe(DownstreamSubscription<T> subscription) {
301296
}
302297
}
303298

304-
private static void failLateProcessorSubscriber(DownstreamSubscription<?> subscription) {
305-
final Subscriber<?> lateSubscriber = subscription.subscriber();
306-
try {
307-
lateSubscriber.onSubscribe(NoopSubscription.get());
308-
lateSubscriber.onError(
309-
new IllegalStateException("duplicator is closed or no more downstream can be added."));
310-
} catch (Throwable t) {
311-
throwIfFatal(t);
312-
logger.warn("Subscriber should not throw an exception. subscriber: {}", lateSubscriber, t);
313-
}
314-
}
315-
316-
void unsubscribe(DownstreamSubscription<T> subscription, @Nullable Throwable cause) {
299+
private void cleanupIfLastSubscription() {
317300
if (executor.inEventLoop()) {
318-
doUnsubscribe(subscription, cause);
319-
} else {
320-
executor.execute(() -> doUnsubscribe(subscription, cause));
321-
}
322-
}
323-
324-
private void doUnsubscribe(DownstreamSubscription<T> subscription, @Nullable Throwable cause) {
325-
if (!downstreamSubscriptions.remove(subscription)) {
326-
return;
327-
}
328-
329-
final Subscriber<? super T> subscriber = subscription.subscriber();
330-
subscription.clearSubscriber();
331-
332-
final CompletableFuture<Void> completionFuture = subscription.whenComplete();
333-
if (cause == null) {
334-
try {
335-
subscriber.onComplete();
336-
completionFuture.complete(null);
337-
} catch (Throwable t) {
338-
completionFuture.completeExceptionally(t);
339-
throwIfFatal(t);
340-
logger.warn("Subscriber.onComplete() should not raise an exception. subscriber: {}",
341-
subscriber, t);
342-
} finally {
343-
doCleanupIfLastSubscription();
344-
}
345-
return;
346-
}
347-
348-
try {
349-
if (subscription.notifyCancellation || !(cause instanceof CancelledSubscriptionException)) {
350-
subscriber.onError(cause);
351-
}
352-
completionFuture.completeExceptionally(cause);
353-
} catch (Throwable t) {
354-
final Exception composite = new CompositeException(t, cause);
355-
completionFuture.completeExceptionally(composite);
356-
throwIfFatal(t);
357-
logger.warn("Subscriber.onError() should not raise an exception. subscriber: {}",
358-
subscriber, composite);
359-
} finally {
360301
doCleanupIfLastSubscription();
302+
} else {
303+
executor.execute(this::doCleanupIfLastSubscription);
361304
}
362305
}
363306

@@ -613,7 +556,7 @@ static final class DownstreamSubscription<T> implements Subscription {
613556
private final StreamMessage<T> streamMessage;
614557
private Subscriber<? super T> subscriber;
615558
private final StreamMessageProcessor<T> processor;
616-
private final EventExecutor executor;
559+
private final EventExecutor downstreamExecutor;
617560
private final boolean withPooledObjects;
618561
private final boolean notifyCancellation;
619562

@@ -640,7 +583,7 @@ static final class DownstreamSubscription<T> implements Subscription {
640583
this.streamMessage = streamMessage;
641584
this.subscriber = subscriber;
642585
this.processor = processor;
643-
this.executor = executor;
586+
downstreamExecutor = executor;
644587
this.withPooledObjects = withPooledObjects;
645588
this.notifyCancellation = notifyCancellation;
646589
}
@@ -661,17 +604,36 @@ void clearSubscriber() {
661604
}
662605
}
663606

607+
void failLateProcessorSubscriber() {
608+
if (downstreamExecutor.inEventLoop()) {
609+
failLateProcessorSubscriber0();
610+
} else {
611+
downstreamExecutor.execute(this::failLateProcessorSubscriber0);
612+
}
613+
}
614+
615+
private void failLateProcessorSubscriber0() {
616+
try {
617+
subscriber.onSubscribe(NoopSubscription.get());
618+
subscriber.onError(
619+
new IllegalStateException("duplicator is closed or no more downstream can be added."));
620+
} catch (Throwable t) {
621+
throwIfFatal(t);
622+
logger.warn("Subscriber should not throw an exception. subscriber: {}", subscriber, t);
623+
}
624+
}
625+
664626
// Called from processor.processorExecutor
665627
void invokeOnSubscribe() {
666628
if (invokedOnSubscribe) {
667629
return;
668630
}
669631
invokedOnSubscribe = true;
670632

671-
if (executor.inEventLoop()) {
633+
if (downstreamExecutor.inEventLoop()) {
672634
invokeOnSubscribe0();
673635
} else {
674-
executor.execute(this::invokeOnSubscribe0);
636+
downstreamExecutor.execute(this::invokeOnSubscribe0);
675637
}
676638
}
677639

@@ -680,7 +642,7 @@ void invokeOnSubscribe0() {
680642
try {
681643
subscriber.onSubscribe(this);
682644
} catch (Throwable t) {
683-
processor.unsubscribe(this, t);
645+
unsubscribe(t);
684646
throwIfFatal(t);
685647
logger.warn("Subscriber.onSubscribe() should not raise an exception. subscriber: {}",
686648
subscriber, t);
@@ -692,7 +654,7 @@ public void request(long n) {
692654
if (n <= 0) {
693655
final Throwable cause = new IllegalArgumentException(
694656
"n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)");
695-
processor.unsubscribe(this, cause);
657+
unsubscribe(cause);
696658
return;
697659
}
698660

@@ -726,10 +688,10 @@ private void accumulateDemand(long n) {
726688
}
727689

728690
void signal() {
729-
if (executor.inEventLoop()) {
691+
if (downstreamExecutor.inEventLoop()) {
730692
doSignal();
731693
} else {
732-
executor.execute(this::doSignal);
694+
downstreamExecutor.execute(this::doSignal);
733695
}
734696
}
735697

@@ -757,7 +719,7 @@ private boolean doSignalSingle(SignalQueue signals) {
757719

758720
if (cancelledOrAborted != null) {
759721
// Stream ended due to cancellation or abortion.
760-
processor.unsubscribe(this, cancelledOrAborted);
722+
unsubscribe(cancelledOrAborted);
761723
return false;
762724
}
763725

@@ -771,7 +733,7 @@ private boolean doSignalSingle(SignalQueue signals) {
771733
if (signal instanceof CloseEvent) {
772734
// The stream has reached at its end.
773735
offset++;
774-
processor.unsubscribe(this, ((CloseEvent) signal).cause);
736+
unsubscribe(((CloseEvent) signal).cause);
775737
return false;
776738
}
777739

@@ -812,7 +774,7 @@ private boolean doSignalSingle(SignalQueue signals) {
812774
// If an exception such as IllegalReferenceCountException is raised while operating
813775
// on the ByteBuf, catch it and notify the subscriber with it. So the
814776
// subscriber does not hang forever.
815-
processor.unsubscribe(this, thrown);
777+
unsubscribe(thrown);
816778
return false;
817779
}
818780

@@ -832,7 +794,7 @@ private boolean doSignalSingle(SignalQueue signals) {
832794
try {
833795
subscriber.onNext(obj);
834796
} catch (Throwable t) {
835-
processor.unsubscribe(this, t);
797+
unsubscribe(t);
836798
throwIfFatal(t);
837799
logger.warn("Subscriber.onNext({}) should not raise an exception. subscriber: {}",
838800
obj, subscriber, t);
@@ -844,6 +806,54 @@ private boolean doSignalSingle(SignalQueue signals) {
844806
}
845807
}
846808

809+
void unsubscribe(@Nullable Throwable cause) {
810+
if (downstreamExecutor.inEventLoop()) {
811+
doUnsubscribe(cause);
812+
} else {
813+
downstreamExecutor.execute(() -> doUnsubscribe(cause));
814+
}
815+
}
816+
817+
private void doUnsubscribe(@Nullable Throwable cause) {
818+
if (!processor.downstreamSubscriptions.remove(this)) {
819+
return;
820+
}
821+
822+
final Subscriber<? super T> subscriber = this.subscriber;
823+
clearSubscriber();
824+
825+
final CompletableFuture<Void> completionFuture = whenComplete();
826+
if (cause == null) {
827+
try {
828+
subscriber.onComplete();
829+
completionFuture.complete(null);
830+
} catch (Throwable t) {
831+
completionFuture.completeExceptionally(t);
832+
throwIfFatal(t);
833+
logger.warn("Subscriber.onComplete() should not raise an exception. subscriber: {}",
834+
subscriber, t);
835+
} finally {
836+
processor.cleanupIfLastSubscription();
837+
}
838+
return;
839+
}
840+
841+
try {
842+
if (notifyCancellation || !(cause instanceof CancelledSubscriptionException)) {
843+
subscriber.onError(cause);
844+
}
845+
completionFuture.completeExceptionally(cause);
846+
} catch (Throwable t) {
847+
final Exception composite = new CompositeException(t, cause);
848+
completionFuture.completeExceptionally(composite);
849+
throwIfFatal(t);
850+
logger.warn("Subscriber.onError() should not raise an exception. subscriber: {}",
851+
subscriber, composite);
852+
} finally {
853+
processor.cleanupIfLastSubscription();
854+
}
855+
}
856+
847857
@Override
848858
public void cancel() {
849859
abort(subscriber instanceof AbortingSubscriber ? ((AbortingSubscriber<?>) subscriber).cause()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2021 LINE Corporation
3+
*
4+
* LINE Corporation licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.linecorp.armeria.common.stream;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
22+
import org.junit.jupiter.api.extension.RegisterExtension;
23+
import org.junit.jupiter.params.ParameterizedTest;
24+
import org.junit.jupiter.params.provider.CsvSource;
25+
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
27+
28+
import com.linecorp.armeria.testing.junit5.common.EventLoopExtension;
29+
30+
import io.netty.channel.EventLoop;
31+
32+
class StreamMessageDuplicatorChildSubscriberTest {
33+
34+
@RegisterExtension
35+
static final EventLoopExtension eventLoop1 = new EventLoopExtension();
36+
37+
@RegisterExtension
38+
static final EventLoopExtension eventLoop2 = new EventLoopExtension();
39+
40+
@RegisterExtension
41+
static final EventLoopExtension eventLoop3 = new EventLoopExtension();
42+
43+
@CsvSource({ "true", "false" })
44+
@ParameterizedTest
45+
void childSubscriberMethodsMustBeCalledByExecutors(boolean close) throws InterruptedException {
46+
final StreamWriter<String> publisher = StreamMessage.streaming();
47+
publisher.write("foo");
48+
if (close) {
49+
publisher.close();
50+
} else {
51+
publisher.abort();
52+
}
53+
54+
final StreamMessageDuplicator<String> duplicator =
55+
publisher.toDuplicator(eventLoop1.get());
56+
57+
final StreamMessage<String> first = duplicator.duplicate();
58+
final StreamMessage<String> second = duplicator.duplicate();
59+
60+
duplicator.close();
61+
62+
final CountDownLatch latch = new CountDownLatch(2);
63+
final EventLoop executor2 = eventLoop2.get();
64+
first.subscribe(new ChildSubscriber(executor2, latch), executor2);
65+
66+
final EventLoop executor3 = eventLoop3.get();
67+
second.subscribe(new ChildSubscriber(executor3, latch), executor3);
68+
latch.await();
69+
}
70+
71+
private static final class ChildSubscriber implements Subscriber<String> {
72+
73+
private final EventLoop eventLoop;
74+
private final CountDownLatch latch;
75+
76+
ChildSubscriber(EventLoop eventLoop, CountDownLatch latch) {
77+
this.eventLoop = eventLoop;
78+
this.latch = latch;
79+
}
80+
81+
@Override
82+
public void onSubscribe(Subscription s) {
83+
assertThat(eventLoop.inEventLoop()).isTrue();
84+
s.request(Long.MAX_VALUE);
85+
}
86+
87+
@Override
88+
public void onNext(String data) {
89+
assertThat(eventLoop.inEventLoop()).isTrue();
90+
}
91+
92+
@Override
93+
public void onError(Throwable t) {
94+
assertThat(eventLoop.inEventLoop()).isTrue();
95+
latch.countDown();
96+
}
97+
98+
@Override
99+
public void onComplete() {
100+
assertThat(eventLoop.inEventLoop()).isTrue();
101+
latch.countDown();
102+
}
103+
}
104+
}

core/src/test/java/com/linecorp/armeria/common/stream/StreamMessageDuplicatorTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import org.mockito.ArgumentCaptor;
4343
import org.reactivestreams.Subscriber;
4444
import org.reactivestreams.Subscription;
45-
import org.slf4j.Logger;
46-
import org.slf4j.LoggerFactory;
4745

4846
import com.google.common.base.Charsets;
4947

@@ -61,8 +59,6 @@
6159

6260
class StreamMessageDuplicatorTest {
6361

64-
private static final Logger logger = LoggerFactory.getLogger(StreamMessageDuplicatorTest.class);
65-
6662
private static final List<ByteBuf> byteBufs = new ArrayList<>();
6763

6864
@AfterEach

0 commit comments

Comments
 (0)