Browse Source

消息上报以及扫描回执优化,添加各个环境配置

lhm 3 years ago
parent
commit
b7fa8e42a8

+ 24 - 4
src/main/java/com/emato/file/tunnel/listener/ReportCEB621Listener.java

@@ -1,8 +1,10 @@
 package com.emato.file.tunnel.listener;
 package com.emato.file.tunnel.listener;
 
 
+import cn.hutool.core.convert.ConvertException;
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.io.FileUtil;
-import cn.hutool.core.lang.UUID;
+import cn.hutool.core.io.IORuntimeException;
 import cn.hutool.core.util.XmlUtil;
 import cn.hutool.core.util.XmlUtil;
+import cn.hutool.json.JSONException;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONObject;
 import cn.hutool.json.JSONUtil;
 import cn.hutool.json.JSONUtil;
 import com.emato.file.tunnel.config.properties.CPortProperties;
 import com.emato.file.tunnel.config.properties.CPortProperties;
@@ -18,7 +20,6 @@ import org.springframework.stereotype.Component;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
-import java.util.regex.Pattern;
 
 
 /**
 /**
  * @author lhm
  * @author lhm
@@ -50,16 +51,35 @@ public class ReportCEB621Listener {
             String content = jsonObject.get("msgFileContent", String.class);
             String content = jsonObject.get("msgFileContent", String.class);
             String path = cPortProperties.getPushDir()  + "\\" + msgFileName;
             String path = cPortProperties.getPushDir()  + "\\" + msgFileName;
             // 去除空格和换行符
             // 去除空格和换行符
+            content = content.trim();
             XmlUtil.cleanInvalid(content);
             XmlUtil.cleanInvalid(content);
             XmlUtil.cleanComment(content);
             XmlUtil.cleanComment(content);
             File file = FileUtil.writeString(content, path, StandardCharsets.UTF_8);
             File file = FileUtil.writeString(content, path, StandardCharsets.UTF_8);
             channel.basicAck(tag, false);
             channel.basicAck(tag, false);
             log.info("----- 监听621报文上报, 文件: {} , 上报结束! -----", file.getName());
             log.info("----- 监听621报文上报, 文件: {} , 上报结束! -----", file.getName());
+        } catch (JSONException e) {
+            requeue(channel, tag);
+            log.error("----- 监听621报文上报, json解析失败! 消息已重新入队! 消息体: {}, 异常: {} -----", message, e);
+        } catch (IORuntimeException e) {
+            requeue(channel, tag);
+            log.error("----- 监听621报文上报, 写入报文文件时出现异常! 消息已重新入队! 消息体: {}, 异常: {} -----", message, e);
+        } catch (ConvertException e) {
+            requeue(channel, tag);
+            log.error("----- 监听621报文上报, json转换失败! 消息已重新入队! 消息体: {}, 异常: {} -----", message, e);
         } catch (Exception e) {
         } catch (Exception e) {
-            channel.basicNack(tag, false, true);
-            log.error(String.format("----- 监听621报文上报, 上报异常! 消息已重新入队! 消息体: %s, 异常: %s -----", message, e));
+            channel.basicAck(tag, false);
+            log.error("----- 监听621报文上报, 上报出现未知异常! 消息已丢弃! 消息体: {}, 异常: {} -----", message, e);
         }
         }
     }
     }
 
 
 
 
+    private void requeue (Channel channel, long tag) {
+        try {
+            channel.basicNack(tag, false, true);
+        } catch (IOException ioException) {
+            log.error("----- 监听621报文上报, 消息重新入队失败! -----", ioException);
+            ioException.printStackTrace();
+        }
+    }
+
 }
 }

+ 4 - 2
src/main/java/com/emato/file/tunnel/mq/callback/SimpleConfirmCallBack.java

@@ -24,8 +24,10 @@ public class SimpleConfirmCallBack implements RabbitTemplate.ConfirmCallback {
      */
      */
     @Override
     @Override
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-        if (!ack) {
-            log.info("----- 消息唯一标识: {}, 确认结果: {}, 失败原因: {} -----", correlationData, ack, cause);
+        if (ack) {
+            log.info("----- 报文文件【{}】的数据发送成功! -----", correlationData.getId());
+        } else {
+            log.info("----- 报文文件【{}】的数据发送失败! 失败原因: {} -----", correlationData.getId(), cause);
         }
         }
     }
     }
 }
 }

+ 57 - 47
src/main/java/com/emato/file/tunnel/scheduler/ScanPullDirectoryTask.java

@@ -8,6 +8,7 @@ import cn.hutool.json.JSONUtil;
 import com.emato.file.tunnel.config.properties.CPortProperties;
 import com.emato.file.tunnel.config.properties.CPortProperties;
 import com.emato.file.tunnel.config.properties.RabbitMQProperties;
 import com.emato.file.tunnel.config.properties.RabbitMQProperties;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -43,57 +44,66 @@ public class ScanPullDirectoryTask {
 
 
     @Scheduled(fixedRate = 3000)
     @Scheduled(fixedRate = 3000)
     public void scanDirectory() {
     public void scanDirectory() {
-        String pullDir = cPortProperties.getPullDir();
-        log.info("----- 开始扫描目录: {} -----", pullDir);
-        List<String> fileNames = FileUtil.listFileNames(pullDir);
-        // 根据设置的值处理多少个文件
-        if (CollectionUtil.isEmpty(fileNames)) {
-            log.info("----- 扫描目录完成, 暂无回执文件! -----");
-            return;
-        }
-        if (fileNames.size() >= cPortProperties.getScanHandleSize()) {
-            fileNames = fileNames.subList(0, cPortProperties.getScanHandleSize());
-        }
-        // 获取回执目录的所有文件名
-        List<String> pullFilePaths = fileNames.parallelStream().map(fileName -> pullDir + "\\" + fileName).filter(fileName -> fileName.indexOf("xml") > 0).collect(Collectors.toList());
-        pullFilePaths.forEach(pullFilePath -> {
-            File file = FileUtil.file(pullFilePath);
-            HashMap<String, String> map = new HashMap<>();
-            String fileName = file.getName();
-            InputStream inputStream = null;
-            try {
-                if (file.exists()) {
-                    inputStream = new FileInputStream(file);
-                    Document document = XmlUtil.readXML(inputStream);
-                    boolean delete = file.delete();
-                    String content = XmlUtil.format(document);
-                    // 报文回执信息发送给csp
-                    map.put("fileName", fileName);
-                    map.put("fileContent", content);
-                    receiptRabbitTemplate.convertAndSend(JSONUtil.toJsonStr(map));
-                    if (!delete) {
-                        log.error("----- 文件: {}, 删除失败! -----", fileName);
+        try {
+            String pullDir = cPortProperties.getPullDir();
+            List<String> fileNames = FileUtil.listFileNames(pullDir);
+            // 根据设置的值处理多少个文件
+            if (CollectionUtil.isEmpty(fileNames)) {
+                return;
+            }
+            if (fileNames.size() >= cPortProperties.getScanHandleSize()) {
+                fileNames = fileNames.subList(0, cPortProperties.getScanHandleSize());
+            }
+            // 获取回执目录的所有文件名
+            List<String> pullFilePaths = fileNames.parallelStream().map(fileName -> pullDir + "\\" + fileName).filter(fileName -> fileName.indexOf("xml") > 0 && fileName.contains("CEB622Message")).collect(Collectors.toList());
+            if (CollectionUtils.isEmpty(pullFilePaths)) {
+                return;
+            }
+            log.info("----- 扫描报文目录, 开始扫描目录: {} -----", pullDir);
+            pullFilePaths.forEach(pullFilePath -> {
+                File file = FileUtil.file(pullFilePath);
+                HashMap<String, String> map = new HashMap<>();
+                String fileName = file.getName();
+                InputStream inputStream = null;
+                try {
+                    if (file.exists()) {
+                        inputStream = new FileInputStream(file);
+                        Document document = XmlUtil.readXML(inputStream);
+                        boolean delete = file.delete();
+                        String content = XmlUtil.format(document);
+                        // 报文回执信息发送给csp
+                        map.put("fileName", fileName);
+                        map.put("fileContent", content);
+                        Object message = JSONUtil.toJsonStr(map);
+                        CorrelationData correlationData = new CorrelationData();
+                        correlationData.setId(fileName);
+                        receiptRabbitTemplate.convertAndSend(rabbitMQProperties.getK_normal_tunnel_to_csp_receipt(), message, correlationData);
+                        if (!delete) {
+                            log.error("----- 扫描报文目录, 文件: {}, 删除失败! -----", fileName);
+                        }
                     }
                     }
-                }
-            } catch (UtilException e) {
-                log.error("----- 解析该文件: {}, 出现错误: {} -----", fileName, e.getMessage());
-            } catch (FileNotFoundException e) {
-                log.error("----- 文件【{}】不存在! -----", pullFilePath);
-            } catch (Exception e) {
-                log.error(String.format("----- 未知异常: %s -----", e));
-            } finally {
-                if (null != inputStream) {
-                    try {
-                        inputStream.close();
-                    } catch (IOException e) {
-                        e.printStackTrace();
+                } catch (UtilException e) {
+                    log.error("----- 扫描报文目录, 解析该文件: {}, 出现错误: {} -----", fileName, e.getMessage());
+                } catch (FileNotFoundException e) {
+                    log.error("----- 扫描报文目录, 文件【{}】不存在! -----", pullFilePath);
+                } catch (Exception e) {
+                    log.error(String.format("----- 扫描报文目录, 未知异常: %s -----", e));
+                } finally {
+                    if (null != inputStream) {
+                        try {
+                            inputStream.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
                     }
                     }
                 }
                 }
-            }
-        });
+            });
 
 
-        int size = CollectionUtils.isEmpty(pullFilePaths) ? 0 : pullFilePaths.size();
-        log.info("----- 扫描目录完成, 回执文件共 {} 个 -----", size);
+            int size = CollectionUtils.isEmpty(pullFilePaths) ? 0 : pullFilePaths.size();
+            log.info("----- 扫描报文目录, 扫描目录完成, 回执文件共 {} 个 -----", size);
+        } catch (Throwable e) {
+            log.error("------ 扫描报文目录, 扫描目录出现未知错误! -----", e);
+        }
     }
     }
 
 
 //    @Scheduled(fixedRate = 10000)
 //    @Scheduled(fixedRate = 10000)

+ 22 - 7
src/main/resources/application.properties

@@ -5,23 +5,38 @@ spring.rabbitmq.host=127.0.0.1
 spring.rabbitmq.username=guest
 spring.rabbitmq.username=guest
 spring.rabbitmq.password=guest
 spring.rabbitmq.password=guest
 spring.rabbitmq.virtual-host=/
 spring.rabbitmq.virtual-host=/
+#\u4E0A\u62A5\u76EE\u5F55
+cport.push_dir=D:\\report\\report
+#\u56DE\u6267\u76EE\u5F55
+cport.pull_dir=D:\\report\\receipt
+
 # \u6D4B\u8BD5\u73AF\u5883mq\u76F8\u5173\u914D\u7F6E
 # \u6D4B\u8BD5\u73AF\u5883mq\u76F8\u5173\u914D\u7F6E
 #spring.rabbitmq.port=5672
 #spring.rabbitmq.port=5672
 #spring.rabbitmq.host=120.24.174.90
 #spring.rabbitmq.host=120.24.174.90
 #spring.rabbitmq.username=admin
 #spring.rabbitmq.username=admin
 #spring.rabbitmq.password=Abc-123#
 #spring.rabbitmq.password=Abc-123#
 #spring.rabbitmq.virtual-host=/
 #spring.rabbitmq.virtual-host=/
+#\u4E0A\u62A5\u76EE\u5F55
+#cport.push_dir=D:\\report\\report
+#\u56DE\u6267\u76EE\u5F55
+#cport.pull_dir=D:\\report\\receipt
+
+# \u751F\u4EA7\u73AF\u5883mq\u76F8\u5173\u914D\u7F6E
+#spring.rabbitmq.port=5672
+#spring.rabbitmq.host=120.76.26.84
+#spring.rabbitmq.username=admin
+#spring.rabbitmq.password=Abc-123#
+#spring.rabbitmq.virtual-host=/
+#\u4E0A\u62A5\u76EE\u5F55
+#cport.push_dir=D:\\report\\report
+#\u56DE\u6267\u76EE\u5F55
+#cport.pull_dir=D:\\report\\receipt
+
+# \u901A\u7528\u914D\u7F6E
 
 
 # \u5FC3\u8DF3\u8D85\u65F6\u65F6\u95F4 \u5355\u4F4D: s
 # \u5FC3\u8DF3\u8D85\u65F6\u65F6\u95F4 \u5355\u4F4D: s
 spring.rabbitmq.requested-heartbeat=60
 spring.rabbitmq.requested-heartbeat=60
 # \u4FE1\u9053\u53EF\u7F13\u5B58\u7684\u6D88\u606F\u4E2A\u6570
 # \u4FE1\u9053\u53EF\u7F13\u5B58\u7684\u6D88\u606F\u4E2A\u6570
 spring.rabbitmq.cache.channel.size=50
 spring.rabbitmq.cache.channel.size=50
-
-# \u6DF1\u5173\u901A\u626B\u63CF\u7684\u76EE\u5F55
-#cport.push_dir=H:\\lhm\\push
-cport.push_dir=D:\\report\\report
-# \u6DF1\u5173\u901A\u56DE\u6267\u76EE\u5F55
-#cport.pull_dir=H:\\lhm\\pull
-cport.pull_dir=D:\\report\\receipt
 # \u8BBE\u7F6E\u56DE\u6267\u5355\u6B21\u5904\u7406\u7684\u4E2A\u6570
 # \u8BBE\u7F6E\u56DE\u6267\u5355\u6B21\u5904\u7406\u7684\u4E2A\u6570
 cport.scan_handle_size=90
 cport.scan_handle_size=90