-
Notifications
You must be signed in to change notification settings - Fork 0
Reactor Samples
정명주(myeongju.jung) edited this page May 7, 2019
·
13 revisions
Flux<PodCast>
에서 각 PodCast 별로 다시 조회(api)한 후 조건(오디오 채널인가?)에 따라서 필터링
/**
 * Parallel processing = flatMapSequential + callable + scheduler.
 *
 * <p>Looks up the audio flag of every cast via {@link #monoPodCastAndAudio} and keeps only
 * the casts whose lookup reports {@code isAudio}.
 *
 * @param casts casts to check
 * @return the casts that passed the audio filter
 */
private List<PodCast> filterAudioPodtyCasts(List<PodCast> casts) {
    return Flux.fromIterable(casts)
               // Per Reactor docs, flatMapSequential subscribes to the inner publishers eagerly
               // but emits their results in source order.
               .flatMapSequential(this::monoPodCastAndAudio)
               .filter(PodCastAndAudio::isAudio)
               // Fixed: this previously referenced the non-existent type PodtyCastAndAudio.
               .map(PodCastAndAudio::getCast)
               .collectList()
               // Waits at most 13 seconds for the whole list.
               .block(Duration.ofSeconds(13));
}
/**
 * Builds a Mono that runs the audio check for one cast on the elastic scheduler and pairs
 * the cast with the check result.
 *
 * @param cast cast to check
 * @return a Mono emitting the cast together with its audio flag
 */
private Mono<PodCastAndAudio> monoPodCastAndAudio(PodCast cast) {
    // The id is read immediately; only the checker call itself is deferred into the callable.
    Long id = cast.getCastId();
    Mono<Boolean> audioFlag = Mono.fromCallable(() -> audioChannelChecker.isAudio(id))
                                  .subscribeOn(Schedulers.elastic());
    return audioFlag.map(audio -> new PodCastAndAudio(cast, audio));
}
태그, 포스트, 뉴스, 랭킹 등 메인화면에 노출되는 여러가지 API를 한 번에 호출해서 조합
// 병렬처리 = zip + TupleUtils.function & defer + scheduler(immediate)
return Mono.zip(tags(), posts(accessMemberId), news(), rankings(accessMemberId), curators(accessMemberId))
.map(TupleUtils.function(MainDetail::new))
.doOnError(this::logError)
.onErrorReturn(MainDetail.EMPTY)
// 예외를 원하지 않는다면 .timeout 도 좋은 방법
.blockOptional(Duration.ofMillis(5000))
.orElse(MainDetail.EMPTY);
//...
/**
 * Loads the recommended tags on the elastic scheduler.
 *
 * <p>A failure is logged through {@code logError} and replaced by an empty list.
 *
 * @return a Mono emitting the recommended tags, or an empty list on error
 */
