|
@@ -0,0 +1,112 @@
|
|
|
+package com.kmall.admin.config.mq;
|
|
|
+
|
|
|
+import com.kmall.admin.config.mq.callback.SimpleClearConfirmCallback;
|
|
|
+import com.kmall.admin.config.mq.callback.SimpleClearReturnCallback;
|
|
|
+import org.springframework.amqp.core.*;
|
|
|
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
|
|
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
|
|
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
|
|
+import org.springframework.amqp.support.converter.MessageConversionException;
|
|
|
+import org.springframework.amqp.support.converter.MessageConverter;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 队列相关配置
|
|
|
+ * req_20210826_001
|
|
|
+ * @author lhm
|
|
|
+ * @version 1.0
|
|
|
+ * 2021-08-26 17:07
|
|
|
+ */
|
|
|
+@Configuration
|
|
|
+public class RabbitMQConfig {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private CustomRabbitMQProperties customRabbitMQProperties;
|
|
|
+
|
|
|
+ /* ----------------------------------------------- 基础配置 --------------------------------------------- */
|
|
|
+ @Bean
|
|
|
+ public ConnectionFactory connectionFactory () {
|
|
|
+ CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
|
|
|
+ connectionFactory.setUsername(customRabbitMQProperties.getUsername());
|
|
|
+ connectionFactory.setPassword(customRabbitMQProperties.getPassword());
|
|
|
+ connectionFactory.setHost(customRabbitMQProperties.getHost());
|
|
|
+ connectionFactory.setVirtualHost(customRabbitMQProperties.getVirtualHost());
|
|
|
+ connectionFactory.setPort(customRabbitMQProperties.getPort());
|
|
|
+ // 共用同一个Channel
|
|
|
+ connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
|
|
|
+ // 获取配置的Channel缓存大小
|
|
|
+ connectionFactory.setChannelCacheSize(customRabbitMQProperties.getChannelCacheSize());
|
|
|
+ // 消息到达broke后触发回调
|
|
|
+ connectionFactory.setPublisherConfirms(true);
|
|
|
+ return connectionFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型
|
|
|
+ * @return rabbit监听器容器工厂
|
|
|
+ */
|
|
|
+ @Bean
|
|
|
+ public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
|
|
|
+ SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
|
|
|
+ containerFactory.setConnectionFactory(connectionFactory);
|
|
|
+ containerFactory.setConcurrentConsumers(3);
|
|
|
+ containerFactory.setMessageConverter(new MessageConverter() {
|
|
|
+ @Override
|
|
|
+ public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
|
|
|
+ return new Message(object.toString().getBytes(), messageProperties);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Object fromMessage(Message message) throws MessageConversionException {
|
|
|
+ return new String(message.getBody());
|
|
|
+ }
|
|
|
+ });
|
|
|
+ // 手动ack
|
|
|
+ containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
|
|
+ return containerFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public AmqpAdmin amqpAdmin () {
|
|
|
+ AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory());
|
|
|
+
|
|
|
+ amqpAdmin.declareExchange(clearDirectExchange());
|
|
|
+ amqpAdmin.declareQueue(clearQueue());
|
|
|
+ amqpAdmin.declareBinding(clearBinding());
|
|
|
+
|
|
|
+ return amqpAdmin;
|
|
|
+ }
|
|
|
+
|
|
|
+ /* ----------------------------------------------- 队列配置 --------------------------------------------- */
|
|
|
+ @Bean
|
|
|
+ public RabbitTemplate clearRabbitTemplate (ConnectionFactory connectionFactory) {
|
|
|
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
|
|
+
|
|
|
+ rabbitTemplate.setExchange(customRabbitMQProperties.getE_normal_oms_order_to_handle_customs_clearance());
|
|
|
+ rabbitTemplate.setRoutingKey(customRabbitMQProperties.getK_normal_oms_order_to_handle_customs_clearance());
|
|
|
+ rabbitTemplate.setConfirmCallback(new SimpleClearConfirmCallback());
|
|
|
+ rabbitTemplate.setReturnCallback(new SimpleClearReturnCallback());
|
|
|
+
|
|
|
+ return rabbitTemplate;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public DirectExchange clearDirectExchange () {
|
|
|
+ return new DirectExchange(customRabbitMQProperties.getE_normal_oms_order_to_handle_customs_clearance());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Queue clearQueue () {
|
|
|
+ return new Queue(customRabbitMQProperties.getQ_normal_oms_order_to_handle_customs_clearance());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Binding clearBinding () {
|
|
|
+ return BindingBuilder.bind(clearQueue()).to(clearDirectExchange()).with(customRabbitMQProperties.getK_normal_oms_order_to_handle_customs_clearance());
|
|
|
+ }
|
|
|
+}
|