Skip to content
정명주(myeongju.jung) edited this page May 7, 2019 · 13 revisions

N개 병렬 조회 (+ 순서보장)

Flux<PodCast> 에서 각 PodCast 별로 다시 조회(api)한 후 조건(오디오 채널인가?)에 따라서 필터링

    /**
     * Returns the casts that the per-cast audio lookup flags as audio.
     *
     * <p>Parallel processing = flatMapSequential + callable + scheduler: each cast is looked up
     * through {@link #monoPodCastAndAudio(PodCast)}, and {@code flatMapSequential} emits the
     * results in the order of the input list.
     *
     * @param casts casts to check
     * @return the casts whose lookup result reports audio
     */
    private List<PodCast> filterAudioPodtyCasts(List<PodCast> casts) {
        return Flux.fromIterable(casts)
                   .flatMapSequential(this::monoPodCastAndAudio)
                   .filter(PodCastAndAudio::isAudio)
                   // The elements are PodCastAndAudio, so the getter must come from that type.
                   .map(PodCastAndAudio::getCast)
                   .collectList()
                   // Wait at most 13 seconds for all lookups to finish.
                   .block(Duration.ofSeconds(13));
    }

    /**
     * Builds a lazy lookup that pairs the given cast with the result of
     * {@code audioChannelChecker.isAudio}.
     *
     * @param cast the cast to check
     * @return a Mono that, once subscribed, runs the check on the elastic scheduler
     */
    private Mono<PodCastAndAudio> monoPodCastAndAudio(PodCast cast) {
        // The id is read eagerly; only the checker call itself is deferred.
        final Long id = cast.getCastId();
        return Mono
                .fromCallable(() -> audioChannelChecker.isAudio(id))
                .subscribeOn(Schedulers.elastic())
                .map(audio -> new PodCastAndAudio(cast, audio));
    }

다양한 모델을 조회 후 조합

태그, 포스트, 뉴스, 랭킹 등 메인화면에 노출되는 여러가지 API를 한 번에 호출해서 조합

        // 병렬처리 = zip + TupleUtils.function & defer + scheduler(immediate)
        return Mono.zip(tags(), posts(accessMemberId), news(), rankings(accessMemberId), curators(accessMemberId))
                   .map(TupleUtils.function(MainDetail::new))
                   .doOnError(this::logError)
                   .onErrorReturn(MainDetail.EMPTY)
                   // 예외를 원하지 않는다면 .timeout 도 좋은 방법
                   .blockOptional(Duration.ofMillis(5000))    
                   .orElse(MainDetail.EMPTY);
//...
    /**
     * Lazily loads the recommended tags on the elastic scheduler.
     *
     * @return a Mono of the recommended tags; on error the failure is logged and an empty
     *         list is emitted instead
     */
    private Mono<List<RecommendTagRow>> tags() {
        Mono<List<RecommendTagRow>> lookup =
                Mono.fromCallable(() -> recommendTagQuerier.getRecommendTags());
        return lookup.subscribeOn(Schedulers.elastic())
                     .doOnError(this::logError)
                     .onErrorReturn(emptyList());
    }
/**
 * Holder for the five parts combined into one main-detail result: tags, posts, news,
 * rankings and curators.
 *
 * <p>The all-args constructor takes the fields in declaration order.
 */
