[FIX] Change Point logic based on Event Message Type - #263 #264

Merged · 7 commits · Sep 5, 2024
Changes from 6 commits
@@ -26,21 +26,21 @@ public class RedisCacheConfig {
private int port;

@Bean
public RedisConnectionFactory redisConnectionFactoryForCache() {
public RedisConnectionFactory redisConnectionFactoryForOne() {
return new LettuceConnectionFactory(new RedisStandaloneConfiguration(host, port));
}

@Bean
public RedisTemplate<String, Object> redisTemplateForCache() {
public RedisTemplate<String, Object> redisTemplateForOne() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactoryForCache());
redisTemplate.setConnectionFactory(redisConnectionFactoryForOne());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}

@Bean
public CacheManager cacheManager() {
public CacheManager cacheManagerForOne() {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.serializeKeysWith(
RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
@@ -50,7 +50,7 @@ public CacheManager cacheManager() {

return RedisCacheManager
.RedisCacheManagerBuilder
.fromConnectionFactory(redisConnectionFactoryForCache())
.fromConnectionFactory(redisConnectionFactoryForOne())
.cacheDefaults(redisCacheConfiguration)
.build();
}
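Note on the renamed beans above: redisConnectionFactoryForOne, redisTemplateForOne and cacheManagerForOne target the standalone Redis instance, while the cluster beans below carry @Primary. A caller that still wants this standalone cache therefore has to pick the manager by bean name. A minimal sketch of such a caller, assuming a cache named "courseDetail" (the service and cache name are illustrative, not part of this PR):

import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;

@Service
public class CourseCacheExample {

    // The cluster CacheManager is @Primary, so the standalone manager is selected here by bean name.
    @Cacheable(value = "courseDetail", key = "#courseId", cacheManager = "cacheManagerForOne")
    public String getCourseDetail(final Long courseId) {
        // Expensive lookup that benefits from caching; the body is a placeholder.
        return "course-" + courseId;
    }
}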
@@ -1,30 +1,44 @@
package org.dateroad.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator;
import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode.NodeFlag;
import java.time.Duration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
public class RedisClusterConfig {

@Value("${aws.ip}")
private String host;

private RedisConnectionFactory redisConnectionFactory;
@Bean
@Primary
public RedisConnectionFactory redisConnectionFactoryForCluster() {
if (this.redisConnectionFactory != null) {
return this.redisConnectionFactory;
}

RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration()
.clusterNode(host, 7001)
.clusterNode(host, 7002)
@@ -33,48 +47,50 @@ public RedisConnectionFactory redisConnectionFactoryForCluster() {
.clusterNode(host, 7005)
.clusterNode(host, 7006);
SocketOptions socketOptions = SocketOptions.builder()
.connectTimeout(Duration.ofMillis(500L))
.connectTimeout(Duration.ofSeconds(3L))
.tcpNoDelay(true)
.keepAlive(true)
.build();
//----------------- (2) Cluster topology refresh options

ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions
.builder()
.dynamicRefreshSources(true)
.enableAllAdaptiveRefreshTriggers()
.enablePeriodicRefresh(Duration.ofSeconds(30))
.build();
//----------------- (3) Cluster client options

ClusterClientOptions clusterClientOptions = ClusterClientOptions
.builder()
.pingBeforeActivateConnection(true)
.autoReconnect(true)
.socketOptions(socketOptions)
.topologyRefreshOptions(clusterTopologyRefreshOptions)
.nodeFilter(it ->
! (it.is(NodeFlag.EVENTUAL_FAIL)
|| it.is(NodeFlag.FAIL)
|| it.is(NodeFlag.NOADDR)
|| it.is(NodeFlag.HANDSHAKE)))
!(it.is(NodeFlag.EVENTUAL_FAIL)
|| it.is(NodeFlag.FAIL)
|| it.is(NodeFlag.NOADDR)
|| it.is(NodeFlag.HANDSHAKE)))
.validateClusterNodeMembership(false)
.maxRedirects(5).build();
//----------------- (4) Lettuce client options

final LettuceClientConfiguration clientConfig = LettuceClientConfiguration
.builder()
.readFrom(ReadFrom.REPLICA_PREFERRED)
.commandTimeout(Duration.ofSeconds(10L))
.clientOptions(clusterClientOptions)
.build();
clusterConfig.setMaxRedirects(5);

clusterConfig.setMaxRedirects(3);
LettuceConnectionFactory factory = new LettuceConnectionFactory(clusterConfig, clientConfig);
//----------------- (5) LettuceConnectionFactory options
factory.setValidateConnection(false);
factory.setShareNativeConnection(false); // cluster
factory.setShareNativeConnection(true);

this.redisConnectionFactory = factory; // keep the factory for reuse
return factory;
}

@Bean
@Primary
public RedisTemplate<String, Object> redistemplate() {
public RedisTemplate<String, Object> redistemplateForCluster() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// redisTemplate.setEnableTransactionSupport(true);
redisTemplate.setConnectionFactory(redisConnectionFactoryForCluster());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
@@ -83,4 +99,21 @@ public RedisTemplate<String, Object> redistemplate() {
return redisTemplate;
}

@Bean
@Primary
public CacheManager cacheManagerToCluster() {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.serializeKeysWith(
RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(
new GenericJackson2JsonRedisSerializer()))
.entryTtl(Duration.ofMinutes(60L));

return RedisCacheManager
.RedisCacheManagerBuilder
.fromConnectionFactory(redisConnectionFactoryForCluster())
.cacheDefaults(redisCacheConfiguration)
.build();
}
}
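Because the cluster beans in RedisClusterConfig are @Primary, an unqualified injection of RedisTemplate<String, Object> or CacheManager now resolves to the cluster variants; the standalone beans from RedisCacheConfig must be requested by name. An illustrative consumer (not part of this PR; bean names are taken from the @Bean method names above):

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class RedisTemplateConsumerExample {

    private final RedisTemplate<String, Object> clusterTemplate;    // resolves to the @Primary cluster bean
    private final RedisTemplate<String, Object> standaloneTemplate; // selected explicitly by bean name

    public RedisTemplateConsumerExample(final RedisTemplate<String, Object> clusterTemplate,
                                        @Qualifier("redisTemplateForOne") final RedisTemplate<String, Object> standaloneTemplate) {
        this.clusterTemplate = clusterTemplate;
        this.standaloneTemplate = standaloneTemplate;
    }
}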

@@ -1,49 +1,36 @@
package org.dateroad.config;


import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandKeyword;
import io.lettuce.core.protocol.CommandType;

import lombok.RequiredArgsConstructor;
import org.dateroad.point.event.FreeEventListener;
import org.dateroad.point.event.PointEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer.StreamMessageListenerContainerOptions;
import org.springframework.data.redis.stream.Subscription;

import java.time.Duration;
import java.util.Iterator;
import java.util.Objects;

@Configuration
@RequiredArgsConstructor
public class RedisStreamConfig {
private final PointEventListener pointEventListener;
private final FreeEventListener freeEventListener;
private final RedisTemplate<String, Object> redisTemplate;
private final RedisTemplate<String, Object> redistemplateForCluster;

public void createStreamConsumerGroup(final String streamKey, final String consumerGroupName) {
if (Boolean.FALSE.equals(redisTemplate.hasKey(streamKey))) {
if (Boolean.FALSE.equals(redistemplateForCluster.hasKey(streamKey))) {
// If the stream does not exist, create the stream and consumer group with the MKSTREAM option
redisTemplate.execute((RedisCallback<Void>) connection -> {
redistemplateForCluster.execute((RedisCallback<Void>) connection -> {
if (connection.isPipelined() || connection.isQueueing()) {
throw new UnsupportedOperationException("Pipelined or queued connections are not supported for cluster.");
}
@@ -57,20 +44,19 @@ public void createStreamConsumerGroup(final String streamKey, final String consu
// Execute the command in non-cluster mode
connection.execute("XGROUP", "CREATE".getBytes(), streamKeyBytes, consumerGroupNameBytes, "0".getBytes(), "MKSTREAM".getBytes());
}

return null;
});
} else {
// If the stream exists, check whether the consumer group exists and create it if needed
if (!isStreamConsumerGroupExist(streamKey, consumerGroupName)) {
redisTemplate.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
redistemplateForCluster.opsForStream().createGroup(streamKey, ReadOffset.from("0"), consumerGroupName);
}
}
}

// Check whether the consumer group exists
public boolean isStreamConsumerGroupExist(final String streamKey, final String consumerGroupName) {
Iterator<StreamInfo.XInfoGroup> iterator = redisTemplate
Iterator<StreamInfo.XInfoGroup> iterator = redistemplateForCluster
.opsForStream().groups(streamKey).stream().iterator();
while (iterator.hasNext()) {
StreamInfo.XInfoGroup xInfoGroup = iterator.next();
@@ -84,12 +70,11 @@ public boolean isStreamConsumerGroupExist(final String streamKey, final String c
@Bean
public Subscription pointSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) {
createStreamConsumerGroup("coursePoint", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactoryForCluster,
containerOptions);
redisConnectionFactoryForCluster, containerOptions);

Subscription subscription = container.receiveAutoAck(Consumer.from("courseGroup", "instance-1"),
StreamOffset.create("coursePoint", ReadOffset.lastConsumed()), pointEventListener);
@@ -100,7 +85,7 @@ public Subscription pointSubscription(RedisConnectionFactory redisConnectionFact
@Bean
public Subscription freeSubscription(RedisConnectionFactory redisConnectionFactoryForCluster) {
createStreamConsumerGroup("courseFree", "courseGroup");
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(
redisConnectionFactoryForCluster,
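For context on createStreamConsumerGroup: XGROUP CREATE <stream> <group> 0 MKSTREAM creates the stream together with the consumer group when the key is missing, so the two Subscription beans can attach before any producer has written. The container then delivers new entries with receiveAutoAck, which acknowledges them automatically. A rough manual equivalent of one poll, expressed through opsForStream() (a sketch only, assuming the same stream, group and consumer names; the PR itself relies on StreamMessageListenerContainer):

import java.time.Duration;
import java.util.List;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;

public class ManualStreamPollExample {

    private final RedisTemplate<String, Object> redisTemplate;

    public ManualStreamPollExample(final RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void pollOnce() {
        // Read up to 10 entries not yet delivered to this consumer, blocking briefly like the container's pollTimeout.
        List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read(
                Consumer.from("courseGroup", "instance-1"),
                StreamReadOptions.empty().count(10).block(Duration.ofMillis(100)),
                StreamOffset.create("coursePoint", ReadOffset.lastConsumed()));
        if (records == null) {
            return;
        }
        for (MapRecord<String, Object, Object> entry : records) {
            // Process entry.getValue(), then acknowledge explicitly (receiveAutoAck does this automatically).
            redisTemplate.opsForStream().acknowledge("coursePoint", "courseGroup", entry.getId());
        }
    }
}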
@@ -26,7 +26,7 @@ public class AsyncService {
private final CoursePlaceService coursePlaceService;
private final CourseTagService courseTagService;
private final ImageService imageService;
private final RedisTemplate<String, Object> redisTemplate;
private final RedisTemplate<String, Object> redistemplateForCluster;

@Transactional
public List<Image> createImage(final List<MultipartFile> images, final Course course) {
@@ -44,17 +44,17 @@ public void createCoursePlace(final List<CoursePlaceGetReq> places, final Course
}

public void publishEvenUserPoint(final Long userId, PointUseReq pointUseReq) {
Map<String, Object> fieldMap = new HashMap<>();
Map<String, String> fieldMap = new HashMap<>();
fieldMap.put("userId", userId.toString());
fieldMap.put("point", Integer.toString(pointUseReq.getPoint()));
fieldMap.put("type", pointUseReq.getType().toString());
fieldMap.put("point", String.valueOf(pointUseReq.getPoint()));
fieldMap.put("type", String.valueOf(pointUseReq.getType()));
fieldMap.put("description", pointUseReq.getDescription());
redisTemplate.opsForStream().add("coursePoint", fieldMap);
redistemplateForCluster.opsForStream().add("coursePoint", fieldMap);
}

public void publishEventUserFree(final Long userId) {
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put("userId", userId.toString());
redisTemplate.opsForStream().add("courseFree", fieldMap);
redistemplateForCluster.opsForStream().add("courseFree", fieldMap);
}
}
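The field map was narrowed from Map<String, Object> to Map<String, String>, so every field published to the stream is a plain string that the listeners can read back with map.get(...). The add(key, map) call above builds roughly the record sketched below; the helper is illustrative only, and the field names must stay in sync with PointEventListener and FreeEventListener:

import java.util.HashMap;
import java.util.Map;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;

public class PointEventRecordExample {

    public static MapRecord<String, String, String> pointEventRecord(final Long userId, final int point,
                                                                     final String type, final String description) {
        Map<String, String> fields = new HashMap<>();
        fields.put("userId", String.valueOf(userId));  // read back via map.get("userId") in the listener
        fields.put("point", String.valueOf(point));
        fields.put("type", type);                      // must parse as TransactionType in PointEventListener
        fields.put("description", description);
        // Redis assigns the record id on XADD when it is left unset.
        return StreamRecords.newRecord().in("coursePoint").ofMap(fields);
    }
}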
@@ -23,8 +23,8 @@ public void onMessage(final MapRecord<String, String, String> message) {
Map<String, String> map = message.getValue();
Long userId = Long.valueOf(map.get("userId"));
User user = getUser(userId);
int userPoint = user.getFree();
user.setFree(userPoint -1);
int userFree = user.getFree();
user.setFree(userFree -1);
userRepository.save(user);
}

@@ -19,36 +19,40 @@

@Component
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public class PointEventListener implements StreamListener<String, MapRecord<String, String, String>> {
public class PointEventListener implements StreamListener<String, MapRecord<String,String,String>> {
private final UserRepository userRepository;
private final PointRepository pointRepository;

@Override
@Transactional
public void onMessage(final MapRecord<String, String, String> message) {
Map<String, String> map = message.getValue();
Long userId = Long.valueOf(map.get("userId"));
TransactionType type = TransactionType.valueOf(map.get("type"));
User user = getUser(userId);
int point = Integer.parseInt(map.get("point"));
String description = map.get("description");
switch (type) {
case POINT_GAINED:
user.setTotalPoint(user.getTotalPoint() + point);
break;
case POINT_USED:
user.setTotalPoint(user.getTotalPoint() - point);
break;
default:
throw new UnauthorizedException(FailureCode.INVALID_TRANSACTION_TYPE);
try {
Map<String, String> map = message.getValue();
Long userId = Long.valueOf(map.get("userId"));
TransactionType type = TransactionType.valueOf(map.get("type"));
User user = getUser(userId);
int point = Integer.parseInt(map.get("point"));
String description = map.get("description");
switch (type) {
case POINT_GAINED:
user.setTotalPoint(user.getTotalPoint() + point);
break;
case POINT_USED:
user.setTotalPoint(user.getTotalPoint() - point);
break;
default:
throw new UnauthorizedException(FailureCode.INVALID_TRANSACTION_TYPE);
}
pointRepository.save(Point.create(user, point, type, description));
userRepository.save(user);
} catch (Exception e) {
e.printStackTrace();
}
pointRepository.save(Point.create(user,point,type,description));
userRepository.save(user);
}

private User getUser(Long userId) {
return userRepository.findById(userId).orElseThrow(
() -> new EntityNotFoundException(FailureCode.USER_NOT_FOUND)
);
}
}
}
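The new try/catch keeps a single malformed entry from breaking the subscription, but with receiveAutoAck the record is acknowledged whether or not processing succeeded, and e.printStackTrace() leaves no structured trace. A possible follow-up (purely a sketch, not part of this PR) is a thin wrapper that logs the failed record id through SLF4J:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;

public class LoggingStreamListenerExample implements StreamListener<String, MapRecord<String, String, String>> {

    private static final Logger log = LoggerFactory.getLogger(LoggingStreamListenerExample.class);

    private final StreamListener<String, MapRecord<String, String, String>> delegate;

    public LoggingStreamListenerExample(final StreamListener<String, MapRecord<String, String, String>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onMessage(final MapRecord<String, String, String> message) {
        try {
            delegate.onMessage(message);
        } catch (Exception e) {
            // The record is already acknowledged by receiveAutoAck, so keep its id for later inspection or replay.
            log.error("Failed to process stream record {}", message.getId(), e);
        }
    }
}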