package com.emato.ich.message; import android.content.Context; import com.emato.ich.utils.Log; import com.emato.ich.contant.ICSPConstant; import com.emato.ich.utils.BaseUtils; import com.emato.ich.utils.LoggingUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 订阅客户端 */ public class ICHSubscribeClient { public static final String TAG = "ICHSubscribeClient"; public static final String userName = "admin"; public static final String password = "public"; public static Context context; private MqttClient client;// 客户端实例 private MqttTopic topic;// 主题实例 private MqttMessage message;// 消息 public static ICHSubscribeClient getInstance(){ return ICHSubscribeClientInnerClass.ichPublishClient; } static class ICHSubscribeClientInnerClass{ public static final ICHSubscribeClient ichPublishClient = new ICHSubscribeClient(); } private ICHSubscribeClient() { String clientId = BaseUtils.getClientId() + "_subscribe"; try { Log.i(TAG, "ICHSubscribeClient: 开始创建消息服务器订阅客户端实例!"); client = new MqttClient(ICSPConstant.MQTT_SERVER_ADDRESS, clientId, new MemoryPersistence()); Log.i(TAG, "ICHSubscribeClient: 创建消息服务器订阅客户端实例成功!"); } catch (MqttException e) { Log.e(TAG, "ICHSubscribeClient: 创建消息服务器客户端失败!", e); LoggingUtils.sendErrorLog("业务异常: 创建消息服务器客户端失败! ", e); } connect(); } public void connect() { Log.i(TAG, "connect: 订阅通知开始连接消息服务器!"); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(userName); options.setPassword(password.toCharArray()); // 连接超时时间 options.setConnectionTimeout(30); // 设置心跳间隔时间 options.setKeepAliveInterval(60); // TODO 设置连接回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { // TODO 重连 Log.e(TAG, "connectionLost: cause: ", cause); connect(); LoggingUtils.sendErrorLog("mqtt-subscribe客户端断线!", cause); MessageHandler.executeShell(getInstance(), ICHPublishClient.getInstance()); MessageHandler.openLocker(getInstance(), ICHPublishClient.getInstance()); MessageHandler.autoUpdateVersion(context, getInstance()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // TODO MQTT服务器成功接收消息后,返回消息并调用此方法 Log.i(TAG, "messageArrived: topic: " + topic + ", msg: " + message.getId() + "--" + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { // TODO 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用 Log.i(TAG, "deliveryComplete: token---" + token.toString() + "---" + topic.getName()); } }); try { client.connect(options); Log.i(TAG, "connect: 连接消息服务器成功!"); } catch (MqttException e) { Log.e(TAG, "connect: 连接消息服务器失败!", e); LoggingUtils.sendErrorLog("业务异常: 连接消息服务器失败! ", e); } } public void subscribe(String topicName, IMqttMessageListener iMqttMessageListener) { try { topic = client.getTopic(topicName); Log.i(TAG, "subscribe: 开始消息监听! topicName: " + topicName); client.subscribe(topicName, iMqttMessageListener); } catch (MqttException e) { Log.e(TAG, "subscribe: 消息监听失败! topicName: " + topicName, e); LoggingUtils.sendErrorLog("业务异常: 消息监听失败! topicName: " + topicName, e); } } }