@Getter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class MainDetail {
    /** Fallback instance: empty lists for every list field and {@code NewsStandDto.EMPTY} for news. */
    public static final MainDetail EMPTY = new MainDetail(emptyList(),
                                                          emptyList(),
                                                          NewsStandDto.EMPTY,
                                                          emptyList(),
                                                          emptyList());

    // Recommended tag rows.
    private List<RecommendTagRow> tags;
    // Post details.
    private List<PostDetail> posts;
    // News stand data.
    private NewsStandDto news;
    // Ranking entries; these use the same PostDetail type as posts.
    private List<PostDetail> rankings;
    // Recommended curator rows.
    private List<RecommendCuratorRow> curators;

다양한 모델을 조회 후 명령(setAllDatas)

    /**
     * Fetches the member and member-group maps together and hands both to the holder.
     *
     * @param wiki   wiki passed through to the lookups
     * @param holder holder that is queried by the lookups and receives the results
     */
    private void setLatestReferenceDatas(Wiki wiki, PageContentReferencesHolder holder) {
        // Query all the data at once, asynchronously; the only blocking is the wait until
        // every lookup has completed.
        Mono.zip(getMemberMap(wiki, holder),          // member
                 getMemberGroupMap(wiki, holder))     // memberGroup
            .blockOptional()
            .ifPresent(maps -> holder.setAllDatas(maps.getT1(), maps.getT2()));
    }
    /**
     * Looks up the tenant members referenced by the holder.
     *
     * @param wiki   wiki whose tenant id is passed to the account service
     * @param holder source of the referenced member ids
     * @return a Mono of the member map; an empty map when the holder has no MEMBER references
     */
    Mono<Map<Long, TenantMember>> getMemberMap(Wiki wiki, PageContentReferencesHolder holder) {
        if (holder.hasReferences(PageContentReference.MEMBER)) {
            // The ids are collected eagerly; the service call is deferred until subscription.
            Set<Long> memberIds = holder.ids(PageContentReference.MEMBER);
            return Mono.fromCallable(() -> accountService.getTenantMemberMap(wiki.getTenantId(), memberIds))
                       .subscribeOn(Schedulers.elastic());
        }
        // Nothing to look up, so skip the service call.
        return Mono.just(emptyMap());
    }

class PageContentReferencesHolder {
    /**
     * Stores both lookup results on this holder in one call.
     *
     * @param memberMap      value assigned to {@code memberMap}
     * @param memberGroupMap value assigned to {@code memberGroupMap}
     */
    void setAllDatas(Map<Long, TenantMember> memberMap, Map<Long, ProjectMemberGroup> memberGroupMap) {
        this.memberGroupMap = memberGroupMap;
        this.memberMap = memberMap;
    }
    // ...
}

Retry

푸시메시지를 발송한 후 푸시메시지가 정상적으로 발송되었는지 확인. 만약 푸시가 정상적으로 발송되지 않았으면 푸시실패 발행

  • Thread.sleep 없이 처리 가능
    Long messageId = messageIdGetter.getMessageId();
    Retry<Object> retryPolicy =
            Retry.anyOf(WaitingException.class)               // retry when this exception occurs
                 .fixedBackoff(Duration.ofMillis(gapMillis))  // wait gapMillis between attempts
                 .retryMax(retryMax)                          // retry at most retryMax times
                 .doOnRetry(this::logRetry);
    PushMessageResponse response =
            Mono.just(messageId)
                .map(this::getMessageByApi)
                .retryWhen(retryPolicy)
                .doOnError(this::logError)
                .onErrorReturn(PushMessageResponse.EMPTY)
                .blockOptional()
                .orElse(PushMessageResponse.EMPTY);

    // Publish a push-failure notification when the final status is FAIL.
    if (response.getStatus() == FAIL) {
        pushFailNotificator.send(new ContactMessagePushFail(message));
    }
//...
    /**
     * Logs the given error at WARN level, including its stack trace.
     *
     * @param error the failure to log
     */
    private void logError(Throwable error) {
        if (!log.isWarnEnabled()) {
            return;
        }
        log.warn("PushFailCallbackPostExecutor Error : " + error.getMessage(), error);
    }

    /**
     * Logs the retry iteration and the message of the exception that triggered it, at INFO level.
     *
     * @param context retry context supplying the iteration and the exception
     */
    private void logRetry(RetryContext<Object> context) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("Retry {} times. exception = {}", context.iteration(), context.exception().getMessage());
    }

    /**
     * Fetches the push message through the client.
     *
     * @param messageId id of the message to fetch
     * @return the response when its status is not WAIT
     * @throws WaitingException when the status is still WAIT
     */
    private PushMessageResponse getMessageByApi(Long messageId) {
        PushMessageResponse fetched = client.getMessage(messageId);
        if (WAIT != fetched.getStatus()) {
            return fetched;
        }
        // The condition is not met yet: throw the dedicated exception, which is what triggers the retry.
        throw new WaitingException();
    }

Usage new[Elastic|...]

public class PageEventAssembler implements AutoCloseable {
    ...
    // One dedicated elastic scheduler, created once per assembler and disposed in close().
    private final Scheduler rowScheduler = Schedulers.newElastic("page-event-row");

    ...
    /**
     * Builds a lazy conversion of a page event into a {@code PageEventRow}.
     *
     * @param tenant tenant passed to the body and creator conversions
     * @param source the event to convert
     * @param member member whose zone is applied to the event's creation time
     * @return a Mono that runs the conversion on {@code rowScheduler}; on error the failure is
     *         logged and {@code PageEventRow.EMPTY} is emitted instead
     */
    private Mono<PageEventRow> monoPageEventRow(Tenant tenant, PageEvent source, OrganizationMember member) {
        // Reuse the shared scheduler. Calling Schedulers.newElastic("page-event-row") here on
        // every invocation would make the number of threads grow without bound.
        return Mono
                .fromCallable(() -> {
                    PageEventRow.Body rowBody = toBody(tenant, source, member);
                    PageEventRow.Creator rowCreator = toCreator(tenant, source);
                    return new PageEventRow(source.getEventId(), source.getPageId(), rowBody,
                                            ZonedDateTime.of(source.getCreatedAt(), member.getZone()), rowCreator);
                })
                .doOnError(this::logError)
                .onErrorReturn(PageEventRow.EMPTY)
                .subscribeOn(rowScheduler);
    }

    /** Disposes the dedicated scheduler unless it has already been disposed. */
    @Override
    public void close() {
        if (!rowScheduler.isDisposed()) {
            rowScheduler.dispose();
        }
    }
}
Clone this wiki locally