首页 > 基础资料 博客日记
Java MQTT 完整实战指南:接收消息与处理详解
2025-01-13 15:00:09基础资料围观57次
这篇文章介绍了Java MQTT 完整实战指南:接收消息与处理详解,分享给大家做个参考,收藏Java资料网收获更多编程知识
在物联网(IoT)领域中, MQTT 是一种轻量级的消息传输协议,特别适合资源受限的设备之间通信。许多初学者在接触 MQTT 时,常常会遇到“如何接收消息并处理”这一问题。本文将通过一个详细的 Java 示例,带你深入了解 MQTT 消息接收,并快速上手 Paho 客户端。
前期准备
- MQTT 服务器地址:你可以选择本地搭建的 MQTT 服务器或使用云端服务。
- Java 开发环境:推荐使用 IntelliJ IDEA,并配置好 JDK 8+。
- Paho MQTT 依赖库:确保已将 Eclipse Paho 客户端库添加到项目中。
- 发送消息到主题:确保有工具或代码向指定主题发送消息,用于测试接收功能。
实现功能
- 连接 MQTT 服务器。
- 订阅指定主题,例如
java/b
。 - 接收消息 并输出消息内容。
代码实现
以下是完整的 Java 代码,逐步实现接收 MQTT 消息的功能。
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;
import java.util.concurrent.CountDownLatch;
/**
* @author: Takumilove
* @description: MqttPahoTest
* @date: 2024/12/18
**/
public class MqttPahoTest {
private static final String SERVER_URI = "tcp://156.238.*******:1883"; // MQTT 服务器地址
private static final String CLIENT_ID = "paho_test"; // 客户端ID,唯一标识
@Test
public void receiveMessage() {
// 创建一个 CountDownLatch,确保程序持续运行以接收消息
CountDownLatch latch = new CountDownLatch(1);
try {
// 1. 创建 MqttClient 对象
MqttClient mqttClient = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());
// 2. 配置连接选项
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("admin");
options.setPassword("123456".toCharArray());
options.setCleanSession(true);
// 3. 设置回调函数
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) { // 连接丢失处理
System.out.println("连接断开: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception { // 接收消息处理
System.out.println("接收到消息 - 主题: " + topic);
byte[] payload = message.getPayload();
System.out.println("消息内容: " + new String(payload));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) { // 消息传输完毕
System.out.println("消息传输完毕");
}
});
// 4. 连接到 MQTT 服务器
mqttClient.connect(options);
System.out.println("成功连接到 MQTT 服务器");
// 5. 订阅主题
mqttClient.subscribe("java/b", 2); // 订阅 "java/b",QoS 等级为 2
System.out.println("成功订阅主题: java/b");
// 保持运行状态,持续监听消息
latch.await();
} catch (MqttException e) {
System.err.println("连接或订阅失败: " + e.getMessage());
e.printStackTrace();
} catch (InterruptedException e) {
System.err.println("程序被中断: " + e.getMessage());
e.printStackTrace();
}
}
}
代码讲解
以下是对Java代码的详细讲解,帮助你更好地理解每一部分的功能。
-
连接配置
private static final String SERVER_URI = "tcp://156.238.*******:1883"; // MQTT 服务器地址 private static final String CLIENT_ID = "paho_test"; // 客户端ID,唯一标识
SERVER_URI
:指定MQTT服务器的地址,格式为tcp://ip:port
。CLIENT_ID
:客户端的唯一标识,确保每个客户端ID唯一,以避免连接冲突。
-
创建MqttClient对象
MqttClient mqttClient = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());
- 使用
MqttClient
类创建客户端实例,MemoryPersistence
用于存储会话信息。
- 使用
-
配置连接选项
MqttConnectOptions options = new MqttConnectOptions(); options.setUserName("admin"); options.setPassword("123456".toCharArray()); options.setCleanSession(true);
setUserName
和setPassword
用于设置连接认证信息。setCleanSession(true)
表示每次连接时清除之前的会话信息。
-
设置回调函数
mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("连接断开: " + cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("接收到消息 - 主题: " + topic); byte[] payload = message.getPayload(); System.out.println("消息内容: " + new String(payload)); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("消息传输完毕"); } });
connectionLost
:当与MQTT服务器的连接丢失时触发,输出断开原因。messageArrived
:当订阅的主题接收到新消息时触发,输出主题和消息内容。deliveryComplete
:消息发布完成时触发,适用于消息发布功能。
-
连接并订阅主题
try { mqttClient.connect(options); System.out.println("成功连接到 MQTT 服务器"); mqttClient.subscribe("java/b", 2); // 订阅 "java/b" 主题,QoS等级为2 System.out.println("成功订阅主题: java/b"); } catch (MqttException e) { System.err.println("连接或订阅失败: " + e.getMessage()); e.printStackTrace(); }
- 使用
connect
方法连接到MQTT服务器。 - 使用
subscribe
方法订阅指定的主题,QoS等级为2,确保消息只传递一次。
- 使用
-
保持程序运行
// 保持运行状态,持续监听消息 CountDownLatch latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { System.err.println("程序被中断: " + e.getMessage()); e.printStackTrace(); }
- 使用
CountDownLatch
避免程序因无限循环而阻塞,保持程序持续运行以接收消息。
- 使用
运行结果
运行代码后,连接成功,控制台输出如下:
总结
通过以上代码,你可以快速实现一个简单的 Java MQTT 消息接收器。只需替换你的服务器地址、用户名和密码,即可接收来自指定主题的消息。
下一步可以做什么?
- 添加 消息持久化 功能,将接收到的消息保存到数据库。
- 实现 多主题监听,订阅多个 MQTT 主题。
- 结合前端展示,实时可视化消息内容。
文章来源:https://blog.csdn.net/Takumilove/article/details/144528867
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: