123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package com.kmall.admin.config.mq;
import com.kmall.admin.config.mq.callback.SimpleClearConfirmCallback;
import com.kmall.admin.config.mq.callback.SimpleClearReturnCallback;
import java.nio.charset.StandardCharsets;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
- /**
- * 队列相关配置
- * req_20210826_001
- * @author lhm
- * @version 1.0
- * 2021-08-26 17:07
- */
- @Configuration
- @PropertySource(value = {"classpath:conf/mq.properties"})
- public class RabbitMQConfig {
- @Autowired
- private Environment environment;
- @Bean
- public CustomRabbitMQProperties customRabbitMQProperties () {
- CustomRabbitMQProperties customRabbitMQProperties = new CustomRabbitMQProperties();
- customRabbitMQProperties.setUsername(environment.getProperty("mq.username"));
- customRabbitMQProperties.setPassword(environment.getProperty("mq.password"));
- customRabbitMQProperties.setHost(environment.getProperty("mq.host"));
- customRabbitMQProperties.setPort(Integer.parseInt(environment.getProperty("mq.port")));
- customRabbitMQProperties.setVirtualHost(environment.getProperty("mq.virtual.host"));
- customRabbitMQProperties.setChannelCacheSize(Integer.parseInt(environment.getProperty("mq.channel.cache.size")));
- customRabbitMQProperties.setOpen(Boolean.parseBoolean(environment.getProperty("mq.ope")));
- customRabbitMQProperties.setE_normal_oms_order_to_handle_customs_clearance(environment.getProperty("e.normal.oms.order.to.handle.customs.clearance"));
- customRabbitMQProperties.setQ_normal_oms_order_to_handle_customs_clearance(environment.getProperty("q.normal.oms.order.to.handle.customs.clearance"));
- customRabbitMQProperties.setK_normal_oms_order_to_handle_customs_clearance(environment.getProperty("k.normal.oms.order.to.handle.customs.clearance"));
- return customRabbitMQProperties;
- }
- /* ----------------------------------------------- 基础配置 --------------------------------------------- */
- @Bean
- public ConnectionFactory connectionFactory () {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- connectionFactory.setUsername(customRabbitMQProperties().getUsername());
- connectionFactory.setPassword(customRabbitMQProperties().getPassword());
- connectionFactory.setHost(customRabbitMQProperties().getHost());
- connectionFactory.setVirtualHost(customRabbitMQProperties().getVirtualHost());
- connectionFactory.setPort(customRabbitMQProperties().getPort());
- // 共用同一个Channel
- connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
- // 获取配置的Channel缓存大小
- connectionFactory.setChannelCacheSize(customRabbitMQProperties().getChannelCacheSize());
- // 消息到达broke后触发回调
- connectionFactory.setPublisherConfirms(true);
- return connectionFactory;
- }
- /**
- * SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型
- * @return rabbit监听器容器工厂
- */
- @Bean
- public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
- containerFactory.setConnectionFactory(connectionFactory);
- containerFactory.setConcurrentConsumers(3);
- containerFactory.setMessageConverter(new MessageConverter() {
- @Override
- public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
- return new Message(object.toString().getBytes(), messageProperties);
- }
- @Override
- public Object fromMessage(Message message) throws MessageConversionException {
- return new String(message.getBody());
- }
- });
- // 手动ack
- containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- return containerFactory;
- }
- @Bean
- public AmqpAdmin amqpAdmin () {
- AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory());
- amqpAdmin.declareExchange(clearDirectExchange());
- amqpAdmin.declareQueue(clearQueue());
- amqpAdmin.declareBinding(clearBinding());
- return amqpAdmin;
- }
- /* ----------------------------------------------- 队列配置 --------------------------------------------- */
- @Bean
- public RabbitTemplate clearRabbitTemplate (ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance());
- rabbitTemplate.setRoutingKey(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance());
- rabbitTemplate.setConfirmCallback(new SimpleClearConfirmCallback());
- rabbitTemplate.setReturnCallback(new SimpleClearReturnCallback());
- return rabbitTemplate;
- }
- @Bean
- public DirectExchange clearDirectExchange () {
- return new DirectExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance());
- }
- @Bean
- public Queue clearQueue () {
- return new Queue(customRabbitMQProperties().getQ_normal_oms_order_to_handle_customs_clearance());
- }
- @Bean
- public Binding clearBinding () {
- return BindingBuilder.bind(clearQueue()).to(clearDirectExchange()).with(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance());
- }
- }
|