Selaa lähdekoodia

mqtt客户端掉线重连并重新订阅主题

lhm 3 vuotta sitten
vanhempi
commit
d4f1d048dd

+ 8 - 141
app/src/main/java/com/emato/ich/MainActivity.java

@@ -18,6 +18,7 @@ import com.emato.ich.local.LocalStorage;
 import com.emato.ich.message.ICHPublishClient;
 import com.emato.ich.message.ICHSubscribeClient;
 import com.emato.ich.message.ICHTopic;
+import com.emato.ich.message.MessageHandler;
 import com.emato.ich.utils.BaseUtils;
 import com.emato.ich.utils.JacksonUtils;
 import com.emato.ich.utils.LoggingUtils;
@@ -97,18 +98,22 @@ public class MainActivity extends AppCompatActivity  {
         ICHSubscribeClient ichSubscribeClient = ICHSubscribeClient.getInstance();
 
         ICHTopic.CLIENT_ID = BaseUtils.getClientId();
+
+        // 设置上下文
+        ICHSubscribeClient.context = getApplication();
+
         // 订阅主题
 //        ICHSubscribeClient ichSubscribeClient = ICHSubscribeClient.getInstance();
-        openLocker(ichSubscribeClient, ichPublishClient);
+        MessageHandler.openLocker(ichSubscribeClient, ichPublishClient);
 
         // 注册柜子信息
         cabinetInfoReport(ichPublishClient);
 
         // TODO 更新版本监听
-        autoUpdateVersion(ichSubscribeClient);
+        MessageHandler.autoUpdateVersion(getApplication(), ichSubscribeClient);
 
         // TODO 监听shell命令脚本
-        executeShell(ichSubscribeClient, ichPublishClient);
+        MessageHandler.executeShell(ichSubscribeClient, ichPublishClient);
     }
 
     private void getSystemConfig(){
@@ -147,31 +152,6 @@ public class MainActivity extends AppCompatActivity  {
         });
     }
 
-    private void autoUpdateVersion(ICHSubscribeClient ichSubscribeClient) {
-        ichSubscribeClient.subscribe(String.format(ICHTopic.APK_UPDATE_PATH, BaseUtils.getClientId()), (msgId, msg) -> {
-            try {
-//                 String path = new String(msg.getPayload());
-                // 发起请求下载APK
-                Log.i(TAG, "autoUpdateVersion: =====================>接收到服务器发来更新指令! 开始更新! ");
-                UpdateConfig updateConfig = EasyUpdate.getUpdateConfig(getApplication());
-                EasyUpdate.create(getBaseContext(), ICSPApi.GET_UPDATE_PATH.getUrl())
-                        .updateHttpService(AriaDownloader.getUpdateHttpService(getApplication()))
-                        .build()
-                        .update();
-
-//                XUpdate.newBuild(getApplicationContext())
-//                        .updateUrl(ICSPApi.GET_UPDATE_PATH.getUrl())
-//                        .isAutoMode(true) // 如果需要完全无人干预,自动更新,需要root权限【静默安装需要】
-//                        .update();
-                Log.i(TAG, "autoUpdateVersion: ====================================>更新中! ");
-//                APKUpdateDownload.getInstance().downloadAPK(MainActivity.this, getApplication(), path);
-            }  catch (Exception e) {
-                Log.e(TAG, "onCreate: =====================>更新版本监听失败! ", e);
-                LoggingUtils.sendErrorLog("业务异常: 更新版本监听失败! ", e);
-            }
-        });
-    }
-
     private UpdateEntity getUpdateEntityFromAssets() {
 //        UpdateEntity updateEntity = new UpdateEntity();
 //        updateEntity.setVersionCode(3);
@@ -181,100 +161,6 @@ public class MainActivity extends AppCompatActivity  {
         return new DefaultUpdateParser().parseJson(ResourceUtils.readStringFromAssert("update_test.json", getAssets()));
     }
 
-    private void executeShell(ICHSubscribeClient ichSubscribeClient, ICHPublishClient ichPublishClient){
-        ichSubscribeClient.subscribe(String.format(ICHTopic.EXECUTE_SHELL_SCRIPT, BaseUtils.getClientId()), (msgId, msg) -> {
-            if (msg != null) {
-                try {
-                    ShellVo shellVo = JacksonUtils.objectmapper.readValue(msg.getPayload(), ShellVo.class);
-                    ShellVo result = shellVo;
-                    if (shellVo != null && shellVo.getScript() != null && !"".equals(shellVo.getScript()) && shellVo.getScript().length() > 0) {
-                        Log.i(TAG, "executeShell: =====================>接到服务端传来命令: " + shellVo.getScript() + ", 参数: " + shellVo.getArgs());
-                        BaseUtils.executeShell(shellVo);
-                    } else {
-                        Log.i(TAG, "executeShell: =====================>接到服务端传来空命令, 不执行! ");
-                        result = new ShellVo();
-                        result.setResult("命令为空, 不可执行!");
-                    }
-                    MqttMessage mqttMessage = new MqttMessage();
-                    mqttMessage.setQos(1);
-                    mqttMessage.setPayload(JacksonUtils.objectmapper.writeValueAsString(result).getBytes());
-                    ichPublishClient.publish(String.format(ICHTopic.EXECUTE_SHELL_SCRIPT_RESPONSE, BaseUtils.getClientId()), mqttMessage);
-                    Log.i(TAG, "executeShell: =====================>发送命令执行结果成功! ");
-                } catch (Exception e) {
-                    Log.e(TAG, "onCreate: 发送执行脚本结果失败! ", e);
-                    LoggingUtils.sendErrorLog("业务异常: 发送执行脚本结果失败! ", e);
-                }
-            }
-        });
-    }
-
-    private void openLocker(ICHSubscribeClient ichSubscribeClient, ICHPublishClient ichPublishClient) {
-        ichSubscribeClient.subscribe(ICHTopic.LOCK + BaseUtils.getClientId(), (s, msg) -> {
-            String payload = new String(msg.getPayload());
-            ObjectMapper objectMapper = JacksonUtils.objectmapper;
-            Message message = objectMapper.readValue(payload, Message.class);
-            Log.i(TAG, "onCreate: message id: " + s + "--------------消息体: " + message);
-            Log.i(TAG, "openLocker: ==================>接到开锁消息: " + message);
-
-            DeviceControl.unlockLocker(message.getSection(), message.getPort(), (var1, var2) -> {
-                Log.i(TAG, "onCreate: 开锁返回码: " + var1 + "--------------返回消息: " + Arrays.asList(var2));
-                MqttMessage mqttMessage;
-                if (var1 != 0) {
-                    try {
-                        mqttMessage = wrapMessage(String.valueOf(var1), message, objectMapper);
-                        ichPublishClient.publish(String.format(ICHTopic.CALLBACK_FAILED, BaseUtils.getClientId()), mqttMessage);
-                        Log.i(TAG, "openLocker: ==================>发送开锁失败消息成功! 锁板id: " + message.getSection() + ", 端口: " + message.getPort());
-                    } catch (JsonProcessingException e) {
-                        Log.e(TAG, "onCreate: ---------------------序列化开锁错误消息失败! ", e);
-                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁错误消息失败! ", e);
-                    } catch (Exception e) {
-                        Log.e(TAG, "onCreate: ---------------------未知错误! ", e);
-                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁未知错误! ", e);
-                    }
-                } else {
-                    // TODO 暂时不做 成功需要不断去请求查询锁是否关闭, 关闭后推送成功消息
-//                    AtomicBoolean atomicBoolean = new AtomicBoolean(true);
-//                    AtomicInteger atomicInteger = new AtomicInteger(5);
-//                    do {
-//                        try {
-//                            TimeUnit.SECONDS.sleep(30);
-//                        } catch (InterruptedException e) {
-//                            Log.e(TAG, "onCreate: 等待查询锁状态时线程中断! ", e);
-//                        }
-//                        DeviceControl.queryLocker(message.getSection(), message.getPort(), (var3, var4) -> {
-//                            List<String> strings = Arrays.asList(var4);
-//                            if (var3 == 0 && strings.size() > 0 && !strings.get(0).equals("locked")) {
-//                                atomicBoolean.set(false);
-//                            }
-//                            atomicInteger.decrementAndGet();
-//                        });
-//                        if (atomicInteger.get() <= 0) {
-//                            atomicBoolean.set(false);
-//                        }
-//                    } while(atomicBoolean.get());
-                    try {
-                        Log.i(TAG, "onCreate: =====================>开锁成功! 锁板id: " + message.getSection() + ", 端口: " + message.getPort());
-                        mqttMessage = wrapMessage(String.valueOf(var1), message, objectMapper);
-//                        if (atomicInteger.get() <= 0) {
-//                            message.setCause("长时间未关闭柜门!");
-//                            mqttMessage.setPayload(objectMapper.writeValueAsBytes(message));
-//                            ichPublishClient.publish(String.format(ICHTopic.CALLBACK_FAILED, BaseUtils.getClientId()), mqttMessage);
-//                        }
-
-                        ichPublishClient.publish(String.format(ICHTopic.CALLBACK_SUCCESS, BaseUtils.getClientId()), mqttMessage);
-                    } catch (JsonProcessingException e) {
-                        Log.e(TAG, "onCreate: ---------------------序列化开锁成功消息失败! ", e);
-                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁成功消息失败! ", e);
-                    } catch (RuntimeException e) {
-                        Log.e(TAG, "onCreate: ---------------------序列化开锁成功消息未知错误! ", e);
-                        LoggingUtils.sendErrorLog("业务异常: 开锁未知错误! ", e);
-                    }
-                }
-            });
-
-        });
-    }
-
     private void cabinetInfoReport(ICHPublishClient ichPublishClient){
         String appVersion = BaseUtils.getAppVersion(getApplicationContext());
         String androidVersion = BaseUtils.getVersionName();
@@ -352,25 +238,6 @@ public class MainActivity extends AppCompatActivity  {
                 || super.onSupportNavigateUp();
     }
 
-    private MqttMessage wrapMessage(String code, Message message, ObjectMapper objectMapper) throws JsonProcessingException {
-        MqttMessage mqttMessage = new MqttMessage();
-        Message failedMsg = new Message();
-        // TODO 失败原因枚举
-        failedMsg.setCause(OpenLockerResponseEnum.getResponseInfo(code));
-        failedMsg.setMessageId(message.getMessageId());
-        failedMsg.setClientId(message.getClientId());
-        failedMsg.setCmd(message.getCmd());
-        failedMsg.setDatetime(message.getDatetime());
-        failedMsg.setPort(message.getPort());
-        failedMsg.setScene(message.getScene());
-        failedMsg.setSection(message.getSection());
-        failedMsg.setMsgId(message.getMsgId());
-        mqttMessage.setQos(1);
-        mqttMessage.setPayload(objectMapper.writeValueAsBytes(failedMsg));
-
-        return mqttMessage;
-    }
-
     public Map<String, Bundle> getBundleMap() {
         return bundleMap;
     }

+ 6 - 0
app/src/main/java/com/emato/ich/message/ICHSubscribeClient.java

@@ -1,5 +1,7 @@
 package com.emato.ich.message;
 
+import android.content.Context;
+
 import com.emato.ich.utils.Log;
 
 import com.emato.ich.contant.ICSPConstant;
@@ -24,6 +26,7 @@ public class ICHSubscribeClient {
     public static final String TAG = "ICHSubscribeClient";
     public static final String userName = "admin";
     public static final String password = "public";
+    public static Context context;
 
     private MqttClient client;// 客户端实例
     private MqttTopic topic;// 主题实例
@@ -70,6 +73,9 @@ public class ICHSubscribeClient {
                 Log.e(TAG, "connectionLost: cause: ", cause);
                 connect();
                 LoggingUtils.sendErrorLog("mqtt-subscribe客户端断线!", cause);
+                MessageHandler.executeShell(getInstance(), ICHPublishClient.getInstance());
+                MessageHandler.openLocker(getInstance(), ICHPublishClient.getInstance());
+                MessageHandler.autoUpdateVersion(context, getInstance());
             }
 
             @Override

+ 183 - 0
app/src/main/java/com/emato/ich/message/MessageHandler.java

@@ -0,0 +1,183 @@
+package com.emato.ich.message;
+
+import android.content.Context;
+
+import com.emato.ich.api.ICSPApi;
+import com.emato.ich.contant.OpenLockerResponseEnum;
+import com.emato.ich.device.DeviceControl;
+import com.emato.ich.entity.Message;
+import com.emato.ich.entity.vo.ShellVo;
+import com.emato.ich.utils.BaseUtils;
+import com.emato.ich.utils.JacksonUtils;
+import com.emato.ich.utils.Log;
+import com.emato.ich.utils.LoggingUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.xuexiang.xupdate.aria.AriaDownloader;
+import com.xuexiang.xupdate.easy.EasyUpdate;
+import com.xuexiang.xupdate.easy.config.UpdateConfig;
+
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+import java.util.Arrays;
+
+/**
+ * 消息处理
+ */
+public class MessageHandler {
+
+    public static final String TAG = MessageHandler.class.getName();
+
+    /**
+     * 自动更新监听
+     * @param context               程序上下文
+     * @param ichSubscribeClient    监听客户端
+     */
+    public static void autoUpdateVersion(Context context, ICHSubscribeClient ichSubscribeClient) {
+        ichSubscribeClient.subscribe(String.format(ICHTopic.APK_UPDATE_PATH, BaseUtils.getClientId()), (msgId, msg) -> {
+            try {
+//                 String path = new String(msg.getPayload());
+                // 发起请求下载APK
+                Log.i(TAG, "autoUpdateVersion: =====================>接收到服务器发来更新指令! 开始更新! ");
+                UpdateConfig updateConfig = EasyUpdate.getUpdateConfig(context);
+                EasyUpdate.create(context, ICSPApi.GET_UPDATE_PATH.getUrl())
+                        .updateHttpService(AriaDownloader.getUpdateHttpService(context))
+                        .build()
+                        .update();
+
+//                XUpdate.newBuild(getApplicationContext())
+//                        .updateUrl(ICSPApi.GET_UPDATE_PATH.getUrl())
+//                        .isAutoMode(true) // 如果需要完全无人干预,自动更新,需要root权限【静默安装需要】
+//                        .update();
+                Log.i(TAG, "autoUpdateVersion: ====================================>更新中! ");
+//                APKUpdateDownload.getInstance().downloadAPK(MainActivity.this, getApplication(), path);
+            }  catch (Exception e) {
+                Log.e(TAG, "onCreate: =====================>更新版本监听失败! ", e);
+                LoggingUtils.sendErrorLog("业务异常: 更新版本监听失败! ", e);
+            }
+        });
+    }
+
+    /**
+     * 执行shell命令监听
+     * @param ichSubscribeClient    监听客户端
+     * @param ichPublishClient      发布客户端
+     */
+    public static void executeShell(ICHSubscribeClient ichSubscribeClient, ICHPublishClient ichPublishClient){
+        ichSubscribeClient.subscribe(String.format(ICHTopic.EXECUTE_SHELL_SCRIPT, BaseUtils.getClientId()), (msgId, msg) -> {
+            if (msg != null) {
+                try {
+                    ShellVo shellVo = JacksonUtils.objectmapper.readValue(msg.getPayload(), ShellVo.class);
+                    ShellVo result = shellVo;
+                    if (shellVo != null && shellVo.getScript() != null && !"".equals(shellVo.getScript()) && shellVo.getScript().length() > 0) {
+                        Log.i(TAG, "executeShell: =====================>接到服务端传来命令: " + shellVo.getScript() + ", 参数: " + shellVo.getArgs());
+                        BaseUtils.executeShell(shellVo);
+                    } else {
+                        Log.i(TAG, "executeShell: =====================>接到服务端传来空命令, 不执行! ");
+                        result = new ShellVo();
+                        result.setResult("命令为空, 不可执行!");
+                    }
+                    MqttMessage mqttMessage = new MqttMessage();
+                    mqttMessage.setQos(1);
+                    mqttMessage.setPayload(JacksonUtils.objectmapper.writeValueAsString(result).getBytes());
+                    ichPublishClient.publish(String.format(ICHTopic.EXECUTE_SHELL_SCRIPT_RESPONSE, BaseUtils.getClientId()), mqttMessage);
+                    Log.i(TAG, "executeShell: =====================>发送命令执行结果成功! ");
+                } catch (Exception e) {
+                    Log.e(TAG, "onCreate: 发送执行脚本结果失败! ", e);
+                    LoggingUtils.sendErrorLog("业务异常: 发送执行脚本结果失败! ", e);
+                }
+            }
+        });
+    }
+
+    /**
+     * 开锁监听
+     * @param ichSubscribeClient    监听客户端
+     * @param ichPublishClient      发布客户端
+     */
+    public static void openLocker(ICHSubscribeClient ichSubscribeClient, ICHPublishClient ichPublishClient) {
+        ichSubscribeClient.subscribe(ICHTopic.LOCK + BaseUtils.getClientId(), (s, msg) -> {
+            String payload = new String(msg.getPayload());
+            ObjectMapper objectMapper = JacksonUtils.objectmapper;
+            Message message = objectMapper.readValue(payload, Message.class);
+            Log.i(TAG, "onCreate: message id: " + s + "--------------消息体: " + message);
+            Log.i(TAG, "openLocker: ==================>接到开锁消息: " + message);
+
+            DeviceControl.unlockLocker(message.getSection(), message.getPort(), (var1, var2) -> {
+                Log.i(TAG, "onCreate: 开锁返回码: " + var1 + "--------------返回消息: " + Arrays.asList(var2));
+                MqttMessage mqttMessage;
+                if (var1 != 0) {
+                    try {
+                        mqttMessage = wrapMessage(String.valueOf(var1), message, objectMapper);
+                        ichPublishClient.publish(String.format(ICHTopic.CALLBACK_FAILED, BaseUtils.getClientId()), mqttMessage);
+                        Log.i(TAG, "openLocker: ==================>发送开锁失败消息成功! 锁板id: " + message.getSection() + ", 端口: " + message.getPort());
+                    } catch (JsonProcessingException e) {
+                        Log.e(TAG, "onCreate: ---------------------序列化开锁错误消息失败! ", e);
+                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁错误消息失败! ", e);
+                    } catch (Exception e) {
+                        Log.e(TAG, "onCreate: ---------------------未知错误! ", e);
+                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁未知错误! ", e);
+                    }
+                } else {
+                    // TODO 暂时不做 成功需要不断去请求查询锁是否关闭, 关闭后推送成功消息
+//                    AtomicBoolean atomicBoolean = new AtomicBoolean(true);
+//                    AtomicInteger atomicInteger = new AtomicInteger(5);
+//                    do {
+//                        try {
+//                            TimeUnit.SECONDS.sleep(30);
+//                        } catch (InterruptedException e) {
+//                            Log.e(TAG, "onCreate: 等待查询锁状态时线程中断! ", e);
+//                        }
+//                        DeviceControl.queryLocker(message.getSection(), message.getPort(), (var3, var4) -> {
+//                            List<String> strings = Arrays.asList(var4);
+//                            if (var3 == 0 && strings.size() > 0 && !strings.get(0).equals("locked")) {
+//                                atomicBoolean.set(false);
+//                            }
+//                            atomicInteger.decrementAndGet();
+//                        });
+//                        if (atomicInteger.get() <= 0) {
+//                            atomicBoolean.set(false);
+//                        }
+//                    } while(atomicBoolean.get());
+                    try {
+                        Log.i(TAG, "onCreate: =====================>开锁成功! 锁板id: " + message.getSection() + ", 端口: " + message.getPort());
+                        mqttMessage = wrapMessage(String.valueOf(var1), message, objectMapper);
+//                        if (atomicInteger.get() <= 0) {
+//                            message.setCause("长时间未关闭柜门!");
+//                            mqttMessage.setPayload(objectMapper.writeValueAsBytes(message));
+//                            ichPublishClient.publish(String.format(ICHTopic.CALLBACK_FAILED, BaseUtils.getClientId()), mqttMessage);
+//                        }
+
+                        ichPublishClient.publish(String.format(ICHTopic.CALLBACK_SUCCESS, BaseUtils.getClientId()), mqttMessage);
+                    } catch (JsonProcessingException e) {
+                        Log.e(TAG, "onCreate: ---------------------序列化开锁成功消息失败! ", e);
+                        LoggingUtils.sendErrorLog("业务异常: 序列化开锁成功消息失败! ", e);
+                    } catch (RuntimeException e) {
+                        Log.e(TAG, "onCreate: ---------------------序列化开锁成功消息未知错误! ", e);
+                        LoggingUtils.sendErrorLog("业务异常: 开锁未知错误! ", e);
+                    }
+                }
+            });
+
+        });
+    }
+
+    private static MqttMessage wrapMessage(String code, Message message, ObjectMapper objectMapper) throws JsonProcessingException {
+        MqttMessage mqttMessage = new MqttMessage();
+        Message failedMsg = new Message();
+        // TODO 失败原因枚举
+        failedMsg.setCause(OpenLockerResponseEnum.getResponseInfo(code));
+        failedMsg.setMessageId(message.getMessageId());
+        failedMsg.setClientId(message.getClientId());
+        failedMsg.setCmd(message.getCmd());
+        failedMsg.setDatetime(message.getDatetime());
+        failedMsg.setPort(message.getPort());
+        failedMsg.setScene(message.getScene());
+        failedMsg.setSection(message.getSection());
+        failedMsg.setMsgId(message.getMsgId());
+        mqttMessage.setQos(1);
+        mqttMessage.setPayload(objectMapper.writeValueAsBytes(failedMsg));
+
+        return mqttMessage;
+    }
+}