package com.kmall.admin.config.mq; import com.kmall.admin.config.mq.callback.SimpleClearConfirmCallback; import com.kmall.admin.config.mq.callback.SimpleClearReturnCallback; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.core.env.Environment; /** * 队列相关配置 * req_20210826_001 * @author lhm * @version 1.0 * 2021-08-26 17:07 */ @Configuration @PropertySource(value = {"classpath:conf/mq.properties"}) public class RabbitMQConfig { @Autowired private Environment environment; @Bean public CustomRabbitMQProperties customRabbitMQProperties () { CustomRabbitMQProperties customRabbitMQProperties = new CustomRabbitMQProperties(); customRabbitMQProperties.setUsername(environment.getProperty("mq.username")); customRabbitMQProperties.setPassword(environment.getProperty("mq.password")); customRabbitMQProperties.setHost(environment.getProperty("mq.host")); customRabbitMQProperties.setPort(Integer.parseInt(environment.getProperty("mq.port"))); customRabbitMQProperties.setVirtualHost(environment.getProperty("mq.virtual.host")); customRabbitMQProperties.setChannelCacheSize(Integer.parseInt(environment.getProperty("mq.channel.cache.size"))); customRabbitMQProperties.setOpen(Boolean.parseBoolean(environment.getProperty("mq.ope"))); customRabbitMQProperties.setE_normal_oms_order_to_handle_customs_clearance(environment.getProperty("e.normal.oms.order.to.handle.customs.clearance")); customRabbitMQProperties.setQ_normal_oms_order_to_handle_customs_clearance(environment.getProperty("q.normal.oms.order.to.handle.customs.clearance")); customRabbitMQProperties.setK_normal_oms_order_to_handle_customs_clearance(environment.getProperty("k.normal.oms.order.to.handle.customs.clearance")); return customRabbitMQProperties; } /* ----------------------------------------------- 基础配置 --------------------------------------------- */ @Bean public ConnectionFactory connectionFactory () { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setUsername(customRabbitMQProperties().getUsername()); connectionFactory.setPassword(customRabbitMQProperties().getPassword()); connectionFactory.setHost(customRabbitMQProperties().getHost()); connectionFactory.setVirtualHost(customRabbitMQProperties().getVirtualHost()); connectionFactory.setPort(customRabbitMQProperties().getPort()); // 共用同一个Channel connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); // 获取配置的Channel缓存大小 connectionFactory.setChannelCacheSize(customRabbitMQProperties().getChannelCacheSize()); // 消息到达broke后触发回调 connectionFactory.setPublisherConfirms(true); return connectionFactory; } /** * SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型 * @return rabbit监听器容器工厂 */ @Bean public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setConnectionFactory(connectionFactory); containerFactory.setConcurrentConsumers(3); containerFactory.setMessageConverter(new MessageConverter() { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { return new String(message.getBody()); } }); // 手动ack containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return containerFactory; } @Bean public AmqpAdmin amqpAdmin () { AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory()); amqpAdmin.declareExchange(clearDirectExchange()); amqpAdmin.declareQueue(clearQueue()); amqpAdmin.declareBinding(clearBinding()); return amqpAdmin; } /* ----------------------------------------------- 队列配置 --------------------------------------------- */ @Bean public RabbitTemplate clearRabbitTemplate (ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance()); rabbitTemplate.setRoutingKey(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance()); rabbitTemplate.setConfirmCallback(new SimpleClearConfirmCallback()); rabbitTemplate.setReturnCallback(new SimpleClearReturnCallback()); return rabbitTemplate; } @Bean public DirectExchange clearDirectExchange () { return new DirectExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance()); } @Bean public Queue clearQueue () { return new Queue(customRabbitMQProperties().getQ_normal_oms_order_to_handle_customs_clearance()); } @Bean public Binding clearBinding () { return BindingBuilder.bind(clearQueue()).to(clearDirectExchange()).with(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance()); } }