ICHPublishClient.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package com.emato.ich.message;
  2. import android.util.Log;
  3. import com.emato.ich.contant.ICSPConstant;
  4. import com.emato.ich.utils.BaseUtils;
  5. import com.emato.ich.utils.LoggingUtils;
  6. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  7. import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
  8. import org.eclipse.paho.client.mqttv3.MqttCallback;
  9. import org.eclipse.paho.client.mqttv3.MqttClient;
  10. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  11. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  12. import org.eclipse.paho.client.mqttv3.MqttException;
  13. import org.eclipse.paho.client.mqttv3.MqttMessage;
  14. import org.eclipse.paho.client.mqttv3.MqttTopic;
  15. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  16. /**
  17. * 发布客户端
  18. */
  19. public class ICHPublishClient {
  20. public static final String TAG = "IchClient";
  21. public static final String userName = "admin";
  22. public static final String password = "public";
  23. private MqttClient client;// 客户端实例
  24. private MqttTopic topic;// 主题实例
  25. private MqttMessage message;// 消息
  26. public static ICHPublishClient getInstance(){
  27. return ICHPublishClientInnerClass.ichPublishClient;
  28. }
  29. static class ICHPublishClientInnerClass{
  30. public static ICHPublishClient ichPublishClient = new ICHPublishClient();
  31. }
  32. // 初始化客户端实例
  33. private ICHPublishClient() {
  34. // String clientId = BaseUtils.getClientId() + "publish";
  35. String clientId = BaseUtils.getClientId();
  36. try {
  37. Log.i(TAG, "ICHPublishClient: 创建客户端实例开始!");
  38. client = new MqttClient(ICSPConstant.MQTT_SERVER_ADDRESS, clientId, new MemoryPersistence());
  39. Log.i(TAG, "ICHPublishClient: 创建客户端实例完成!");
  40. } catch (MqttException e) {
  41. Log.e(TAG, "ICHPublishClient: 创建客户端实例失败!", e);
  42. LoggingUtils.sendErrorLog("业务异常: 创建客户端实例失败! ", e);
  43. }
  44. connect();
  45. }
  46. // 连接服务器
  47. private void connect() {
  48. Log.i(TAG, "connect: 开始连接服务器!");
  49. MqttConnectOptions options = new MqttConnectOptions();
  50. options.setCleanSession(true);
  51. options.setUserName(userName);
  52. options.setPassword(password.toCharArray());
  53. // 连接超时时间
  54. options.setConnectionTimeout(10);
  55. // 设置心跳间隔时间
  56. options.setKeepAliveInterval(30);
  57. // TODO 设置发布回调
  58. client.setCallback(new MqttCallback() {
  59. @Override
  60. public void connectionLost(Throwable cause) {
  61. Log.e(TAG, "connectionLost: cause: ", cause);
  62. }
  63. @Override
  64. public void messageArrived(String topic, MqttMessage message) throws Exception {
  65. Log.i(TAG, "messageArrived: topic: " + topic + ", msg: " + message.getId() + "--" + new String(message.getPayload()));
  66. }
  67. @Override
  68. public void deliveryComplete(IMqttDeliveryToken token) {
  69. Log.i(TAG, "deliveryComplete: token---" + token.toString() + "---" + topic.getName());
  70. }
  71. });
  72. try {
  73. client.connect(options);
  74. Log.i(TAG, "connect: 连接消息服务器成功!");
  75. } catch (MqttException e) {
  76. Log.e(TAG, "connect: 连接消息服务器失败!", e);
  77. LoggingUtils.sendErrorLog("业务异常: 连接消息服务器失败! ", e);
  78. }
  79. }
  80. // 发布消息
  81. public void publish(String topicName, MqttMessage message) {
  82. Log.i(TAG, "publish: 发布消息, topic: " + topicName + ", message:" + message);
  83. MqttDeliveryToken token = null;
  84. try {
  85. topic = client.getTopic(topicName);
  86. token = topic.publish(message);
  87. token.waitForCompletion();
  88. Log.i(TAG, "publish: 消息发送成功 " + token.isComplete());
  89. } catch (MqttException e) {
  90. Log.e(TAG, "publish: 消息推送失败!", e);
  91. LoggingUtils.sendErrorLog("业务异常: 消息推送失败! ", e);
  92. }
  93. }
  94. public void subscribe(String topicName, IMqttMessageListener iMqttMessageListener) {
  95. try {
  96. topic = client.getTopic(topicName);
  97. Log.i(TAG, "subscribe: 开始消息监听! topicName: " + topicName);
  98. client.subscribe(topicName, iMqttMessageListener);
  99. } catch (MqttException e) {
  100. Log.e(TAG, "subscribe: 消息监听失败! topicName: " + topicName, e);
  101. LoggingUtils.sendErrorLog("业务异常: 消息监听失败! topicName: " + topicName, e);
  102. }
  103. }
  104. }