首页 > 基础资料 博客日记

Java MQTT 完整实战指南:接收消息与处理详解

2025-01-13 15:00:09基础资料围观57

这篇文章介绍了Java MQTT 完整实战指南:接收消息与处理详解,分享给大家做个参考,收藏Java资料网收获更多编程知识


在物联网(IoT)领域中, MQTT 是一种轻量级的消息传输协议,特别适合资源受限的设备之间通信。许多初学者在接触 MQTT 时,常常会遇到“如何接收消息并处理”这一问题。本文将通过一个详细的 Java 示例,带你深入了解 MQTT 消息接收,并快速上手 Paho 客户端。


前期准备

  • MQTT 服务器地址:你可以选择本地搭建的 MQTT 服务器或使用云端服务。
  • Java 开发环境:推荐使用 IntelliJ IDEA,并配置好 JDK 8+
  • Paho MQTT 依赖库:确保已将 Eclipse Paho 客户端库添加到项目中。
  • 发送消息到主题:确保有工具或代码向指定主题发送消息,用于测试接收功能。


实现功能

  1. 连接 MQTT 服务器
  2. 订阅指定主题,例如 java/b
  3. 接收消息 并输出消息内容。

代码实现

以下是完整的 Java 代码,逐步实现接收 MQTT 消息的功能。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;

/**
 * @author: Takumilove
 * @description: MqttPahoTest
 * @date: 2024/12/18
 **/
public class MqttPahoTest {

    private static final String SERVER_URI = "tcp://156.238.*******:1883";  // MQTT 服务器地址
    private static final String CLIENT_ID = "paho_test";                  // 客户端ID,唯一标识

    @Test
    public void receiveMessage() {
        // 创建一个 CountDownLatch,确保程序持续运行以接收消息
        CountDownLatch latch = new CountDownLatch(1);

        try {
            // 1. 创建 MqttClient 对象
            MqttClient mqttClient = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());

            // 2. 配置连接选项
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName("admin");
            options.setPassword("123456".toCharArray());
            options.setCleanSession(true);

            // 3. 设置回调函数
            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) { // 连接丢失处理
                    System.out.println("连接断开: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception { // 接收消息处理
                    System.out.println("接收到消息 - 主题: " + topic);
                    byte[] payload = message.getPayload();
                    System.out.println("消息内容: " + new String(payload));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) { // 消息传输完毕
                    System.out.println("消息传输完毕");
                }
            });

            // 4. 连接到 MQTT 服务器
            mqttClient.connect(options);
            System.out.println("成功连接到 MQTT 服务器");

            // 5. 订阅主题
            mqttClient.subscribe("java/b", 2); // 订阅 "java/b",QoS 等级为 2
            System.out.println("成功订阅主题: java/b");

            // 保持运行状态,持续监听消息
            latch.await();
        } catch (MqttException e) {
            System.err.println("连接或订阅失败: " + e.getMessage());
            e.printStackTrace();
        } catch (InterruptedException e) {
            System.err.println("程序被中断: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

代码讲解

以下是对Java代码的详细讲解,帮助你更好地理解每一部分的功能。

  1. 连接配置

    private static final String SERVER_URI = "tcp://156.238.*******:1883";  // MQTT 服务器地址
    private static final String CLIENT_ID = "paho_test";                  // 客户端ID,唯一标识
    
    • SERVER_URI:指定MQTT服务器的地址,格式为tcp://ip:port
    • CLIENT_ID:客户端的唯一标识,确保每个客户端ID唯一,以避免连接冲突。
  2. 创建MqttClient对象

    MqttClient mqttClient = new MqttClient(SERVER_URI, CLIENT_ID, new MemoryPersistence());
    
    • 使用MqttClient类创建客户端实例,MemoryPersistence用于存储会话信息。
  3. 配置连接选项

    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName("admin");
    options.setPassword("123456".toCharArray());
    options.setCleanSession(true);
    
    • setUserNamesetPassword用于设置连接认证信息。
    • setCleanSession(true)表示每次连接时清除之前的会话信息。
  4. 设置回调函数

    mqttClient.setCallback(new MqttCallback() {
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("连接断开: " + cause.getMessage());
        }
    
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("接收到消息 - 主题: " + topic);
            byte[] payload = message.getPayload();
            System.out.println("消息内容: " + new String(payload));
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("消息传输完毕");
        }
    });
    
    • connectionLost:当与MQTT服务器的连接丢失时触发,输出断开原因。
    • messageArrived:当订阅的主题接收到新消息时触发,输出主题和消息内容。
    • deliveryComplete:消息发布完成时触发,适用于消息发布功能。
  5. 连接并订阅主题

    try {
        mqttClient.connect(options);
        System.out.println("成功连接到 MQTT 服务器");
    
        mqttClient.subscribe("java/b", 2); // 订阅 "java/b" 主题,QoS等级为2
        System.out.println("成功订阅主题: java/b");
    } catch (MqttException e) {
        System.err.println("连接或订阅失败: " + e.getMessage());
        e.printStackTrace();
    }
    
    • 使用connect方法连接到MQTT服务器。
    • 使用subscribe方法订阅指定的主题,QoS等级为2,确保消息只传递一次。
  6. 保持程序运行

    // 保持运行状态,持续监听消息
    CountDownLatch latch = new CountDownLatch(1);
    try {
        latch.await();
    } catch (InterruptedException e) {
        System.err.println("程序被中断: " + e.getMessage());
        e.printStackTrace();
    }
    
    • 使用CountDownLatch避免程序因无限循环而阻塞,保持程序持续运行以接收消息。

运行结果

运行代码后,连接成功,控制台输出如下:


总结

通过以上代码,你可以快速实现一个简单的 Java MQTT 消息接收器。只需替换你的服务器地址、用户名和密码,即可接收来自指定主题的消息。

下一步可以做什么?

  • 添加 消息持久化 功能,将接收到的消息保存到数据库。
  • 实现 多主题监听,订阅多个 MQTT 主题。
  • 结合前端展示,实时可视化消息内容。

文章来源:https://blog.csdn.net/Takumilove/article/details/144528867
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云