在目前所有的推送技术中,mqtt无疑是目前最主流的协议了,其优越性在于它的协议消耗非常的少,对于网络性能不是特别对的好的系统来说,mqtt无疑可以保证其推送的高效性,之前小沃曾经有过关于html5技术实现mqtt的案例详解,但是由于一些原因,有些时候并不是要求客户一定要打开浏览器才能实现,而是要客户只要运行后台程序就需要实现。因此,除了html5,小沃最近也研究出了一套后台连接mqtt服务器的方式。所使用的语言也是目前最为流行的java程序。
做到这一步,首先需要引入相关的库文件。这里我们使用mqtt官方推荐的org.eclipse.paho.client.mqttv3.jar库,最新版本是1.2.0版本。
这里我们要建立一个MqttClient类,其构造函数为new MqttClient(broker, clientId, persistence)。broker为类似"tcp://0.0.0.0:1883"这样的字符串,clientId就不多解释了,persistence只需要通过new创建一个新的方法即可。
还需要一个MqttConnectOptions类,这个类用于配置各种与mqtt相关的参数,在最后的MqttClient.connect(),作为参数送进去。
其中一个比较重要的参数是MqttCallback,它是接收到数据的回调类,用这个类可以实现执行特定的消息处理函数。
好了,废话不多说,直接上代码。
package cn.worldflying.mqtt; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class Mqtt { private MqttClient client; public Mqtt(String broker, String topic, String clientId, String username, String password, MqttCallback mqttcallback) { try { MemoryPersistence persistence = new MemoryPersistence(); client = new MqttClient(broker, clientId, persistence); MqttConnectOptions opt = new MqttConnectOptions(); opt.setCleanSession(true); opt.setConnectionTimeout(10); opt.setKeepAliveInterval(20); if(username != null) opt.setUserName(username); if(password != null) opt.setPassword(password.toCharArray()); client.setCallback(mqttcallback); System.out.println("Connecting to broker: "+broker); client.connect(opt); client.subscribe(topic, 2); System.out.println("Connected"); } catch(MqttException me) { System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); } } public int Public(String topic, String content) { int qos = 2; System.out.println("Publishing message: "+content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); try { client.publish(topic, message); } catch (MqttException me) { // TODO Auto-generated catch block System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); } System.out.println("Message published"); return 0; } public int Disconnect() { try { client.disconnect(); } catch (MqttException me) { // TODO Auto-generated catch block System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); } return 0; } }