首页 > 基础资料 博客日记
java springboot 对接mqtt发消息
2023-08-07 18:46:35基础资料围观316次
本篇文章分享java springboot 对接mqtt发消息,对你有帮助的话记得收藏一下,看Java资料网收获更多编程知识
一、引入maven
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
二、客户端
import com.ruoyi.common.config.MinioConfigg; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttPersistenceException; import org.eclipse.paho.client.mqttv3.MqttTopic; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component; @Component public class MyMqttClient { public static MqttClient mqttClient = null; private static MemoryPersistence memoryPersistence = null; private static MqttConnectOptions mqttConnectOptions = null; public static String ClientId = (int)((Math.random()*9+1)*10000000)+""; public static void init(String clientId) { //初始化连接设置对象 mqttConnectOptions = new MqttConnectOptions(); //初始化MqttClient if(null != mqttConnectOptions) { // true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态 mqttConnectOptions.setCleanSession(false); // 设置连接超时 mqttConnectOptions.setConnectionTimeout(30); //设置断开后重新连接 mqttConnectOptions.setAutomaticReconnect(true); // 设置持久化方式 memoryPersistence = new MemoryPersistence(); if(null != memoryPersistence && null != clientId) { try { mqttClient = new MqttClient("tcp://"+MinioConfigg.APP_KEY, clientId,memoryPersistence); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { } }else { System.out.println("mqttConnectOptions对象为空"); } System.out.println(mqttClient.isConnected()); //设置连接和回调 if(null != mqttClient) { if(!mqttClient.isConnected()) { // 创建回调函数对象 MqttRecieveCallback mqttReceriveCallback = new MqttRecieveCallback(); // 客户端添加回调函数 mqttClient.setCallback(mqttReceriveCallback); // 创建连接 try { System.out.println("创建连接"); mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { System.out.println("mqttClient为空"); } System.out.println(mqttClient.isConnected()); } // 关闭连接 public void closeConnect() { //关闭存储方式 if(null != memoryPersistence) { try { memoryPersistence.close(); } catch (MqttPersistenceException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("memoryPersistence is null"); } // 关闭连接 if(null != mqttClient) { if(mqttClient.isConnected()) { try { mqttClient.disconnect(); mqttClient.close(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is not connect"); } }else { System.out.println("mqttClient is null"); } } // 发布消息 public void publishMessage(String pubTopic,String message) { if(null != mqttClient&& mqttClient.isConnected()) { System.out.println("发布消息 "+mqttClient.isConnected()); System.out.println("id:"+mqttClient.getClientId()); MqttMessage mqttMessage = new MqttMessage(); /*QoS0,At most once,至多一次; QoS1,At least once,至少一次; QoS2,Exactly once,确保只有一次。2比1耗性能*/ mqttMessage.setQos(2); mqttMessage.setPayload(message.getBytes()); MqttTopic topic = mqttClient.getTopic(pubTopic); if(null != topic) { try { MqttDeliveryToken publish = topic.publish(mqttMessage); if(!publish.isComplete()) { System.out.println("消息发布成功"); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }else { reConnect(); } } // 重新连接 public void reConnect() { if(null != mqttClient) { if(!mqttClient.isConnected()) { if(null != mqttConnectOptions) { try { mqttClient.connect(mqttConnectOptions); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttConnectOptions is null"); } }else { System.out.println("mqttClient is null or connect"); } }else { init(ClientId); } } // 订阅主题 public void subTopic(String topic) { if(null != mqttClient&& mqttClient.isConnected()) { try { mqttClient.subscribe(topic, 1); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } } // 清空主题 public void cleanTopic(String topic) { if(null != mqttClient&& !mqttClient.isConnected()) { try { mqttClient.unsubscribe(topic); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } }else { System.out.println("mqttClient is error"); } } }
三、消费端
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MqttRecieveCallback implements MqttCallback{ @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { /*System.out.println("Client 接收消息主题 : " + topic); System.out.println("Client 接收消息Qos : " + message.getQos()); System.out.println("Client 接收消息内容 : " + new String(message.getPayload()));*/ } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }
四、程序启动时候开启mqtt 在主程序的Application中
public class XXXApplication implements ApplicationListener<ApplicationReadyEvent> { public static void main(String[] args) { SpringApplication.run(XXXApplication.class, args); } @Override public void onApplicationEvent(ApplicationReadyEvent event) { MyMqttClient.init(MyMqttClient.ClientId); } }
文章来源:https://www.cnblogs.com/void--main/p/16532210.html
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: