1
0

RabbitMQConfig.java 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package com.kmall.admin.config.mq;
  2. import com.kmall.admin.config.mq.callback.SimpleClearConfirmCallback;
  3. import com.kmall.admin.config.mq.callback.SimpleClearReturnCallback;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  6. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  7. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  8. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
  11. import org.springframework.amqp.support.converter.MessageConversionException;
  12. import org.springframework.amqp.support.converter.MessageConverter;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.context.annotation.Bean;
  15. import org.springframework.context.annotation.Configuration;
  16. import org.springframework.context.annotation.PropertySource;
  17. import org.springframework.core.env.Environment;
  18. /**
  19. * 队列相关配置
  20. * req_20210826_001
  21. * @author lhm
  22. * @version 1.0
  23. * 2021-08-26 17:07
  24. */
  25. @Configuration
  26. @PropertySource(value = {"classpath:conf/mq.properties"})
  27. public class RabbitMQConfig {
  28. @Autowired
  29. private Environment environment;
  30. @Bean
  31. public CustomRabbitMQProperties customRabbitMQProperties () {
  32. CustomRabbitMQProperties customRabbitMQProperties = new CustomRabbitMQProperties();
  33. customRabbitMQProperties.setUsername(environment.getProperty("mq.username"));
  34. customRabbitMQProperties.setPassword(environment.getProperty("mq.password"));
  35. customRabbitMQProperties.setHost(environment.getProperty("mq.host"));
  36. customRabbitMQProperties.setPort(Integer.parseInt(environment.getProperty("mq.port")));
  37. customRabbitMQProperties.setVirtualHost(environment.getProperty("mq.virtual.host"));
  38. customRabbitMQProperties.setChannelCacheSize(Integer.parseInt(environment.getProperty("mq.channel.cache.size")));
  39. customRabbitMQProperties.setOpen(Boolean.parseBoolean(environment.getProperty("mq.ope")));
  40. customRabbitMQProperties.setE_normal_oms_order_to_handle_customs_clearance(environment.getProperty("e.normal.oms.order.to.handle.customs.clearance"));
  41. customRabbitMQProperties.setQ_normal_oms_order_to_handle_customs_clearance(environment.getProperty("q.normal.oms.order.to.handle.customs.clearance"));
  42. customRabbitMQProperties.setK_normal_oms_order_to_handle_customs_clearance(environment.getProperty("k.normal.oms.order.to.handle.customs.clearance"));
  43. return customRabbitMQProperties;
  44. }
  45. /* ----------------------------------------------- 基础配置 --------------------------------------------- */
  46. @Bean
  47. public ConnectionFactory connectionFactory () {
  48. CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  49. connectionFactory.setUsername(customRabbitMQProperties().getUsername());
  50. connectionFactory.setPassword(customRabbitMQProperties().getPassword());
  51. connectionFactory.setHost(customRabbitMQProperties().getHost());
  52. connectionFactory.setVirtualHost(customRabbitMQProperties().getVirtualHost());
  53. connectionFactory.setPort(customRabbitMQProperties().getPort());
  54. // 共用同一个Channel
  55. connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
  56. // 获取配置的Channel缓存大小
  57. connectionFactory.setChannelCacheSize(customRabbitMQProperties().getChannelCacheSize());
  58. // 消息到达broke后触发回调
  59. connectionFactory.setPublisherConfirms(true);
  60. return connectionFactory;
  61. }
  62. /**
  63. * SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型
  64. * @return rabbit监听器容器工厂
  65. */
  66. @Bean
  67. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  68. SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
  69. containerFactory.setConnectionFactory(connectionFactory);
  70. containerFactory.setConcurrentConsumers(3);
  71. containerFactory.setMessageConverter(new MessageConverter() {
  72. @Override
  73. public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
  74. return new Message(object.toString().getBytes(), messageProperties);
  75. }
  76. @Override
  77. public Object fromMessage(Message message) throws MessageConversionException {
  78. return new String(message.getBody());
  79. }
  80. });
  81. // 手动ack
  82. containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  83. return containerFactory;
  84. }
  85. @Bean
  86. public AmqpAdmin amqpAdmin () {
  87. AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory());
  88. amqpAdmin.declareExchange(clearDirectExchange());
  89. amqpAdmin.declareQueue(clearQueue());
  90. amqpAdmin.declareBinding(clearBinding());
  91. return amqpAdmin;
  92. }
  93. /* ----------------------------------------------- 队列配置 --------------------------------------------- */
  94. @Bean
  95. public RabbitTemplate clearRabbitTemplate (ConnectionFactory connectionFactory) {
  96. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  97. rabbitTemplate.setExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance());
  98. rabbitTemplate.setRoutingKey(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance());
  99. rabbitTemplate.setConfirmCallback(new SimpleClearConfirmCallback());
  100. rabbitTemplate.setReturnCallback(new SimpleClearReturnCallback());
  101. return rabbitTemplate;
  102. }
  103. @Bean
  104. public DirectExchange clearDirectExchange () {
  105. return new DirectExchange(customRabbitMQProperties().getE_normal_oms_order_to_handle_customs_clearance());
  106. }
  107. @Bean
  108. public Queue clearQueue () {
  109. return new Queue(customRabbitMQProperties().getQ_normal_oms_order_to_handle_customs_clearance());
  110. }
  111. @Bean
  112. public Binding clearBinding () {
  113. return BindingBuilder.bind(clearQueue()).to(clearDirectExchange()).with(customRabbitMQProperties().getK_normal_oms_order_to_handle_customs_clearance());
  114. }
  115. }