Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] 단일 Redis -> Cluster Migration - #233 #234

Merged
merged 2 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,22 +1,46 @@
package org.dateroad.config;

import java.time.Duration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching
public class RedisCacheConfig {

@Value("${spring.data.redis.host}")
private String host;
@Value("${spring.data.redis.port}")
private int port;

@Bean
public RedisConnectionFactory redisConnectionFactoryForCache() {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(host, port));
}

@Bean
public RedisTemplate<String, Object> redisTemplateForCache() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactoryForCache());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}

@Bean
public CacheManager cacheManager(RedisConnectionFactory cf) {
public CacheManager cacheManager() {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.serializeKeysWith(
RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
Expand All @@ -26,7 +50,7 @@ public CacheManager cacheManager(RedisConnectionFactory cf) {

return RedisCacheManager
.RedisCacheManagerBuilder
.fromConnectionFactory(cf)
.fromConnectionFactory(redisConnectionFactoryForCache())
.cacheDefaults(redisCacheConfiguration)
.build();
}
Expand Down
41 changes: 41 additions & 0 deletions dateroad-api/src/main/java/org/dateroad/config/RedisConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.dateroad.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisConnectionFactory redisConnectionFactoryForCluster() {
RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration()
.clusterNode("127.0.0.1", 7000)
.clusterNode("127.0.0.1", 7001)
.clusterNode("127.0.0.1", 7002)
.clusterNode("127.0.0.1", 7003)
.clusterNode("127.0.0.1", 7004)
.clusterNode("127.0.0.1", 7005);
// 클러스터 모드로 LettuceConnectionFactory 설정
LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(clusterConfig);
lettuceConnectionFactory.setShareNativeConnection(false); // 클러스터 모드에서 필요에 따라 사용할 수 있음
return lettuceConnectionFactory;
}

@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.setConnectionFactory(redisConnectionFactoryForCluster());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
return redisTemplate;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
Expand All @@ -10,13 +11,20 @@
import lombok.RequiredArgsConstructor;
import org.dateroad.point.event.FreeEventListener;
import org.dateroad.point.event.PointEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
Expand All @@ -30,56 +38,40 @@
public class RedisStreamConfig {
private final PointEventListener pointEventListener;
private final FreeEventListener freeEventListener;
@Value("${spring.data.redis.host}")
private String host;
@Value("${spring.data.redis.port}")
private int port;

@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}

@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
private final RedisTemplate<String, Object> redisTemplate;

public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
if (Boolean.FALSE.equals(redisTemplate().hasKey(streamKey))) {
RedisAsyncCommands<String, String> commands = (RedisAsyncCommands<String, String>) Objects.requireNonNull(
redisTemplate()
.getConnectionFactory())
.getConnection()
.getNativeConnection();
if (Boolean.FALSE.equals(redisTemplate.hasKey(streamKey))) {
// Stream이 존재하지 않을 경우, MKSTREAM 옵션으로 스트림과 그룹을 생성
redisTemplate.execute((RedisCallback<Void>) connection -> {
if (connection.isPipelined() || connection.isQueueing()) {
throw new UnsupportedOperationException("Pipelined or queued connections are not supported for cluster.");
}
byte[] streamKeyBytes = streamKey.getBytes();
byte[] consumerGroupNameBytes = consumerGroupName.getBytes();

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8)
.add(CommandKeyword.CREATE)
.add(streamKey)
.add(consumerGroupName)
.add("0")
.add("MKSTREAM");
// MKSTREAM 옵션을 사용하여 스트림과 그룹을 생성
commands.dispatch(CommandType.XGROUP, new StatusOutput<>(StringCodec.UTF8), args).toCompletableFuture()
.join();
}
// Stream 존재 시, ConsumerGroup 존재 여부 확인 후 ConsumerGroup을 생성
else {
if (connection instanceof RedisClusterConnection clusterConnection) {
// 클러스터 모드에서 명령 실행
clusterConnection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes, "0".getBytes(), "MKSTREAM".getBytes());
} else {
// 비클러스터 모드에서 명령 실행
connection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes, "0".getBytes(), "MKSTREAM".getBytes());
}

return null;
});
} else {
// Stream이 존재할 경우 ConsumerGroup 존재 여부 확인 후 생성
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redisTemplate().opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}

// ConsumerGroup 존재 여부 확인
public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate()
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate
.opsForStream().groups(streamKey).stream().iterator();

while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
if (xInfoGroup.groupName().equals(consumerGroupName)) {
Expand All @@ -90,13 +82,13 @@ public boolean isStreamConsumerGroupExist(final String streamKey, final String c
}

@Bean
public Subscription pointSubscription() {
public Subscription pointSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) {
createStreamConsumerGroup("coursePoint", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactory(),
redisConnectionFactoryForCluster,
containerOptions);

Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-1"),
Expand All @@ -106,17 +98,16 @@ public Subscription pointSubscription() {
}

@Bean
public Subscription freeSubscription() {
public Subscription freeSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) {
createStreamConsumerGroup("courseFree", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactory(),
redisConnectionFactoryForCluster,
containerOptions);
Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-2"),
StreamOffset.create("courseFree", ReadOffset.lastConsumed()), freeEventListener);
container.start();
return subscription;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.dateroad.course.dto.request.TagCreateReq;
import org.dateroad.date.domain.Course;
import org.dateroad.image.domain.Image;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
Expand All @@ -24,7 +26,7 @@ public class AsyncService {
private final CoursePlaceService coursePlaceService;
private final CourseTagService courseTagService;
private final ImageService imageService;
private final StringRedisTemplate redisTemplate;
private final RedisTemplate<String, Object> redisTemplate;

@Transactional
public List<Image> createImage(final List<MultipartFile> images, final Course course) {
Expand Down
Loading