首页 > 基础资料 博客日记
使用java获取阿里云物联网平台设备数据
2024-09-30 10:00:09基础资料围观217次
当前有一个业务要获取阿里云物联网平台上面的传感器数据,代码使用java,采用springboot框架,阿里云物联网平台上一个产品下的一个设备只能连接真实设备或者代码服务端,于是我采用数据流转的方式将数据流转到一个设备中,服务端订阅该设备的Topic。
阿里云创建
首先创建一个产品,用于测试使用,直接选择一个标准品类,品类有温度和湿度属性(实际使用需要按需求自定义产品),节点类型为了方便选的直连设备。
产品创建完成之后点击查看,点击功能定义模块,编辑草稿,添加刚才选择的标准产品的两个标准功能,也可以添加自定义功能看自己选择,这边只使用温度和湿度。
添加完成之后直接点击发布上线。
发布完成之后进入Topic 类列表的自定义Topic 部分
定义一个Topic类,便于后面通过该Topic流转到服务器端,操作权限为发布和订阅,命名为tento(按照自己需求命名),点击确认就定义完成了。
之后为这个产品添加两个设备(一个真实设备,一个服务器设备),实际使用可以创建多个真实设备,数据流转到一个服务器设备中就可以了,为了方便这边只创建两个设备测试。直接添加,DeviceName让他随机生成。
之后进入云产品流转
创建一个解析器
解析器创建完成之后点击编辑,点击关联数据源然后创建一个数据源,然后关联数据源选择刚才创建的这个。
点击查看这个数据源,添加Topic,因为我要获取真实设备的数据,所以选择物模型数据上报的Topic,设备DeviceName选择刚才创建的真实设备,最后选择post。这样数据源就完成了
然后进入数据目的,点击关联数据目的
和刚才操作类似点击关联数据目的---->创建数据目的,产品选择我们一开始创建的产品,操作为发送到另一个Topic。
最后是解析器脚本,会有一个草稿,脚本中的writeIotTopic(1004),1004是自动生成的,自己的代码不要改这个
// 草稿页为空时,进入草稿页会生成默认脚本
// 如果默认脚本自动保存过,继续绑定数据目的,默认脚本不会自动更新
// 此时清空脚本并保存之后,重新进入草稿页即可重新生成包含最新数据目的的默认脚本
// 设备上报数据内容,json格式
var data = payload('json');
// 流转到另一个Topic
writeIotTopic(1004, "/" + productKey() + "/" + deviceName() + "/user/get", data);
改为
// 草稿页为空时,进入草稿页会生成默认脚本
// 如果默认脚本自动保存过,继续绑定数据目的,默认脚本不会自动更新
// 此时清空脚本并保存之后,重新进入草稿页即可重新生成包含最新数据目的的默认脚本
// 设备上报数据内容,json格式
var data = payload('json');
// 流转到另一个Topic
writeIotTopic(1004, "/k1a8gKQzYMD/ejP0IrtXk49UIMz7Wsn1/user/tento", data);
只更改了/k1a8gKQzYMD/ejP0IrtXk49UIMz7Wsn1/user/tento是刚才咱们自定义的Topic,k1a8gKQzYMD为产品的ProductKey,ejP0IrtXk49UIMz7Wsn1为服务端设备的DeviceName,点击发布就完成了这个解析器,解析器一定要点击启动才会生效。
测试
开两个设备模拟器
真实设备端
服务器端
服务器端订阅刚才自定义的Topic
点击订阅之后,右边日志会显示订阅的Topic (订阅这一步不能省)
再回到真实设备的模拟器(服务器模拟器不要关,开两个网页),输入温度湿度数据之后,点击提交,模拟真实设备提交数据。
真实设备提交数据之后,刚才服务器设备的模拟器订阅了这个消息,就会收到消息。
消息中包括刚才真实设备上传的温度湿度,还包括真实设备的deviceName,这样如果多个真实设备上传数据,全部流转到服务器端,服务器端就是知道真实设备是哪一个,上传的数据是多少。
received topic=/k1a8gKQzYMD/ejP0IrtXk49UIMz7Wsn1/user/tento, payload={"deviceType":"CurrentTemperature","iotId":"oA3PB1m7zuHCRWcxt1ljk1a8g0","requestId":"1726044789941","checkFailedData":{},"productKey":"k1a8gKQzYMD","gmtCreate":1726044790060,"deviceName":"oA3PB1m7zuHCRWcxt1lj","items":{"CurrentHumidity":{"time":1726044790054,"value":36},"CurrentTemperature":{"time":1726044790054,"value":86}}}
java代码实现服务端订阅
导入依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.0-android</version>
</dependency>
计算服务端设备的Mqtt建联参数
查看服务端设备的三元组信息,通过下面代码计算得到,按照自己的设备更改代码中三元组信息
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.*;
class MqttPostPropertyMessageListener implements IMqttMessageListener {
@Override
public void messageArrived(String var1, MqttMessage var2) throws Exception {
System.out.println("reply topic : " + var1);
System.out.println("reply payload: " + var2.toString());
}
}
public class App
{
public static void main( String[] args )
{
String productKey = "k1a8gKQzYMD";
String deviceName = "ejP0IrtXk49UIMz7Wsn1";
String deviceSecret = "f55479e91aab6cf09c404bfe697e6b67";
//计算Mqtt建联参数
MqttSign sign = new MqttSign();
sign.calculate(productKey, deviceName, deviceSecret);
System.out.println("username: " + sign.getUsername());
System.out.println("password: " + sign.getPassword());
System.out.println("clientid: " + sign.getClientid());
//使用Paho连接阿里云物联网平台
String port = "443";
String broker = "ssl://" + productKey + ".iot-as-mqtt.cn-shanghai.aliyuncs.com" + ":" + port;
MemoryPersistence persistence = new MemoryPersistence();
try{
//Paho Mqtt 客户端
MqttClient sampleClient = new MqttClient(broker, sign.getClientid(), persistence);
//Paho Mqtt 连接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setKeepAliveInterval(180);
connOpts.setUserName(sign.getUsername());
connOpts.setPassword(sign.getPassword().toCharArray());
sampleClient.connect(connOpts);
System.out.println("broker: " + broker + " Connected");
//Paho Mqtt 消息订阅
String topicReply = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post_reply";
sampleClient.subscribe(topicReply, new MqttPostPropertyMessageListener());
System.out.println("subscribe: " + topicReply);
Thread.sleep(2000);
//Paho Mqtt 断开连接
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
}catch (MqttException e) {
System.out.println("reason " + e.getReasonCode());
System.out.println("msg " + e.getMessage());
System.out.println("loc " + e.getLocalizedMessage());
System.out.println("cause " + e.getCause());
System.out.println("excep " + e);
e.printStackTrace();
}catch (InterruptedException e ) {
e.printStackTrace();
}
}
}
输出信息
username: ejP0IrtXk49UIMz7Wsn1&k1a8gKQzYMD
password: 63344fb712e72aaa240b3bdb400410f615ad006acc4db8c31171d9d17066caec
clientid: k1a8gKQzYMD.ejP0IrtXk49UIMz7Wsn1|timestamp=1726046809430,_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|
broker: ssl://k1a8gKQzYMD.iot-as-mqtt.cn-shanghai.aliyuncs.com:443 Connected
subscribe: /sys/k1a8gKQzYMD/ejP0IrtXk49UIMz7Wsn1/thing/event/property/post_reply
Disconnected
将输出的属性信息填入下面代码中(username,password,clientid,broker),代码中
client.subscribe(“/k1a8gtWJPeD/ixWeHZp2Xrx12zeNySdn/user/test”);要注意替换为自定义的Topic,和解析脚本中的一致
package com.aliyun.iot;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttClientSample {
/**
@Resource
DeviceService deviceService1;
@Resource
SensorService sensorService1;
private static DeviceService deviceService;
private static SensorService sensorService;
@PostConstruct
public void init() {
deviceService = deviceService1;
sensorService = sensorService1;
}
*/
//静态方法中使用@Resource注解的方式
public static void main(String[] args) throws Exception {
String broker = "ssl://k1a8gKQzYMD.iot-as-mqtt.cn-shanghai.aliyuncs.com:443";
// 替换为你的MQTT代理地址和端口
String clientId = "k1a8gKQzYMD.ejP0IrtXk49UIMz7Wsn1|timestamp=1726046809430,_v=paho-java-1.0.0,securemode=2,signmethod=hmacsha256|";
// 客户端ID
String username = "ejP0IrtXk49UIMz7Wsn1&k1a8gKQzYMD";
// 用户名
String password = "63344fb712e72aaa240b3bdb400410f615ad006acc4db8c31171d9d17066caec";
// 密码
MqttClient client = new MqttClient(broker, clientId);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost: " + cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String s = new String(message.getPayload());
JsonElement element = JsonParser.parseString(s);
JsonObject obj = element.getAsJsonObject();
String deviceName = obj.getAsJsonPrimitive("deviceName").getAsString();
JsonObject items = obj.getAsJsonObject("items");
float currentHumidityValue = items.getAsJsonObject("CurrentHumidity").getAsJsonPrimitive("value").getAsFloat();
float currentTemperatureValue = items.getAsJsonObject("CurrentTemperature").getAsJsonPrimitive("value").getAsFloat();
System.out.println(deviceName + " " + currentHumidityValue + " " + currentTemperatureValue);
//可以写一些数据库操作
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Delivery complete");
}
});
client.connect(options);
// 替换为传感器设备上传数据的Topic
client.subscribe("/k1a8gKQzYMD/ejP0IrtXk49UIMz7Wsn1/user/tento");
System.out.println("Connected and subscribed");
}
}
运行MqttClientSample代码
在阿里云平台上开启真实设备的模拟器,模拟真实设备上传数据到阿里云平台,发送85,21
java后台收到数据,这样实现的就是当阿里云物联网平台数据更新后后端实时获取更新的数据的操作,实现MQTT发布订阅机制。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
上一篇:在Java中使用Redis
下一篇:PageHelper 解析及实现原理