Browse Source

优化代码

lhm 3 years ago
parent
commit
2c3cff37c0

+ 5 - 13
src/main/java/com/emato/file/tunnel/config/RabbitMQConfig.java

@@ -3,7 +3,7 @@ package com.emato.file.tunnel.config;
 import com.emato.file.tunnel.config.properties.RabbitMQProperties;
 import com.emato.file.tunnel.config.properties.RabbitMQProperties;
 import com.emato.file.tunnel.mq.callback.SimpleConfirmCallBack;
 import com.emato.file.tunnel.mq.callback.SimpleConfirmCallBack;
 import com.emato.file.tunnel.mq.callback.SimpleReturnsCallBack;
 import com.emato.file.tunnel.mq.callback.SimpleReturnsCallBack;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.emato.file.tunnel.mq.converter.CustomMessageConverter;
 import org.springframework.amqp.core.*;
 import org.springframework.amqp.core.*;
 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -18,6 +18,8 @@ import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
 
 
+import java.nio.charset.StandardCharsets;
+
 /**
 /**
  * @author lhm
  * @author lhm
  * @version 1.0
  * @version 1.0
@@ -58,18 +60,8 @@ public class RabbitMQConfig {
     public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
     public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
         SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
         SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
         containerFactory.setConnectionFactory(connectionFactory);
         containerFactory.setConnectionFactory(connectionFactory);
-        containerFactory.setConcurrentConsumers(3);
-        containerFactory.setMessageConverter(new MessageConverter() {
-            @Override
-            public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
-                return new Message(object.toString().getBytes(), messageProperties);
-            }
-
-            @Override
-            public Object fromMessage(Message message) throws MessageConversionException {
-                return new String(message.getBody());
-            }
-        });
+        containerFactory.setMaxConcurrentConsumers(3);
+        containerFactory.setMessageConverter(new CustomMessageConverter());
         containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
         containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
         return containerFactory;
         return containerFactory;
     }
     }

+ 2 - 2
src/main/java/com/emato/file/tunnel/listener/ReportCEB621Listener.java

@@ -51,9 +51,9 @@ public class ReportCEB621Listener {
             String content = jsonObject.get("msgFileContent", String.class);
             String content = jsonObject.get("msgFileContent", String.class);
             String path = cPortProperties.getPushDir()  + "\\" + msgFileName;
             String path = cPortProperties.getPushDir()  + "\\" + msgFileName;
             // 去除空格和换行符
             // 去除空格和换行符
+            content = XmlUtil.cleanComment(content);
+            content = XmlUtil.cleanInvalid(content);
             content = content.trim();
             content = content.trim();
-            XmlUtil.cleanInvalid(content);
-            XmlUtil.cleanComment(content);
             File file = FileUtil.writeString(content, path, StandardCharsets.UTF_8);
             File file = FileUtil.writeString(content, path, StandardCharsets.UTF_8);
             channel.basicAck(tag, false);
             channel.basicAck(tag, false);
             log.info("----- 监听621报文上报, 文件: {} , 上报结束! -----", file.getName());
             log.info("----- 监听621报文上报, 文件: {} , 上报结束! -----", file.getName());

+ 41 - 0
src/main/java/com/emato/file/tunnel/mq/converter/CustomMessageConverter.java

@@ -0,0 +1,41 @@
+package com.emato.file.tunnel.mq.converter;
+
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageProperties;
+import org.springframework.amqp.support.converter.MessageConversionException;
+import org.springframework.amqp.support.converter.MessageConverter;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author lhm
+ * @version 1.0
+ * 2021-09-07 10:01
+ */
+public class CustomMessageConverter implements MessageConverter {
+
+    /**
+     * Convert a Java object to a Message.
+     *
+     * @param object            the object to convert
+     * @param messageProperties The message properties.
+     * @return the Message
+     * @throws MessageConversionException in case of conversion failure
+     */
+    @Override
+    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
+        return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
+    }
+
+    /**
+     * Convert from a Message to a Java object.
+     *
+     * @param message the message to convert
+     * @return the converted Java object
+     * @throws MessageConversionException in case of conversion failure
+     */
+    @Override
+    public Object fromMessage(Message message) throws MessageConversionException {
+        return new String(message.getBody(), StandardCharsets.UTF_8);
+    }
+}

+ 47 - 42
src/main/java/com/emato/file/tunnel/scheduler/ScanPullDirectoryTask.java

@@ -56,48 +56,8 @@ public class ScanPullDirectoryTask {
             }
             }
             // 获取回执目录的所有文件名
             // 获取回执目录的所有文件名
             List<String> pullFilePaths = fileNames.parallelStream().map(fileName -> pullDir + "\\" + fileName).filter(fileName -> fileName.indexOf("xml") > 0 && fileName.contains("CEB622Message")).collect(Collectors.toList());
             List<String> pullFilePaths = fileNames.parallelStream().map(fileName -> pullDir + "\\" + fileName).filter(fileName -> fileName.indexOf("xml") > 0 && fileName.contains("CEB622Message")).collect(Collectors.toList());
