package com.emato.ich.message; import android.util.Log; import com.emato.ich.contant.ICSPConstant; import com.emato.ich.utils.BaseUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * 发布客户端 */ public class ICHPublishClient { public static final String TAG = "IchClient"; public static final String userName = "admin"; public static final String password = "public"; private MqttClient client;// 客户端实例 private MqttTopic topic;// 主题实例 private MqttMessage message;// 消息 public static ICHPublishClient getInstance(){ return ICHPublishClientInnerClass.ichPublishClient; } static class ICHPublishClientInnerClass{ public static ICHPublishClient ichPublishClient = new ICHPublishClient(); } // 初始化客户端实例 private ICHPublishClient() { // String clientId = BaseUtils.getClientId() + "publish"; String clientId = BaseUtils.getClientId(); try { Log.i(TAG, "ICHPublishClient: 创建客户端实例开始!"); client = new MqttClient(ICSPConstant.MQTT_SERVER_ADDRESS, clientId, new MemoryPersistence()); Log.i(TAG, "ICHPublishClient: 创建客户端实例完成!"); } catch (MqttException e) { Log.e(TAG, "ICHPublishClient: 创建客户端实例失败!", e); } connect(); } // 连接服务器 private void connect() { Log.i(TAG, "connect: 开始连接服务器!"); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(userName); options.setPassword(password.toCharArray()); // 连接超时时间 options.setConnectionTimeout(10); // 设置心跳间隔时间 options.setKeepAliveInterval(30); // TODO 设置发布回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { Log.e(TAG, "connectionLost: cause: ", cause); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { Log.i(TAG, "messageArrived: topic: " + topic + ", msg: " + message.getId() + "--" + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { Log.i(TAG, "deliveryComplete: token---" + token.toString() + "---" + topic.getName()); } }); try { client.connect(options); Log.i(TAG, "connect: 连接消息服务器成功!"); } catch (MqttException e) { Log.e(TAG, "connect: 连接消息服务器失败!", e); } } // 发布消息 public void publish(String topicName, MqttMessage message) { Log.i(TAG, "publish: 发布消息, topic: " + topicName + ", message:" + message); MqttDeliveryToken token = null; try { topic = client.getTopic(topicName); token = topic.publish(message); token.waitForCompletion(); Log.i(TAG, "publish: 消息发送成功 " + token.isComplete()); } catch (MqttException e) { Log.e(TAG, "publish: 消息推送失败!", e); } } public void subscribe(String topicName, IMqttMessageListener iMqttMessageListener) { try { topic = client.getTopic(topicName); Log.i(TAG, "subscribe: 开始消息监听! topicName: " + topicName); client.subscribe(topicName, iMqttMessageListener); } catch (MqttException e) { Log.e(TAG, "subscribe: 消息监听失败! topicName: " + topicName, e); } } }