首页 > 基础资料 博客日记
Java通过MQTT协议对接第三方设备————够用但不详细
2024-07-10 14:00:07基础资料围观392次
1. MQTT协议、参数需要注意这些!
1.1 详细介绍
其他地方很多,不多说了,遗步看风景这个老哥写的很清楚!
1.2 参数介绍
IP:协议中broker的IP地址,端口一般是1883,具体看配置;
username:在部署broker时会配置username和password,如果broker是你们部署的,你应该清楚;如果是第三方的就直接找他要;
password:同上;
QoS:QoS 0至多一次、QoS 1至少一次、QoS 2只有一次(上边文章解释的比较清楚、自己看一下);
clientId:这个东西自己看着设置就好了,我的理解是连接同一个broker的唯一标识。所以在一个broker中这个一定是唯一的,不要和其他客户端的一样!有相同的两个clientId连接同一个broker会导致连接断开哟!
topic:主题,如果broker是学校,topic就是班级;你可以通过订阅topic获取到其他客户端下发的消息,相反的其他客户端也可以获取到你在某一个topic下发的消息;
主要的就这些了!
2. 业务场景
说一下我的业务场景,我需要通过MQTT协议获取门禁设备的人员进出记录,设备方需要我先下发我们的人员信息,然后根据我下发的人员信息回复相应人员的进出记录;我再把这些进出记录保存到本地数据库;像这样↓
我向topic1发布人员信息,设备订阅topic1获取我下发的人员信息,然后在收到人员信息后,设备向topic2发送该人员的出入记录,我通过订阅topic2获取设备发送的消息,然后处理保存。
3. Java后端操作流程
3.1 Maven依赖
第一步肯定是先搞依赖了!
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
(broker如果需要自己部署的话,看4)
3.2 与broker建立连接
这里搜到了很多方法,刚开始写的类似于这位老哥的java连接MQTT服务器(Springboot整合MQTT)_mqtt java-CSDN博客
这样写也没毛病,但是总要setMqttClient(),嫌麻烦,遂改!如下↓:
直接在项目模块启动时初始化一个连接(IP、username、password最好写在配置文件哈,我这是方便展示这么写的);
@Configuration
@Slf4j
public class MQTTClientConfig {
/**
* 创建并配置一个MqttClient实例。
* 该实例用于与MQTT服务器建立连接,并设置相应的连接选项,如自动重连、会话超时等。
*
* @return MqttClient 返回配置好的MqttClient实例。
*/
@Bean
public MqttClient mqttClient() {
MemoryPersistence dataStore = new MemoryPersistence();
String serverUrl = "tcp://ip:1883";
//前边有提到过,连接同一个broker的clientId是唯一的,我这么做是因为我们项目分测试和正式环境,正式环境是直接拉测试环境的docker,不这么搞会一直重连,如果你只有一个环境再跑就不用UUID,自己起个名就好了。
String clientId = "mqtt"+ UUID.randomUUID();
MqttClient client = null;
try {
// 初始化MqttClient实例
client = new MqttClient(serverUrl, clientId, dataStore);
// 配置连接选项
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setPassword("username".toCharArray());
mqttConnectOptions.setUserName("password");
mqttConnectOptions.setAutomaticReconnect(true); // 设置自动重连
mqttConnectOptions.setCleanSession(false); // 设置会话不被清除,保持连接状态
mqttConnectOptions.setConnectionTimeout(10); // 设置连接超时时间
mqttConnectOptions.setKeepAliveInterval(60); // 设置会话心跳时间
// 建立连接并设置回调
client.connect(mqttConnectOptions);
client.setCallback(new Callback());
} catch (MqttException e) {
e.printStackTrace();
log.info(e.getMessage());
}
return client;
}
接下来是回调函数,有很多老哥都是直接操作的数据库存入数据库的,我这里是直接存入redis了,有两个原因:
-
在这里如果要操作数据库,只用用@Autowired注解注入是不行的,使用xxxService或者xxxMapper的方法都要在这里边初始化然后再引用,处理数据的话我还会用到ObjectMapper,也要搞,太麻烦,遂存入redis,去别的地方拿出来存入数据库;
-
业务场景问题:前边提到过我要先下发,设备才会给我相应的信息,我这里有几百个人员信息,也不能一次只发一条,所以一次性全发过去(用的队列,隔一定时间发,设备消费不及时也是会丢数据的),再依次接收,避免消息丢失;
不需要用redis的话就把service或者mapper直接照这个内部类搞一下,用到什么方法写什么方法就好了;如果你也要用redis记得用完收拾一下;
@Slf4j
@Component
public class Callback implements MqttCallback {
@Component
public static class RedisTemplateOperate{
@Autowired
private RedisTemplate redisTemplate;
private static RedisTemplateOperate redisTemplateOperate;
@PostConstruct
public void init() {
redisTemplateOperate = this;
redisTemplateOperate.redisTemplate = this.redisTemplate;
}
public void set(String key, Object value) {
redisTemplateOperate.redisTemplate.opsForList().rightPush(key, value);
}
public void expire(String key, long value,TimeUnit timeUnit){
redisTemplateOperate.redisTemplate.expire(key,value,timeUnit);
}
}
/**
* MQTT 断开连接会执行此方法
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("MQTT连接断开 :{}", throwable.getMessage());
}
/**
* publish发布成功后会执行到这里
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
/**
* subscribe订阅后得到的消息会执行到这里
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
String payloadString = new String(message.getPayload(), StandardCharsets.UTF_8);
RedisTemplateOperate.redisTemplateOperate.set(key,new String(message.getPayload()));
log.info("考勤信息存入redis");
}catch (Exception e){
e.printStackTrace();
log.error("消息处理失败",e);
}
}
}
然后是监听器,没什么说的就监听你接收数据的topic就好了;
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
@Resource
private MqttClient mqttClient;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
//订阅topic
mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_ONE.getValue(),0);
mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_TWO.getValue(),0);
mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_THREE.getValue(),0);
mqttClient.subscribe(MQTTEnum.MQTT_SUB_TOPIC_FOUR.getValue(),0);
} catch (MqttException e) {
log.error(e.getMessage(), e);
}
}
}
然后、跑起来测试一下:这个软件可以MQTTX:全功能 MQTT 客户端工具
4.broker部署
如果你要部署broker就看一下官方文档,https://hub.docker.com/_/eclipse-mosquitto(这个需要翻墙),不会翻的话抽空学一下,很有用;不会翻百度一下mosquitto部署,有很多。。
记录一下,用于学习交流,业务相关代码不展示了,如果哪里有问题欢迎交流、指正。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签: