Skip to content

Commit

Permalink
完成 mq 的整合
Browse files Browse the repository at this point in the history
  • Loading branch information
千面 committed Dec 17, 2021
1 parent 01edf29 commit c1af481
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 644 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.billow.notice.amqp.properties.MqSetting;
import com.billow.notice.amqp.properties.NoticeMqYml;
import com.billow.notice.config.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.BeansException;
Expand All @@ -21,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* 获取 ApplicationContext
Expand All @@ -46,36 +46,75 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) applicationContext;
// 获取bean工厂并转换为DefaultListableBeanFactory
defaultListableBeanFactory = (DefaultListableBeanFactory) configurableApplicationContext.getBeanFactory();
// 保存 applicationContext
SpringContextUtil.setApplicationContext(applicationContext);
}

@PostConstruct
public void afterPropertiesSet() throws Exception
public void dynamicRegistryMQ() throws Exception
{
/// https://www.cnblogs.com/Chary/p/14361830.html
// 注册bean
// 获取配置文件
Map<String, MqSetting> mqSettingMap = this.getMqSettingList();
// 转换生成需要构建 bean 的数据
Map<String, MqSetting> appendMqSetting = this.convertMqSetting(mqSettingMap);
if (!CollectionUtils.isEmpty(appendMqSetting))
{
mqSettingMap.putAll(appendMqSetting);
}
log.info("====================== 开始注入 MQ 设置:{} ======================", mqSettingMap.size());
for (String key : mqSettingMap.keySet())
{
MqSetting mqSetting = mqSettingMap.get(key);
// 配置数据检查
this.checkMqSetting(key, mqSetting);
// 注入队列
this.registryQueue(mqSetting);
boolean registryQueue = this.registryQueue(mqSetting);
// 注入交换机
this.registryExchange(mqSetting.getExchangeType(), mqSetting.getExchange());
boolean registryExchange = this.registryExchange(mqSetting);
// 绑定队列和交换机
if (!registryExchange && !registryQueue)
{
continue;
}
String bindingName = this.registryBinding(key, mqSetting);
log.info("exchangeName:{},queueName:{},routeKey:{},bindingName:{} 绑定成功", mqSetting.getExchange(), mqSetting.getQueue(),
mqSetting.getRouteKey(), bindingName);

System.out.println(SpringContextUtil.getBean(bindingName, Binding.class));
}
log.info("====================== 结束注入 MQ 设置 ======================");
}

/**
* 转换生成需要构建 bean 的数据
*
* @param mqSettingMap
* @return Map<String, MqSetting>
* @author 千面
* @date 2021/12/17 16:06
*/
private Map<String, MqSetting> convertMqSetting(Map<String, MqSetting> mqSettingMap)
{
Map<String, MqSetting> appendMqSetting = new HashMap<>();
Set<String> keySet = mqSettingMap.keySet();
for (String key : keySet)
{
MqSetting mqSetting = mqSettingMap.get(key);
String dlxExchange = mqSetting.getDlxExchange();
String dlxQueue = mqSetting.getDlxQueue();
String dlxExchangeType = mqSetting.getDlxExchangeType();
String dlxRouteKey = mqSetting.getDlxRouteKey();
if (StringUtils.isEmpty(dlxExchange) || StringUtils.isEmpty(dlxQueue))
{
continue;
}
MqSetting temp = new MqSetting();
temp.setExchange(dlxExchange);
temp.setQueue(dlxQueue);
temp.setRouteKey(dlxRouteKey);
temp.setDlxExchangeType(dlxExchangeType);
String tempKey = key + MqConstant.SUFFIX_BINDING_DLX;
appendMqSetting.put(tempKey, temp);
}
return appendMqSetting;
}

/**
* 获取 mq 的设置属性
*
Expand Down Expand Up @@ -125,19 +164,22 @@ private String registryBinding(String mqSettingName, MqSetting mqSetting)
/**
* 注入交换机
*
* @param exchangeType 交换机类型
* @param exchangeName 交换机名称
* @param mqSetting mq 配置
* @return void
* @author 千面
* @date 2021/12/15 9:16
*/
private void registryExchange(String exchangeType, String exchangeName) throws ClassNotFoundException
private boolean registryExchange(MqSetting mqSetting) throws ClassNotFoundException
{
// 交换机类型
String exchangeType = mqSetting.getExchangeType();
// 交换机名称
String exchangeName = mqSetting.getExchange();
boolean exchangeFlag = applicationContext.containsBean(exchangeName);
if (exchangeFlag)
{
log.warn("交换机:{} 已经存在", exchangeName);
return;
return false;
}
// 交换机类型
Class<?> exchangeClass;
Expand All @@ -153,9 +195,10 @@ private void registryExchange(String exchangeType, String exchangeName) throws C
exchangeClass = TopicExchange.class;
break;
default:
throw new ClassNotFoundException("交换机:" + exchangeName + ",没有找到对应的交换机类型:" + exchangeType);
log.error("交换机:{},没有找到对应的交换机类型:{}", exchangeName, exchangeType);
throw new ClassNotFoundException();
}
this.registerBean(exchangeName, exchangeClass, exchangeName);
return this.registerBean(exchangeName, exchangeClass, exchangeName);
}

/**
Expand All @@ -166,15 +209,32 @@ private void registryExchange(String exchangeType, String exchangeName) throws C
* @author 千面
* @date 2021/12/15 9:16
*/
private void registryQueue(MqSetting mqSetting)
private boolean registryQueue(MqSetting mqSetting)
{
String queueName = mqSetting.getQueue();
boolean queueFlag = applicationContext.containsBean(queueName);
if (queueFlag)
{
log.warn("队列:{} 已经存在", queueName);
return false;
}
// 延时队列转发的死信交换机上
Map<String, Object> arguments = new HashMap<>();
String dlxExchange = mqSetting.getDlxExchange();
if (StringUtils.hasText(dlxExchange))
{
arguments.put(MqConstant.X_DEAD_LETTER_EXCHANGE, dlxExchange);
arguments.put(MqConstant.X_MESSAGE_TTL, mqSetting.getMessageTtl());
String dlxRouteKey = mqSetting.getDlxRouteKey();
if (StringUtils.hasText(dlxRouteKey))
{
arguments.put(MqConstant.X_DEAD_LETTER_ROUTING_KEY, dlxRouteKey);
}
mqSetting.setDurable(true);
}
this.registerBean(queueName, Queue.class, queueName, mqSetting.getDurable());
// 注册 bean
return this.registerBean(queueName, Queue.class, queueName, mqSetting.getDurable(),
false, false, arguments);
}

/**
Expand All @@ -188,23 +248,24 @@ private void registryQueue(MqSetting mqSetting)
*/
private void checkMqSetting(String mqSettingName, MqSetting mqSetting)
{
log.info("mqSettingName:{},mqSetting{}", mqSettingName, mqSetting);
String queueName = mqSetting.getQueue();
if (StringUtils.isEmpty(queueName))
{
log.warn("MQ 配置===>>>队列名称为空,自动生成");
mqSetting.setQueue(mqSettingName + MqConstant.SUFFIX_QUEUE);
log.warn("MQ 配置{}===>>>队列名称为空,自动生成:{}", mqSetting.getQueue());
}
String exchangeName = mqSetting.getExchange();
if (StringUtils.isEmpty(exchangeName))
{
log.warn("MQ 配置===>>>交换机名称为空,自动生成");
mqSetting.setExchange(mqSettingName + MqConstant.SUFFIX_EXCHANGE);
log.warn("MQ 配置===>>>交换机名称为空,自动生成:{}", mqSetting.getExchange());
}
String routeKey = mqSetting.getRouteKey();
if (StringUtils.isEmpty(routeKey))
{
log.error("MQ 配置===>>>路由key名称为空,自动生成");
mqSetting.setRouteKey(mqSettingName + MqConstant.SUFFIX_ROUTE_KEY);
mqSetting.setRouteKey("");
log.warn("MQ 配置===>>>路由key名称为空,使用默认路由");
}
}

Expand All @@ -218,7 +279,7 @@ private void checkMqSetting(String mqSettingName, MqSetting mqSetting)
* @author 千面
* @date 2021/12/15 8:39
*/
private <T> void registerBean(String beanName, Class<T> beanClass, Object... args)
private <T> boolean registerBean(String beanName, Class<T> beanClass, Object... args)
{
// 如果为空,自动生成 beanName
if (StringUtils.isEmpty(beanName))
Expand All @@ -235,6 +296,7 @@ private <T> void registerBean(String beanName, Class<T> beanClass, Object... arg
BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
// 注入 bean 定义
defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinition);
return true;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class MqConstant
public final static String X_DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";

/**
* DLK:配置路由器
* DLX:配置路由器
*/
public final static String X_DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

Expand All @@ -31,26 +31,30 @@ public class MqConstant
* @date 2021/12/15 10:36
*/
public final static String SUFFIX_QUEUE = "Queue";

/**
* 交换机默认后缀
*
* @author 千面
* @date 2021/12/15 10:36
*/
public final static String SUFFIX_EXCHANGE = "Exchange";

/**
* 路由 key 默认后缀
* 绑定默认后缀
*
* @author 千面
* @date 2021/12/15 10:36
*/
public final static String SUFFIX_ROUTE_KEY = "RouteKey";
public final static String SUFFIX_BINDING = "Binding";

/**
* 绑定默认后缀
* 死信绑定默认后缀
*
* @author 千面
* @date 2021/12/15 10:36
*/
public final static String SUFFIX_BINDING = "Binding";
public final static String SUFFIX_BINDING_DLX = "Binding-DLX";


}

This file was deleted.

Loading

0 comments on commit c1af481

Please sign in to comment.