-            if (CollectionUtils.isEmpty(pullFilePaths)) {
-                return;
-            }
-            log.info("----- 扫描报文目录, 开始扫描目录: {} -----", pullDir);
-            pullFilePaths.forEach(pullFilePath -> {
-                File file = FileUtil.file(pullFilePath);
-                HashMap<String, String> map = new HashMap<>();
-                String fileName = file.getName();
-                InputStream inputStream = null;
-                try {
-                    if (file.exists()) {
-                        inputStream = new FileInputStream(file);
-                        Document document = XmlUtil.readXML(inputStream);
-                        boolean delete = file.delete();
-                        String content = XmlUtil.format(document);
-                        // 报文回执信息发送给csp
-                        map.put("fileName", fileName);
-                        map.put("fileContent", content);
-                        Object message = JSONUtil.toJsonStr(map);
-                        CorrelationData correlationData = new CorrelationData();
-                        correlationData.setId(fileName);
-                        receiptRabbitTemplate.convertAndSend(rabbitMQProperties.getK_normal_tunnel_to_csp_receipt(), message, correlationData);
-                        if (!delete) {
-                            log.error("----- 扫描报文目录, 文件: {}, 删除失败! -----", fileName);
-                        }
-                    }
-                } catch (UtilException e) {
-                    log.error("----- 扫描报文目录, 解析该文件: {}, 出现错误: {} -----", fileName, e.getMessage());
-                } catch (FileNotFoundException e) {
-                    log.error("----- 扫描报文目录, 文件【{}】不存在! -----", pullFilePath);
-                } catch (Exception e) {
-                    log.error(String.format("----- 扫描报文目录, 未知异常: %s -----", e));
-                } finally {
-                    if (null != inputStream) {
-                        try {
-                            inputStream.close();
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                }
-            });
+
+            writeMessageFile(pullFilePaths, pullDir);
 
 
             int size = CollectionUtils.isEmpty(pullFilePaths) ? 0 : pullFilePaths.size();
             int size = CollectionUtils.isEmpty(pullFilePaths) ? 0 : pullFilePaths.size();
             log.info("----- 扫描报文目录, 扫描目录完成, 回执文件共 {} 个 -----", size);
             log.info("----- 扫描报文目录, 扫描目录完成, 回执文件共 {} 个 -----", size);
@@ -106,6 +66,51 @@ public class ScanPullDirectoryTask {
         }
         }
     }
     }
 
 
+    private void writeMessageFile (List<String> pullFilePaths, String pullDir) {
+        if (CollectionUtils.isEmpty(pullFilePaths)) {
+            return;
+        }
+        log.info("----- 扫描报文目录, 开始扫描目录: {} -----", pullDir);
+        pullFilePaths.forEach(pullFilePath -> {
+            File file = FileUtil.file(pullFilePath);
+            HashMap<String, String> map = new HashMap<>();
+            String fileName = file.getName();
+            InputStream inputStream = null;
+            try {
+                if (file.exists()) {
+                    inputStream = new FileInputStream(file);
+                    Document document = XmlUtil.readXML(inputStream);
+                    boolean delete = file.delete();
+                    String content = XmlUtil.format(document);
+                    // 报文回执信息发送给csp
+                    map.put("fileName", fileName);
+                    map.put("fileContent", content);
+                    Object message = JSONUtil.toJsonStr(map);
+                    CorrelationData correlationData = new CorrelationData();
+                    correlationData.setId(fileName);
+                    receiptRabbitTemplate.convertAndSend(rabbitMQProperties.getK_normal_tunnel_to_csp_receipt(), message, correlationData);
+                    if (!delete) {
+                        log.error("----- 扫描报文目录, 文件: {}, 删除失败! -----", fileName);
+                    }
+                }
+            } catch (UtilException e) {
+                log.error("----- 扫描报文目录, 解析该文件: {}, 出现错误: {} -----", fileName, e.getMessage());
+            } catch (FileNotFoundException e) {
+                log.error("----- 扫描报文目录, 文件【{}】不存在! -----", pullFilePath);
+            } catch (Exception e) {
+                log.error(String.format("----- 扫描报文目录, 未知异常: %s -----", e));
+            } finally {
+                if (null != inputStream) {
+                    try {
+                        inputStream.close();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        });
+    }
+
 //    @Scheduled(fixedRate = 10000)
 //    @Scheduled(fixedRate = 10000)
     public void sendCEB621 (int size) {
     public void sendCEB621 (int size) {
         String content = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
         String content = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +