首页 > 基础资料 博客日记

Java通过MQTT协议对接第三方设备————够用但不详细

2024-07-10 14:00:07基础资料围观452

本篇文章分享Java通过MQTT协议对接第三方设备————够用但不详细,对你有帮助的话记得收藏一下,看Java资料网收获更多编程知识

1. MQTT协议、参数需要注意这些!

1.1 详细介绍

其他地方很多,不多说了,遗步看风景这个老哥写的很清楚!

MQTT协议详解(完整版)-CSDN博客

1.2 参数介绍

IP:协议中broker的IP地址,端口一般是1883,具体看配置;

username:在部署broker时会配置username和password,如果broker是你们部署的,你应该清楚;如果是第三方的就直接找他要;

password:同上;

QoS:QoS 0至多一次、QoS 1至少一次、QoS 2只有一次(上边文章解释的比较清楚、自己看一下);

clientId:这个东西自己看着设置就好了,我的理解是连接同一个broker的唯一标识。所以在一个broker中这个一定是唯一的,不要和其他客户端的一样!有相同的两个clientId连接同一个broker会导致连接断开哟!

topic:主题,如果broker是学校,topic就是班级;你可以通过订阅topic获取到其他客户端下发的消息,相反的其他客户端也可以获取到你在某一个topic下发的消息;

主要的就这些了!

2. 业务场景

        说一下我的业务场景,我需要通过MQTT协议获取门禁设备的人员进出记录,设备方需要我先下发我们的人员信息,然后根据我下发的人员信息回复相应人员的进出记录;我再把这些进出记录保存到本地数据库;像这样↓

        我向topic1发布人员信息,设备订阅topic1获取我下发的人员信息,然后在收到人员信息后,设备向topic2发送该人员的出入记录,我通过订阅topic2获取设备发送的消息,然后处理保存。

3. Java后端操作流程

3.1 Maven依赖

第一步肯定是先搞依赖了!

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>

(broker如果需要自己部署的话,看4)

3.2 与broker建立连接

        这里搜到了很多方法,刚开始写的类似于这位老哥的java连接MQTT服务器(Springboot整合MQTT)_mqtt java-CSDN博客

这样写也没毛病,但是总要setMqttClient(),嫌麻烦,遂改!如下↓:

        直接在项目模块启动时初始化一个连接(IP、username、password最好写在配置文件哈,我这是方便展示这么写的);

@Configuration
@Slf4j
public class MQTTClientConfig {
​
    /**
     * 创建并配置一个MqttClient实例。
     * 该实例用于与MQTT服务器建立连接,并设置相应的连接选项,如自动重连、会话超时等。
     * 
     * @return MqttClient 返回配置好的MqttClient实例。
     */
    @Bean
    public MqttClient mqttClient() {
        MemoryPersistence dataStore = new MemoryPersistence();
        String serverUrl = "tcp://ip:1883";
        //前边有提到过,连接同一个broker的clientId是唯一的,我这么做是因为我们项目分测试和正式环境,正式环境是直接拉测试环境的docker,不这么搞会一直重连,如果你只有一个环境再跑就不用UUID,自己起个名就好了。
        String clientId = "mqtt"+ UUID.randomUUID();
        MqttClient client = null;
        try {
        // 初始化MqttClient实例
        client = new MqttClient(serverUrl, clientId, dataStore);
        
        // 配置连接选项
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setPassword("username".toCharArray());
        mqttConnectOptions.setUserName("password");
        mqttConnectOptions.setAutomaticReconnect(true); // 设置自动重连
        mqttConnectOptions.setCleanSession(false); // 设置会话不被清除,保持连接状态
        mqttConnectOptions.setConnectionTimeout(10); // 设置连接超时时间
        mqttConnectOptions.setKeepAliveInterval(60); // 设置会话心跳时间
        
        // 建立连接并设置回调
        client.connect(mqttConnectOptions);
        client.setCallback(new Callback());
    } catch (MqttException e) {
        e.printStackTrace();
        log.info(e.getMessage());
    }
    
    return client;
}

接下来是回调函数,有很多老哥都是直接操作的数据库存入数据库的,我这里是直接存入redis了,有两个原因:

  1. 在这里如果要操作数据库,只用用@Autowired注解注入是不行的,使用xxxService或者xxxMapper的方法都要在这里边初始化然后再引用,处理数据的话我还会用到ObjectMapper,也要搞,太麻烦,遂存入redis,去别的地方拿出来存入数据库;

  2. 业务场景问题:前边提到过我要先下发,设备才会给我相应的信息,我这里有几百个人员信息,也不能一次只发一条,所以一次性全发过去(用的队列,隔一定时间发,设备消费不及时也是会丢数据的),再依次接收,避免消息丢失;

    不需要用redis的话就把service或者mapper直接照这个内部类搞一下,用到什么方法写什么方法就好了;如果你也要用redis记得用完收拾一下;

@Slf4j
@Component
public class Callback implements MqttCallback {
​
    @Component
    public static class RedisTemplateOperate{
        @Autowired
        private RedisTemplate redisTemplate;
        private static RedisTemplateOperate redisTemplateOperate;
        @PostConstruct
        public void init() {
            redisTemplateOperate = this;
            redisTemplateOperate.redisTemplate = this.redisTemplate;
        }
​
        public void set(String key, Object value) {
            redisTemplateOperate.redisTemplate.opsForList().rightPush(key, value);
        }
​
        public void expire(String key, long value,TimeUnit timeUnit){
            redisTemplateOperate.redisTemplate.expire(key,value,timeUnit);
        }
​
    }
​
​
    /**
     * MQTT 断开连接会执行此方法
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("MQTT连接断开 :{}", throwable.getMessage());
    }
​
    /**
     * publish发布成功后会执行到这里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }
​
    /**
     * subscribe订阅后得到的消息会执行到这里
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        try {
            log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
            String payloadString = new String(message.getPayload(), StandardCharsets.UTF_8);
            
            RedisTemplateOperate.redisTemplateOperate.set(key,new String(message.getPayload()));
            log.info("考勤信息存入redis");
​
        }catch (Exception e){
            e.printStackTrace();
            log.error("消息处理失败",e);
        }
​
    }
​
}

然后是监听器,没什么说的就监听你接收数据的topic就好了;

@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
    @Resource
    private MqttClient mqttClient;
​
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            //订阅topic
            mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_ONE.getValue(),0);
            mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_TWO.getValue(),0);
            mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_THREE.getValue(),0);
            mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_FOUR.getValue(),0);
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}

然后、跑起来测试一下:这个软件可以MQTTX:全功能 MQTT 客户端工具

4.broker部署

如果你要部署broker就看一下官方文档,https://hub.docker.com/_/eclipse-mosquitto(这个需要翻墙),不会翻的话抽空学一下,很有用;不会翻百度一下mosquitto部署,有很多。。

记录一下,用于学习交流,业务相关代码不展示了,如果哪里有问题欢迎交流、指正。


文章来源:https://blog.csdn.net/qq_45113364/article/details/138864225
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云