123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package com.emato.ich.message;
- import android.content.Context;
- import com.emato.ich.utils.Log;
- import com.emato.ich.contant.ICSPConstant;
- import com.emato.ich.utils.BaseUtils;
- import com.emato.ich.utils.LoggingUtils;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- /**
- * 订阅客户端
- */
- public class ICHSubscribeClient {
- public static final String TAG = "ICHSubscribeClient";
- public static final String userName = "admin";
- public static final String password = "public";
- public static Context context;
- private MqttClient client;// 客户端实例
- private MqttTopic topic;// 主题实例
- private MqttMessage message;// 消息
- public static ICHSubscribeClient getInstance(){
- return ICHSubscribeClientInnerClass.ichPublishClient;
- }
- static class ICHSubscribeClientInnerClass{
- public static final ICHSubscribeClient ichPublishClient = new ICHSubscribeClient();
- }
- private ICHSubscribeClient() {
- String clientId = BaseUtils.getClientId() + "_subscribe";
- try {
- Log.i(TAG, "ICHSubscribeClient: 开始创建消息服务器订阅客户端实例!");
- client = new MqttClient(ICSPConstant.MQTT_SERVER_ADDRESS, clientId, new MemoryPersistence());
- Log.i(TAG, "ICHSubscribeClient: 创建消息服务器订阅客户端实例成功!");
- } catch (MqttException e) {
- Log.e(TAG, "ICHSubscribeClient: 创建消息服务器客户端失败!", e);
- LoggingUtils.sendErrorLog("业务异常: 创建消息服务器客户端失败! ", e);
- }
- connect();
- }
- public void connect() {
- Log.i(TAG, "connect: 订阅通知开始连接消息服务器!");
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);
- options.setUserName(userName);
- options.setPassword(password.toCharArray());
- // 连接超时时间
- options.setConnectionTimeout(30);
- // 设置心跳间隔时间
- options.setKeepAliveInterval(60);
- // TODO 设置连接回调
- client.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable cause) {
- // TODO 重连
- Log.e(TAG, "connectionLost: cause: ", cause);
- connect();
- LoggingUtils.sendErrorLog("mqtt-subscribe客户端断线!", cause);
- MessageHandler.executeShell(getInstance(), ICHPublishClient.getInstance());
- MessageHandler.openLocker(getInstance(), ICHPublishClient.getInstance());
- MessageHandler.autoUpdateVersion(context, getInstance());
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // TODO MQTT服务器成功接收消息后,返回消息并调用此方法
- Log.i(TAG, "messageArrived: topic: " + topic + ", msg: " + message.getId() + "--" + new String(message.getPayload()));
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- // TODO 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
- Log.i(TAG, "deliveryComplete: token---" + token.toString() + "---" + topic.getName());
- }
- });
- try {
- client.connect(options);
- Log.i(TAG, "connect: 连接消息服务器成功!");
- } catch (MqttException e) {
- Log.e(TAG, "connect: 连接消息服务器失败!", e);
- LoggingUtils.sendErrorLog("业务异常: 连接消息服务器失败! ", e);
- }
- }
- public void subscribe(String topicName, IMqttMessageListener iMqttMessageListener) {
- try {
- topic = client.getTopic(topicName);
- Log.i(TAG, "subscribe: 开始消息监听! topicName: " + topicName);
- client.subscribe(topicName, iMqttMessageListener);
- } catch (MqttException e) {
- Log.e(TAG, "subscribe: 消息监听失败! topicName: " + topicName, e);
- LoggingUtils.sendErrorLog("业务异常: 消息监听失败! topicName: " + topicName, e);
- }
- }
- }
|