private Mono<List<RecommendTagRow>> tags() {
    Mono<List<RecommendTagRow>> loaded =
        Mono.fromCallable(() -> recommendTagQuerier.getRecommendTags())
            .subscribeOn(Schedulers.elastic());
    return loaded.doOnError(this::logError)
                 .onErrorReturn(emptyList());
}
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class MainDetail {
public static final MainDetail EMPTY = new MainDetail(emptyList(),
emptyList(),
NewsStandDto.EMPTY,
emptyList(),
emptyList());
private List<RecommendTagRow> tags;
private List<PostDetail> posts;
private NewsStandDto news;
private List<PostDetail> rankings;
private List<RecommendCuratorRow> curators;
/**
 * Fetches the member map and the member-group map and stores both on the holder.
 *
 * @param wiki   wiki passed to the two lookups
 * @param holder holder that is queried by the lookups and receives the results
 */
private void setLatestReferenceDatas(Wiki wiki, PageContentReferencesHolder holder) {
    // The lookups are combined asynchronously, without blocking in between;
    // the single block below waits until all of the data has been fetched.
    Mono.zip(getMemberMap(wiki, holder),       // member
             getMemberGroupMap(wiki, holder))  // memberGroup
        .blockOptional()
        .ifPresent(pair -> holder.setAllDatas(pair.getT1(), pair.getT2()));
}
/**
 * Returns a Mono for the tenant members referenced by the holder.
 *
 * <p>When the holder has no MEMBER references, an empty map is emitted and the account
 * service is not called.
 *
 * @param wiki   wiki whose tenant id is used for the lookup
 * @param holder source of the referenced member ids
 * @return a Mono emitting the member map keyed by {@code Long} id
 */
Mono<Map<Long, TenantMember>> getMemberMap(Wiki wiki, PageContentReferencesHolder holder) {
    if (holder.hasReferences(PageContentReference.MEMBER)) {
        // Ids are collected right away; the service call itself runs later on the elastic scheduler.
        Set<Long> memberIds = holder.ids(PageContentReference.MEMBER);
        return Mono.fromCallable(() -> accountService.getTenantMemberMap(wiki.getTenantId(), memberIds))
                   .subscribeOn(Schedulers.elastic());
    }
    return Mono.just(emptyMap());
}
/** Holder for the reference data of a page content (remaining members elided in this sample). */
class PageContentReferencesHolder {
    /**
     * Stores both lookup results in one call.
     *
     * @param memberMap      tenant members keyed by id
     * @param memberGroupMap project member groups keyed by id
     */
    void setAllDatas(Map<Long, TenantMember> memberMap, Map<Long, ProjectMemberGroup> memberGroupMap) {
        this.memberGroupMap = memberGroupMap;
        this.memberMap = memberMap;
    }
    // ...
}
푸시메시지를 발송한 후 푸시메시지가 정상적으로 발송되었는지 확인. 만약 푸시가 정상적으로 발송되지 않았으면 푸시실패 발행
- Thread.sleep 없이 처리 가능
Long messageId = messageIdGetter.getMessageId();
// Poll the message through the API; getMessageByApi throws WaitingException while the status is WAIT.
Mono<PushMessageResponse> polled =
    Mono.just(messageId)
        .map(this::getMessageByApi)
        .retryWhen(
            Retry.anyOf(WaitingException.class)                // retry when this exception occurs
                 .fixedBackoff(Duration.ofMillis(gapMillis))   // at intervals of gapMillis
                 .retryMax(retryMax)                           // at most retryMax retries
                 .doOnRetry(this::logRetry))
        .doOnError(this::logError)
        .onErrorReturn(PushMessageResponse.EMPTY);
PushMessageResponse response = polled.blockOptional()
                                     .orElse(PushMessageResponse.EMPTY);
// Publish a push-failure notification only when the final status is FAIL.
if (response.getStatus() == FAIL) {
    pushFailNotificator.send(new ContactMessagePushFail(message));
}
//...
/**
 * Logs the given error at WARN level, including its stack trace.
 *
 * @param error the failure to log
 */
private void logError(Throwable error) {
    if (!log.isWarnEnabled()) {
        return;
    }
    String text = "PushFailCallbackPostExecutor Error : " + error.getMessage();
    log.warn(text, error);
}
/**
 * Logs one retry attempt at INFO level: the iteration number and the triggering exception's message.
 *
 * @param context retry context supplied by the retry operator
 */
private void logRetry(RetryContext<Object> context) {
    if (!log.isInfoEnabled()) {
        return;
    }
    log.info("Retry {} times. exception = {}", context.iteration(), context.exception().getMessage());
}
/**
 * Fetches the push message through the client.
 *
 * @param messageId id of the message to fetch
 * @return the response, once its status is no longer WAIT
 * @throws WaitingException while the status is still WAIT
 */
private PushMessageResponse getMessageByApi(Long messageId) {
    PushMessageResponse fetched = client.getMessage(messageId);
    if (WAIT != fetched.getStatus()) {
        return fetched;
    }
    // The condition is not met yet: throw the dedicated exception, otherwise no retry happens.
    throw new WaitingException();
}
/**
 * Assembles page event rows on a dedicated scheduler that this instance owns and disposes
 * in {@link #close()}. Parts of the class are elided ({@code ...}) in this sample.
 */
public class PageEventAssembler implements AutoCloseable {
    ...
    // One scheduler instance per assembler, reused by every row Mono.
    private final Scheduler rowScheduler = Schedulers.newElastic("page-event-row");
    ...

    /**
     * Builds a Mono that creates one row on {@code rowScheduler}.
     *
     * <p>A failure is logged through {@code logError} and replaced by {@code PageEventRow.EMPTY}.
     */
    private Mono<PageEventRow> monoPageEventRow(Tenant tenant, PageEvent source, OrganizationMember member) {
        return Mono.fromCallable(() -> {
                       PageEventRow.Body rowBody = toBody(tenant, source, member);
                       PageEventRow.Creator rowCreator = toCreator(tenant, source);
                       return new PageEventRow(source.getEventId(), source.getPageId(), rowBody,
                                               ZonedDateTime.of(source.getCreatedAt(), member.getZone()), rowCreator);
                   })
                   .doOnError(this::logError)
                   .onErrorReturn(PageEventRow.EMPTY)
                   // .subscribeOn(Schedulers.newElasitc("page-event-row")); <- this makes the number of threads grow without bound
                   .subscribeOn(rowScheduler);
    }

    /** Disposes the row scheduler unless it has already been disposed. */
    @Override
    public void close() {
        if (!rowScheduler.isDisposed()) {
            rowScheduler.dispose();
        }
    }
}
JAVA
JPA
- JPA-Create-And-Update
- Optional-Eager
- QueryDsl-Configuration
- QueryDsl-More-Type-safety
- QueryDsl-SubQuery
DDD
Install
Spring
Spring-Boot
- Swagger2-Configuration
- Spring-Restdocs-Configuration
- Spring-Page-Jackson
- JSR310-Guide
- logback-spring.xml
- WebMvcUtils.java
- Spring-Boot-Properties
- Spring-Boot-Hidden-Gems
- Spring-Boot-Config
Spring-Cloud
- Spring-Cloud-Zuul
- Spring-Cloud-Feign
- Spring-Cloud-Hystrix
- Spring-Cloud-Consul
- Spring-Cloud-Ribbon
- Spring-Cloud-Circuit-Breaker
JavaScript
Gradle
Test
Linux
Etc
TODO http://zoltanaltfatter.com/2017/06/09/publishing-domain-events-from-aggregate-roots/