ICHSubscribeClient.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package com.emato.ich.message;
  2. import android.content.Context;
  3. import com.emato.ich.utils.Log;
  4. import com.emato.ich.contant.ICSPConstant;
  5. import com.emato.ich.utils.BaseUtils;
  6. import com.emato.ich.utils.LoggingUtils;
  7. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  8. import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
  9. import org.eclipse.paho.client.mqttv3.MqttCallback;
  10. import org.eclipse.paho.client.mqttv3.MqttClient;
  11. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  12. import org.eclipse.paho.client.mqttv3.MqttException;
  13. import org.eclipse.paho.client.mqttv3.MqttMessage;
  14. import org.eclipse.paho.client.mqttv3.MqttTopic;
  15. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  16. /**
  17. * 订阅客户端
  18. */
  19. public class ICHSubscribeClient {
  20. public static final String TAG = "ICHSubscribeClient";
  21. public static final String userName = "admin";
  22. public static final String password = "public";
  23. public static Context context;
  24. private MqttClient client;// 客户端实例
  25. private MqttTopic topic;// 主题实例
  26. private MqttMessage message;// 消息
  27. public static ICHSubscribeClient getInstance(){
  28. return ICHSubscribeClientInnerClass.ichPublishClient;
  29. }
  30. static class ICHSubscribeClientInnerClass{
  31. public static final ICHSubscribeClient ichPublishClient = new ICHSubscribeClient();
  32. }
  33. private ICHSubscribeClient() {
  34. String clientId = BaseUtils.getClientId() + "_subscribe";
  35. try {
  36. Log.i(TAG, "ICHSubscribeClient: 开始创建消息服务器订阅客户端实例!");
  37. client = new MqttClient(ICSPConstant.MQTT_SERVER_ADDRESS, clientId, new MemoryPersistence());
  38. Log.i(TAG, "ICHSubscribeClient: 创建消息服务器订阅客户端实例成功!");
  39. } catch (MqttException e) {
  40. Log.e(TAG, "ICHSubscribeClient: 创建消息服务器客户端失败!", e);
  41. LoggingUtils.sendErrorLog("业务异常: 创建消息服务器客户端失败! ", e);
  42. }
  43. connect();
  44. }
  45. public void connect() {
  46. Log.i(TAG, "connect: 订阅通知开始连接消息服务器!");
  47. MqttConnectOptions options = new MqttConnectOptions();
  48. options.setCleanSession(true);
  49. options.setUserName(userName);
  50. options.setPassword(password.toCharArray());
  51. // 连接超时时间
  52. options.setConnectionTimeout(30);
  53. // 设置心跳间隔时间
  54. options.setKeepAliveInterval(60);
  55. // TODO 设置连接回调
  56. client.setCallback(new MqttCallback() {
  57. @Override
  58. public void connectionLost(Throwable cause) {
  59. // TODO 重连
  60. Log.e(TAG, "connectionLost: cause: ", cause);
  61. connect();
  62. LoggingUtils.sendErrorLog("mqtt-subscribe客户端断线!", cause);
  63. MessageHandler.executeShell(getInstance(), ICHPublishClient.getInstance());
  64. MessageHandler.openLocker(getInstance(), ICHPublishClient.getInstance());
  65. MessageHandler.autoUpdateVersion(context, getInstance());
  66. }
  67. @Override
  68. public void messageArrived(String topic, MqttMessage message) throws Exception {
  69. // TODO MQTT服务器成功接收消息后,返回消息并调用此方法
  70. Log.i(TAG, "messageArrived: topic: " + topic + ", msg: " + message.getId() + "--" + new String(message.getPayload()));
  71. }
  72. @Override
  73. public void deliveryComplete(IMqttDeliveryToken token) {
  74. // TODO 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
  75. Log.i(TAG, "deliveryComplete: token---" + token.toString() + "---" + topic.getName());
  76. }
  77. });
  78. try {
  79. client.connect(options);
  80. Log.i(TAG, "connect: 连接消息服务器成功!");
  81. } catch (MqttException e) {
  82. Log.e(TAG, "connect: 连接消息服务器失败!", e);
  83. LoggingUtils.sendErrorLog("业务异常: 连接消息服务器失败! ", e);
  84. }
  85. }
  86. public void subscribe(String topicName, IMqttMessageListener iMqttMessageListener) {
  87. try {
  88. topic = client.getTopic(topicName);
  89. Log.i(TAG, "subscribe: 开始消息监听! topicName: " + topicName);
  90. client.subscribe(topicName, iMqttMessageListener);
  91. } catch (MqttException e) {
  92. Log.e(TAG, "subscribe: 消息监听失败! topicName: " + topicName, e);
  93. LoggingUtils.sendErrorLog("业务异常: 消息监听失败! topicName: " + topicName, e);
  94. }
  95. }
  96. }