diff --git a/dateroad-api/src/main/java/org/dateroad/config/RedisCacheConfig.java b/dateroad-api/src/main/java/org/dateroad/config/RedisCacheConfig.java index 24f9c423..8783d5e2 100644 --- a/dateroad-api/src/main/java/org/dateroad/config/RedisCacheConfig.java +++ b/dateroad-api/src/main/java/org/dateroad/config/RedisCacheConfig.java @@ -26,21 +26,21 @@ public class RedisCacheConfig { private int port; @Bean - public RedisConnectionFactory redisConnectionFactoryForCache() { + public RedisConnectionFactory redisConnectionFactoryForOne() { return new LettuceConnectionFactory(new RedisStandaloneConfiguration(host, port)); } @Bean - public RedisTemplate redisTemplateForCache() { + public RedisTemplate redisTemplateForOne() { RedisTemplate redisTemplate = new RedisTemplate<>(); - redisTemplate.setConnectionFactory(redisConnectionFactoryForCache()); + redisTemplate.setConnectionFactory(redisConnectionFactoryForOne()); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); return redisTemplate; } @Bean - public CacheManager cacheManager() { + public CacheManager cacheManagerForOne() { RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig() .serializeKeysWith( RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) @@ -50,7 +50,7 @@ public CacheManager cacheManager() { return RedisCacheManager .RedisCacheManagerBuilder - .fromConnectionFactory(redisConnectionFactoryForCache()) + .fromConnectionFactory(redisConnectionFactoryForOne()) .cacheDefaults(redisCacheConfiguration) .build(); } diff --git a/dateroad-api/src/main/java/org/dateroad/config/RedisConfig.java b/dateroad-api/src/main/java/org/dateroad/config/RedisClusterConfig.java similarity index 83% rename from dateroad-api/src/main/java/org/dateroad/config/RedisConfig.java rename to dateroad-api/src/main/java/org/dateroad/config/RedisClusterConfig.java index 5f194ad2..fa198280 100644 --- a/dateroad-api/src/main/java/org/dateroad/config/RedisConfig.java +++ b/dateroad-api/src/main/java/org/dateroad/config/RedisClusterConfig.java @@ -1,5 +1,8 @@ package org.dateroad.config; +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator; import io.lettuce.core.ReadFrom; import io.lettuce.core.SocketOptions; import io.lettuce.core.cluster.ClusterClientOptions; @@ -23,14 +26,19 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration -public class RedisConfig { +public class RedisClusterConfig { @Value("${aws.ip}") private String host; + private RedisConnectionFactory redisConnectionFactory; @Bean @Primary public RedisConnectionFactory redisConnectionFactoryForCluster() { + if (this.redisConnectionFactory != null) { + return this.redisConnectionFactory; + } + RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration() .clusterNode(host, 7001) .clusterNode(host, 7002) @@ -43,47 +51,46 @@ public RedisConnectionFactory redisConnectionFactoryForCluster() { .tcpNoDelay(true) .keepAlive(true) .build(); - //----------------- (2) Cluster topology refresh 옵션 + ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions .builder() .dynamicRefreshSources(true) .enableAllAdaptiveRefreshTriggers() .enablePeriodicRefresh(Duration.ofSeconds(30)) .build(); - //----------------- (3) Cluster client 옵션 + ClusterClientOptions clusterClientOptions = ClusterClientOptions .builder() .pingBeforeActivateConnection(true) .autoReconnect(true) -// .socketOptions(socketOptions) .topologyRefreshOptions(clusterTopologyRefreshOptions) .nodeFilter(it -> - ! (it.is(NodeFlag.EVENTUAL_FAIL) - || it.is(NodeFlag.FAIL) - || it.is(NodeFlag.NOADDR) - || it.is(NodeFlag.HANDSHAKE))) + !(it.is(NodeFlag.EVENTUAL_FAIL) + || it.is(NodeFlag.FAIL) + || it.is(NodeFlag.NOADDR) + || it.is(NodeFlag.HANDSHAKE))) .validateClusterNodeMembership(false) .maxRedirects(5).build(); - //----------------- (4) Lettuce Client 옵션 + final LettuceClientConfiguration clientConfig = LettuceClientConfiguration .builder() .readFrom(ReadFrom.REPLICA_PREFERRED) .commandTimeout(Duration.ofSeconds(10L)) .clientOptions(clusterClientOptions) .build(); + clusterConfig.setMaxRedirects(3); LettuceConnectionFactory factory = new LettuceConnectionFactory(clusterConfig, clientConfig); - //----------------- (5) LettuceConnectionFactory 옵션 factory.setValidateConnection(false); - factory.setShareNativeConnection(true); // 클러스터 + factory.setShareNativeConnection(true); + + this.redisConnectionFactory = factory; // 재사용을 위해 저장 return factory; } @Bean - @Primary - public RedisTemplate redistemplate() { + public RedisTemplate redistemplateForCluster() { RedisTemplate redisTemplate = new RedisTemplate<>(); -// redisTemplate.setEnableTransactionSupport(true); redisTemplate.setConnectionFactory(redisConnectionFactoryForCluster()); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); @@ -100,7 +107,7 @@ public CacheManager cacheManagerToCluster() { RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer( new GenericJackson2JsonRedisSerializer())) - .entryTtl(Duration.ofMinutes(60L)); // 캐쉬 저장 시간 1시간 설정 + .entryTtl(Duration.ofMinutes(60L)); return RedisCacheManager .RedisCacheManagerBuilder @@ -109,3 +116,4 @@ public CacheManager cacheManagerToCluster() { .build(); } } + diff --git a/dateroad-api/src/main/java/org/dateroad/config/RedisStreamConfig.java b/dateroad-api/src/main/java/org/dateroad/config/RedisStreamConfig.java index d7b68671..786cbf01 100644 --- a/dateroad-api/src/main/java/org/dateroad/config/RedisStreamConfig.java +++ b/dateroad-api/src/main/java/org/dateroad/config/RedisStreamConfig.java @@ -1,49 +1,36 @@ package org.dateroad.config; -import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.cluster.api.sync.RedisClusterCommands; -import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.output.StatusOutput; -import io.lettuce.core.protocol.CommandArgs; -import io.lettuce.core.protocol.CommandKeyword; -import io.lettuce.core.protocol.CommandType; + import lombok.RequiredArgsConstructor; import org.dateroad.point.event.FreeEventListener; import org.dateroad.point.event.PointEventListener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; + import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisClusterConnection; -import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.stream.*; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions; import org.springframework.data.redis.stream.Subscription; import java.time.Duration; import java.util.Iterator; -import java.util.Objects; @Configuration @RequiredArgsConstructor public class RedisStreamConfig { private final PointEventListener pointEventListener; private final FreeEventListener freeEventListener; - private final RedisTemplate redisTemplate; + private final RedisTemplate redistemplateForCluster; public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) { - if (Boolean.FALSE.equals(redisTemplate.hasKey(streamKey))) { + if (Boolean.FALSE.equals(redistemplateForCluster.hasKey(streamKey))) { // Stream이 존재하지 않을 경우, MKSTREAM 옵션으로 스트림과 그룹을 생성 - redisTemplate.execute((RedisCallback) connection -> { + redistemplateForCluster.execute((RedisCallback) connection -> { if (connection.isPipelined() || connection.isQueueing()) { throw new UnsupportedOperationException("Pipelined or queued connections are not supported for cluster."); } @@ -57,20 +44,19 @@ public void createStreamConsumerGroup(final String streamKey, final String consu // 비클러스터 모드에서 명령 실행 connection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes, "0".getBytes(), "MKSTREAM".getBytes()); } - return null; }); } else { // Stream이 존재할 경우 ConsumerGroup 존재 여부 확인 후 생성 if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) { - redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName); + redistemplateForCluster.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName); } } } // ConsumerGroup 존재 여부 확인 public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) { - Iterator iterator = redisTemplate + Iterator iterator = redistemplateForCluster .opsForStream().groups(streamKey).stream().iterator(); while (iterator.hasNext()) { StreamInfo.XInfoGroup xInfoGroup = iterator.next(); @@ -84,12 +70,11 @@ public boolean isStreamConsumerGroupExist(final String streamKey, final String c @Bean public Subscription pointSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) { createStreamConsumerGroup("coursePoint", "courseGroup"); - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainerOptions .builder().pollTimeout(Duration.ofMillis(100)).build(); StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( - redisConnectionFactoryForCluster, - containerOptions); + redisConnectionFactoryForCluster, containerOptions); Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-1"), StreamOffset.create("coursePoint", ReadOffset.lastConsumed()), pointEventListener); @@ -100,7 +85,7 @@ public Subscription pointSubscription(RedisConnectionFactory redisConnectionFact @Bean public Subscription freeSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) { createStreamConsumerGroup("courseFree", "courseGroup"); - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + StreamMessageListenerContainerOptions> containerOptions = StreamMessageListenerContainerOptions .builder().pollTimeout(Duration.ofMillis(100)).build(); StreamMessageListenerContainer> container = StreamMessageListenerContainer.create( redisConnectionFactoryForCluster, diff --git a/dateroad-api/src/main/java/org/dateroad/course/service/AsyncService.java b/dateroad-api/src/main/java/org/dateroad/course/service/AsyncService.java index deedd287..abd79e46 100644 --- a/dateroad-api/src/main/java/org/dateroad/course/service/AsyncService.java +++ b/dateroad-api/src/main/java/org/dateroad/course/service/AsyncService.java @@ -26,7 +26,7 @@ public class AsyncService { private final CoursePlaceService coursePlaceService; private final CourseTagService courseTagService; private final ImageService imageService; - private final RedisTemplate redisTemplate; + private final RedisTemplate redistemplateForCluster; @Transactional public List createImage(final List images, final Course course) { @@ -44,17 +44,17 @@ public void createCoursePlace(final List places, final Course } public void publishEvenUserPoint(final Long userId, PointUseReq pointUseReq) { - Map fieldMap = new HashMap<>(); + Map fieldMap = new HashMap<>(); fieldMap.put("userId", userId.toString()); - fieldMap.put("point", Integer.toString(pointUseReq.getPoint())); - fieldMap.put("type", pointUseReq.getType().toString()); + fieldMap.put("point", String.valueOf(pointUseReq.getPoint())); + fieldMap.put("type", String.valueOf(pointUseReq.getType())); fieldMap.put("description", pointUseReq.getDescription()); - redisTemplate.opsForStream().add("coursePoint", fieldMap); + redistemplateForCluster.opsForStream().add("coursePoint", fieldMap); } public void publishEventUserFree(final Long userId) { Map fieldMap = new HashMap<>(); fieldMap.put("userId", userId.toString()); - redisTemplate.opsForStream().add("courseFree", fieldMap); + redistemplateForCluster.opsForStream().add("courseFree", fieldMap); } } diff --git a/dateroad-api/src/main/java/org/dateroad/point/event/FreeEventListener.java b/dateroad-api/src/main/java/org/dateroad/point/event/FreeEventListener.java index 8271bca8..9fbcef26 100644 --- a/dateroad-api/src/main/java/org/dateroad/point/event/FreeEventListener.java +++ b/dateroad-api/src/main/java/org/dateroad/point/event/FreeEventListener.java @@ -23,8 +23,8 @@ public void onMessage(final MapRecord message) { Map map = message.getValue(); Long userId = Long.valueOf(map.get("userId")); User user = getUser(userId); - int userPoint = user.getFree(); - user.setFree(userPoint -1); + int userFree = user.getFree(); + user.setFree(userFree -1); userRepository.save(user); } diff --git a/dateroad-api/src/main/java/org/dateroad/point/event/PointEventListener.java b/dateroad-api/src/main/java/org/dateroad/point/event/PointEventListener.java index 02e554b0..494f8b2c 100644 --- a/dateroad-api/src/main/java/org/dateroad/point/event/PointEventListener.java +++ b/dateroad-api/src/main/java/org/dateroad/point/event/PointEventListener.java @@ -19,31 +19,35 @@ @Component @RequiredArgsConstructor(access = AccessLevel.PROTECTED) -public class PointEventListener implements StreamListener> { +public class PointEventListener implements StreamListener> { private final UserRepository userRepository; private final PointRepository pointRepository; @Override @Transactional public void onMessage(final MapRecord message) { - Map map = message.getValue(); - Long userId = Long.valueOf(map.get("userId")); - TransactionType type = TransactionType.valueOf(map.get("type")); - User user = getUser(userId); - int point = Integer.parseInt(map.get("point")); - String description = map.get("description"); - switch (type) { - case POINT_GAINED: - user.setTotalPoint(user.getTotalPoint() + point); - break; - case POINT_USED: - user.setTotalPoint(user.getTotalPoint() - point); - break; - default: - throw new UnauthorizedException(FailureCode.INVALID_TRANSACTION_TYPE); + try { + Map map = message.getValue(); + Long userId = Long.valueOf(map.get("userId")); + TransactionType type = TransactionType.valueOf(map.get("type")); + User user = getUser(userId); + int point = Integer.parseInt(map.get("point")); + String description = map.get("description"); + switch (type) { + case POINT_GAINED: + user.setTotalPoint(user.getTotalPoint() + point); + break; + case POINT_USED: + user.setTotalPoint(user.getTotalPoint() - point); + break; + default: + throw new UnauthorizedException(FailureCode.INVALID_TRANSACTION_TYPE); + } + pointRepository.save(Point.create(user, point, type, description)); + userRepository.save(user); + } catch (Exception e) { + e.printStackTrace(); } - pointRepository.save(Point.create(user,point,type,description)); - userRepository.save(user); } private User getUser(Long userId) { @@ -51,4 +55,4 @@ private User getUser(Long userId) { () -> new EntityNotFoundException(FailureCode.USER_NOT_FOUND) ); } -} +} \ No newline at end of file