Skip to content

Commit

Permalink
Gateway 대기열 도입 (#158)
Browse files Browse the repository at this point in the history
* feat(#114): QueueFilter 추가

* feat(#114): QueueFilter 추가

* feat(#114): 사용자 활동 추적

proceed queue 내에 있는 사용자 활동 추적, 5분 이상 활동이 없으면 proceed queue에서 제거

* feat(#114): gateway Exception

* feat(#114): docker compose 수정

gateway-cache 추가

* feat(#114): MAX_ACTIVE_USERS 환경변수로 변경

* feat(#114): prod-yml gateway cache 추가

* feat(#114) : 429 error에서 현재 대기 순위 반환해주도록 리턴 변경

wait queue에서 대기시
기존 : 대기열 진입 후 429 에러 반환
변경 : 대기열 진입 후 대기 순위(rank) 반환

* refactor(#114) : active queue 제거 및 코드 간소화

* feat(#114): thread pool, redisson 설정

* feat(#114): redisson 분산락 적용
  • Loading branch information
kimzinsun authored Oct 24, 2024
1 parent 703b4de commit ac6934a
Show file tree
Hide file tree
Showing 16 changed files with 427 additions and 1 deletion.
14 changes: 14 additions & 0 deletions docker-compose-dep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ services:
networks:
- sasaping-network

gateway-cache:
container_name: gateway-cache
image: redis:7.4.0
ports:
- "6382:6382"
command: [ "redis-server", "--port", "6382" ]
volumes:
- redis-gateway-data:/data
restart: always
networks:
- sasaping-network

volumes:
cassandra_data:
driver: local
Expand All @@ -313,6 +325,8 @@ volumes:
driver: local
redis-coupon-data:
driver: local
redis-gateway-data:
driver: local
certs:
driver: local
esdata01:
Expand Down
3 changes: 3 additions & 0 deletions service/gateway/server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {

implementation 'net.logstash.logback:logstash-logback-encoder:8.0'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-aop'
implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
implementation 'org.springframework.cloud:spring-cloud-starter-openfeign'
Expand All @@ -43,6 +44,8 @@ dependencies {
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.redisson:redisson:3.35.0'

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableFeignClients
@EnableScheduling
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class GatewayApplication {

public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.sparta.gateway.server.application;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.annotation.Aspect;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Aspect
@Component
@RequiredArgsConstructor
@Slf4j(topic = "DistributedLockComponent")
public class DistributedLockComponent {

private final RedissonClient redissonClient;

public void execute(
String lockName, long waitMilliSecond, long leaseMilliSecond, Runnable logic) {
RLock lock = redissonClient.getLock(lockName);
try {
boolean isLocked = lock.tryLock(waitMilliSecond, leaseMilliSecond, TimeUnit.MILLISECONDS);
if (!isLocked) {
throw new IllegalStateException("[" + lockName + "] lock 획득 실패");
}
logic.run();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}


@Bean
public ExecutorService customThreadPool() {
return Executors.newFixedThreadPool(10);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package com.sparta.gateway.server.application;

import com.sparta.gateway.server.application.dto.RegisterUserResponse;
import com.sparta.gateway.server.infrastructure.exception.GatewayErrorCode;
import com.sparta.gateway.server.infrastructure.exception.GatewayException;
import java.time.Instant;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Slf4j
@Service
@RequiredArgsConstructor
public class UserQueueService {

private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;
private final String USER_QUEUE_WAIT_KEY = "users:queue:wait";
private final String USER_QUEUE_PROCEED_KEY = "users:queue:proceed";
private final DistributedLockComponent lockComponent;
private Long activeUsers = 0L;

@Value("${MAX_ACTIVE_USERS}")
private long MAX_ACTIVE_USERS;

public Mono<RegisterUserResponse> registerUser(String userId) {
lockComponent.execute("registerUser", 1000, 1000, () -> {
activeUsers = reactiveRedisTemplate.opsForZSet().size(USER_QUEUE_PROCEED_KEY).block();
});
return activeUsers < MAX_ACTIVE_USERS ? addToProceedQueue(userId) : checkAndAddToQueue(userId);

}

private Mono<RegisterUserResponse> checkAndAddToQueue(String userId) {
return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId)
.defaultIfEmpty(-1.0)
.flatMap(score -> {
if (score >= 0) {
return updateWaitQueueScore(userId);
}
return addToWaitQueue(userId);
});
}

private Mono<RegisterUserResponse> updateWaitQueueScore(String userId) {
double newScore = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet().score(USER_QUEUE_WAIT_KEY, userId)
.flatMap(oldScore ->
reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_WAIT_KEY, userId, newScore)
.then(reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId))
)
.map(rank -> new RegisterUserResponse(rank + 1))
;
}

private Mono<RegisterUserResponse> addToProceedQueue(String userId) {
var unixTime = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_PROCEED_KEY, userId, unixTime)
.filter(i -> i)
.flatMap(
success -> updateUserActivityTime(userId).thenReturn(new RegisterUserResponse(0L)));
}

private Mono<RegisterUserResponse> addToWaitQueue(String userId) {
var unixTime = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet()
.add(USER_QUEUE_WAIT_KEY, userId, unixTime)
.filter(i -> i)
.switchIfEmpty(Mono.error(new GatewayException(GatewayErrorCode.INTERNAL_SERVER_ERROR)))
.then(reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId))
.map(rank -> new RegisterUserResponse(rank + 1))
;
}

public Mono<Boolean> isAllowed(String userId) {
return reactiveRedisTemplate.opsForZSet()
.rank(USER_QUEUE_PROCEED_KEY, userId)
.defaultIfEmpty(-1L)
.map(rank -> rank >= 0)
.flatMap(isAllowed -> {
if (isAllowed) {
return updateUserActivityTime(userId).thenReturn(true);
}
return registerUser(userId).thenReturn(true);
});
}

public Mono<Long> getRank(String userId) {
return reactiveRedisTemplate.opsForZSet().rank(USER_QUEUE_WAIT_KEY, userId)
.defaultIfEmpty(-1L)
.map(rank -> rank >= 0 ? rank + 1 : rank);
}

@Scheduled(fixedRate = 30000)
public void scheduleAllowUser() {
removeInactiveUsers()
.then(allowUserTask())
.subscribe(
movedUsers -> {
},
error -> {
log.error(GatewayErrorCode.INTERNAL_SERVER_ERROR.getMessage(), error);
}
);
}

private Mono<Void> removeInactiveUsers() {
long currentTime = Instant.now().getEpochSecond();
double maxScore = currentTime - 300;

return reactiveRedisTemplate.opsForZSet()
.removeRangeByScore(USER_QUEUE_PROCEED_KEY, Range.closed(0.0, maxScore))
.then();
}

private Mono<Long> allowUserTask() {
return reactiveRedisTemplate.opsForZSet().size(USER_QUEUE_PROCEED_KEY)
.flatMap(activeUsers -> {
long slotsAvailable = MAX_ACTIVE_USERS - activeUsers;
if (slotsAvailable <= 0) {
return Mono.just(0L);
}
return moveUsersToProceeds(slotsAvailable);
});
}

private Mono<Long> moveUsersToProceeds(long count) {
return reactiveRedisTemplate.opsForZSet()
.popMin(USER_QUEUE_WAIT_KEY, count)
.flatMap(user -> {
String userId = Objects.requireNonNull(user.getValue());
return updateUserActivityTime(userId);
})
.count();
}

private Mono<Boolean> updateUserActivityTime(String userId) {
long currentTime = Instant.now().getEpochSecond();
return reactiveRedisTemplate.opsForZSet().add(USER_QUEUE_PROCEED_KEY, userId, currentTime);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.sparta.gateway.server.application.dto;

public record RegisterUserResponse(Long rank) {

public int getRank() {
return rank.intValue();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.sparta.gateway.server.infrastructure.configuration;

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RedissonConfig {

private static final String REDIS_URL_PREFIX = "redis://";

@Value("${spring.data.redis.host}")
private String host;

@Value("${spring.data.redis.port}")
private int port;

@Bean
RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress(REDIS_URL_PREFIX + host + ":" + port);
return Redisson.create(config);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.sparta.gateway.server.infrastructure.exception;

import lombok.Getter;
import org.springframework.http.HttpStatus;

@Getter
public enum GatewayErrorCode {
INTERNAL_SERVER_ERROR(HttpStatus.INTERNAL_SERVER_ERROR, "서버 오류가 발생했습니다."),
UNAUTHORIZED(HttpStatus.UNAUTHORIZED, "인증되지 않은 사용자입니다."),
FORBIDDEN(HttpStatus.FORBIDDEN, "접근 권한이 없습니다."),
BAD_REQUEST(HttpStatus.BAD_REQUEST, "잘못된 요청입니다."),
NOT_FOUND(HttpStatus.NOT_FOUND, "요청한 리소스를 찾을 수 없습니다."),
SERVICE_UNAVAILABLE(HttpStatus.SERVICE_UNAVAILABLE, "서비스를 사용할 수 없습니다."),
GATEWAY_TIMEOUT(HttpStatus.GATEWAY_TIMEOUT, "게이트웨이 타임아웃이 발생했습니다."),
TOO_MANY_REQUESTS(HttpStatus.TOO_MANY_REQUESTS, "너무 많은 요청이 발생했습니다."),
INVALID_TOKEN(HttpStatus.UNAUTHORIZED, "유효하지 않은 토큰입니다."),
TOKEN_EXPIRED(HttpStatus.UNAUTHORIZED, "토큰이 만료되었습니다."),
RATE_LIMIT_EXCEEDED(HttpStatus.TOO_MANY_REQUESTS, "요청 한도를 초과했습니다."),
CIRCUIT_BREAKER_OPEN(HttpStatus.SERVICE_UNAVAILABLE, "서비스 회로 차단기가 열려있습니다."),
INVALID_ROUTE(HttpStatus.BAD_GATEWAY, "유효하지 않은 라우트입니다."),
REQUEST_BODY_TOO_LARGE(HttpStatus.PAYLOAD_TOO_LARGE, "요청 본문이 너무 큽니다.");

private final HttpStatus status;
private final String message;

GatewayErrorCode(HttpStatus status, String message) {
this.status = status;
this.message = message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.sparta.gateway.server.infrastructure.exception;

import com.sparta.common.domain.exception.BusinessException;

public class GatewayException extends BusinessException {

public GatewayException(GatewayErrorCode errorCode) {
super(errorCode.getStatus().name(), errorCode.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.sparta.gateway.server.infrastructure.exception;

import com.sparta.common.domain.response.ApiResponse;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
@Slf4j
public class GatewayExceptionHandler {

@ExceptionHandler(GatewayException.class)
public ApiResponse<?> handleException(GatewayException e) {
log.error(e.getMessage());
return ApiResponse.error(e.getStatusName(), e.getMessage());
}
}
Loading

0 comments on commit ac6934a

Please sign in to comment.