MQTT协议
Message Queue Telemetry Transport,消息队列遥测传输
MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。基于发布/订阅模式的物联网通信协议,简单易实现、支持 QoS、报文小等特点,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计。由于以上轻量级的特点,是实现智能家居的首选传输协议。
具体的工作原理学习可以参考以下几个链接,这里不着重讲了
- http://www.taichi-maker.com/homepage/esp8266-nodemcu-iot/iot-tuttorial/mqtt-tutorial/
- https://zhuanlan.zhihu.com/p/89057819
- https://zhuanlan.zhihu.com/p/386994328
- http://www.taichi-maker.com/homepage/esp8266-nodemcu-iot/iot-tuttorial/mqtt-tutorial/
MQTT 协议在车联网中的应用
车辆数据主动上报
车载设备(T-box,车机等)作为车辆运行数据的收集者,基于固定频率将车内各类控制器、传感器等数据打包发送到平台端。
例如在用户同意的前提下,车辆在行驶过程中会将位置、车速、电量等信息按照固定频率上报云平台,云端应用基于这些数据,提供位置查找、超速提醒、电量提醒、地理围栏服务给终端用户使用。
- 车->broker->云平台->broker->车
平台请求下发后车辆数据上报
当云平台需要获取车辆的最新状态及信息时,可以主动下发命令要求车辆上报数据。
例如在诊断场景下,平台通过 MQTT 下发诊断命令至车辆,当车内各设备完成诊断操作后,会将诊断数据打包后上报至云平台,车辆诊断工程师将根据采集到的诊断数据对于车况进行整体的分析及问题定位。
- 平台->broker->车->broker->平台
平台指令下发
车辆远程控制是车联网业务中最常见、最典型的场景,各主机厂均在手机 App 中提供各种远控功能,例如远程启动、远程开车门、远程闪灯鸣笛等等。
此类场景下,手机 App 发送控制命令至云平台,平台应用经过权限检查、安全检查等一系列操作后,通过 MQTT 将命令下发至车辆执行,车辆端执行成功后,异步通知平台执行结果。
- app->云平台->broker->车
车辆客户端请求后平台数据下发
在 SDV(软件定义汽车)的大背景下,车内很多配置是可以做到动态变化的,例如数据采集规则、安全访问规则,所以车辆在点火启动后,会主动请求平台最新的相关配置,若两侧配置不一致,平台侧会下发最新的配置信息至车辆,车辆侧实时生效。
- 车->broker->云平台->broker->车
数据集成
将流经 EMQX 的海量车联网数据与业务系统连接是客户最重视的能力,EMQX 内置了规则引擎和数据桥接能力,可以将 MQTT 数据流式传输到 Kafka、各类 SQL / NoSQL / 时序数据库中,而实际项目中绝大多数客户都使用 Kafka 作为后端流处理组件。
Kafka 专注于数据的存储和读取,而 EMQX 则侧重于客户端和服务器之间的通信,EMQX 用来快速接收和处理来自大量物联网设备的消息,Kafka 可以收集并存储这些数据并将其发送给后端程序来分析和处理,这个架构是目前应用最广的数据集成方案。
- 车->broker->数据库
EMQX中http转发
EMQX (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器,承担MQTT协议中broker角色。
下面利用EMQX承担broker,java mqttv3 以及mqtt.fx承担client来简单模拟一个用户车上报车速云平台下发通知的过程。
车主动上报车速
这里是通过MQTT.fx来担任车的mqtt消息发送
连接配置
发布主题信息
接收主题信息
EMQX将MQTT信息转发至云平台
首先创建web服务资源
然后创建响应动作
最后编辑规则
即可将http请求进行转发
云平台通过MQTT信息返回消息
这里可以平台端直接订阅车端car_speed拿到信息然后返回
信息接收+处理
package at.dallermassl.ap.security.taint.webapp;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
public class Test2 extends HttpServlet {
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
try {
String speed = request.getParameter("car_speed");
System.out.println("speed:"+speed);
String serverURI="tcp://192.168.100.146:1883";
String clientID="message_send";
MqttProducer mqttProducer = new MqttProducer(serverURI, clientID);
String msg ="";
if (Integer.parseInt(speed)>=90){
msg = "hypervelocity in time:" + System.currentTimeMillis() ;
mqttProducer.send("car/message", 1, true, msg);
}else {
msg = "normal in time:"+ System.currentTimeMillis();
mqttProducer.send("car/message", 1, true, msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}
}
MqttProducer
package at.dallermassl.ap.security.taint.webapp;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttProducer {
private MqttClient mqttClient;
public MqttProducer(String SERVER_URI,String CLIENT_ID){
try {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+ SERVER_URI);
mqttClient.connect(connOpts);
}catch (Exception ex){
ex.printStackTrace();
}
}
public void send(String topic, int qos, boolean retained, String payload) {
if (mqttClient == null){
return;
}
try {
mqttClient.publish(topic, payload.getBytes(), qos, retained);
} catch (MqttException e) {
System.out.println(e.getMessage());
}
}
}
最后成果
idea接收信息
客户端直接返回
车端连接
broker
emqx
云平台
package com.example.demo.mqtt;
public class Test2 {
public static void main(String[] args) {
String serverURI="tcp://192.168.100.146:1883";
String clientID="cloud_platform";
MqttSubscriber mqttSubscriber = new MqttSubscriber(serverURI, clientID);
mqttSubscriber.subscribe("car/speed");
}
}
MqttSubscriber
package com.example.demo.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MqttSubscriber {
private MqttClient mqttClient;
public MqttSubscriber(String SERVER_URI,String CLIENT_ID){
try {
MemoryPersistence persistence = new MemoryPersistence();
mqttClient = new MqttClient(SERVER_URI, CLIENT_ID,persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
mqttClient.connect(connOpts);
}catch (Exception ex){
ex.printStackTrace();
}
}
public void subscribe(String topic) {
String SERVER_URI = "tcp://192.168.100.146:1883";
String CLIENT_ID = "cloud_platform_send";
if (mqttClient == null){
return;
}
try {
mqttClient.subscribe(topic);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("连接丢失");
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
System.out.println(topic);
System.out.println(mqttMessage.toString());
MqttProducer mqttProducer = new MqttProducer(SERVER_URI, CLIENT_ID);
String msg ="";
if (Integer.parseInt(mqttMessage.toString())>=90){
msg = "hypervelocity! in time:" + System.currentTimeMillis();
}else {
msg = "normal! in time:"+ System.currentTimeMillis();
}
mqttProducer.send("car/message", 1, true, msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("delivery isComplete:" + iMqttDeliveryToken.isComplete());
}
});
} catch (MqttException e) {
System.out.println(e.getMessage());
}
}
}
最终效果和http转发一致
idea返回信息
MQTT-v3 客户端库
在上面使用java代码中使用的是Eclipse Paho Java Client
Eclipse Paho Java Client (opens new window)是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。
从上面的简单的示例代码先学习一下一些概念
MqttClient
继承自IMqttClient接口,定义了如下方法
基本可以做到望字知意。
再回到MqttClient,仔细看构造函数,可以发现MqttClient对象中还创建了org.eclipse.paho.client.mqttv3.MqttAsyncClient
public MqttClient(String serverURI, String clientId) throws MqttException {
this(serverURI, clientId, new MqttDefaultFilePersistence());
}
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
this.aClient = null;
this.timeToWait = -1L;
this.aClient = new MqttAsyncClient(serverURI, clientId, persistence);
}
public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence, ScheduledExecutorService executorService) throws MqttException {
this.aClient = null;
this.timeToWait = -1L;
this.aClient = new MqttAsyncClient(serverURI, clientId, persistence, new ScheduledExecutorPingSender(executorService), executorService);
}
org.eclipse.paho.client.mqttv3.MqttAsyncClient中定义了mqtt客户端参数
实现自IMqttAsyncClient接口,方法如下(基本和IMqttClient一样)
查看package.html对两个接口的描述
{@link org.eclipse.paho.client.mqttv3.IMqttAsyncClient MqttAsyncClient} which provides a non-blocking interface
{@link org.eclipse.paho.client.mqttv3.IMqttClient MqttClient} where methods block until the operation has completed.
可以知道IMqttAsyncClient是非堵塞的而IMqttClient是堵塞的
再从文档看接口描述:
- IMqttClient 提供了一组方法,一旦 MQTT 操作完成,就会阻止并将控制权返回给应用程序。它是位于 IMqttAsyncClient 实现之上的薄层,主要用于与早期版本的 MQTT 客户端兼容。在大多数情况下,建议使用基于 IMqttAsyncClient 的客户端,它允许应用程序混合非阻塞和阻塞调用。
- 如果使用基于 IMqttAsyncClient 的客户端,则应用程序不限于使用一种样式,因为可以在同一应用程序中使用阻塞和非阻塞方法。如果使用基于 IMqttClient 的客户端,则应用程序只能使用阻塞方法。有关阻塞客户端的更多详细信息,请参阅 IMqttClient
显然,常用者是MqttAsyncClient ,实际调试看看
MqttClient中存储的只有MqttAsyncClient 对象
MqttAsyncClient
看一下IMqttAsyncClient 方法,首先是静态方法
static {
log = LoggerFactory.getLogger("org.eclipse.paho.client.mqttv3.internal.nls.logcat", CLASS_NAME);
reconnectDelay = 1000;
clientLock = new Object();
}
定义了log对象,mqtt重连延迟,以及锁,再来看构造方法
public MqttAsyncClient(String serverURI, String clientId) throws MqttException {
this(serverURI, clientId, new MqttDefaultFilePersistence());
}
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
this(serverURI, clientId, persistence, new TimerPingSender());
}
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
this(serverURI, clientId, persistence, pingSender, (ScheduledExecutorService)null);
}
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
this.reconnecting = false;
log.setResourceName(clientId);
if (clientId == null) {
throw new IllegalArgumentException("Null clientId");
} else {
int clientIdLength = 0;
for(int i = 0; i < clientId.length() - 1; ++i) {
if (Character_isHighSurrogate(clientId.charAt(i))) {
++i;
}
++clientIdLength;
}
if (clientIdLength > 65535) {
throw new IllegalArgumentException("ClientId longer than 65535 characters");
} else {
MqttConnectOptions.validateURI(serverURI);
this.serverURI = serverURI;
this.clientId = clientId;
this.persistence = persistence;
if (this.persistence == null) {
this.persistence = new MemoryPersistence();
}
this.executorService = executorService;
if (this.executorService == null) {
this.executorService = Executors.newScheduledThreadPool(10);
}
log.fine(CLASS_NAME, "MqttAsyncClient", "101", new Object[]{clientId, serverURI, persistence});
this.persistence.open(clientId, serverURI);
this.comms = new ClientComms(this, this.persistence, pingSender, this.executorService);
this.persistence.close();
this.topics = new Hashtable();
}
}
}
最终都指向org.eclipse.paho.client.mqttv3.MqttAsyncClient#MqttAsyncClient(java.lang.String, java.lang.String, org.eclipse.paho.client.mqttv3.MqttClientPersistence, org.eclipse.paho.client.mqttv3.MqttPingSender, java.util.concurrent.ScheduledExecutorService)
首先设置参数和日志,然后clientid不能为null也不能大于65535个字符,然后进入到连接步骤
调用了org.eclipse.paho.client.mqttv3.MqttConnectOptions#validateURI
这里的MqttConnectOptions是一个工具类,用来设置连接的参数以及调用一些连接要用到的方法
public static int validateURI(String srvURI) {
try {
URI vURI = new URI(srvURI);
if ("ws".equals(vURI.getScheme())) {
return 3;
} else if ("wss".equals(vURI.getScheme())) {
return 4;
} else if (vURI.getPath() != null && !vURI.getPath().isEmpty()) {
throw new IllegalArgumentException(srvURI);
} else if ("tcp".equals(vURI.getScheme())) {
return 0;
} else if ("ssl".equals(vURI.getScheme())) {
return 1;
} else if ("local".equals(vURI.getScheme())) {
return 2;
} else {
throw new IllegalArgumentException(srvURI);
}
} catch (URISyntaxException var2) {
throw new IllegalArgumentException(srvURI);
}
}
将serverUrl转为URL对象然后通过getScheme方法来判断使用的网络前缀,并没有获取返回值,只要是这五个网络前缀就不会进入到异常处理(相当于一份白名单处理)
然后进行赋值,serverURL和clientID不用说,persistence是mqtt自己的持久化机制,如果为空则创建org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
MqttAsyncClient不传入persistence时也会默认创建一个org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
也就是说MqttDefaultFilePersistence是默认方式,但是persistence为null时变为瞬时消息保存在内存中。如果在连接中MqttConnectOptions.setCleanSession(boolean)这个参数为true,那么client掉线下次重连,将清空内存persistence消息,如果为false,就会使用持久化机制去重传。(对应了mqtt协议中CONNECT消息里的CleanSession字段)
再往下创建了线程池,默认size为10,executorService就是返回的线程池。
然后日志记录,调用org.eclipse.paho.client.mqttv3.MqttClientPersistence#open,这里如果是MemoryPersistence则hashtable为空。如果是MqttDefaultFilePersistence则做一个持久化操作:将clientid和serverurl用-拼接然后保存在本地。
public void open(String clientId, String theConnection) throws MqttPersistenceException {
if (this.dataDir.exists() && !this.dataDir.isDirectory()) {
throw new MqttPersistenceException();
} else if (!this.dataDir.exists() && !this.dataDir.mkdirs()) {
throw new MqttPersistenceException();
} else if (!this.dataDir.canWrite()) {
throw new MqttPersistenceException();
} else {
StringBuffer keyBuffer = new StringBuffer();
int i;
char c;
for(i = 0; i < clientId.length(); ++i) {
c = clientId.charAt(i);
if (this.isSafeChar(c)) {
keyBuffer.append(c);
}
}
keyBuffer.append("-");
for(i = 0; i < theConnection.length(); ++i) {
c = theConnection.charAt(i);
if (this.isSafeChar(c)) {
keyBuffer.append(c);
}
}
synchronized(this) {
if (this.clientDir == null) {
String key = keyBuffer.toString();
this.clientDir = new File(this.dataDir, key);
if (!this.clientDir.exists()) {
this.clientDir.mkdir();
}
}
try {
this.fileLock = new FileLock(this.clientDir, ".lck");
} catch (Exception var6) {
}
this.restoreBackups(this.clientDir);
}
}
}
再往下创建org.eclipse.paho.client.mqttv3.internal.ClientComms对象
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException {
this.conState = 3;
this.client = client;
this.persistence = persistence;
this.pingSender = pingSender;
this.pingSender.init(this);
this.executorService = executorService;
this.tokenStore = new CommsTokenStore(this.getClient().getClientId());
this.callback = new CommsCallback(this);
this.clientState = new ClientState(persistence, this.tokenStore, this.callback, this, pingSender);
this.callback.setClientState(this.clientState);
log.setResourceName(this.getClient().getClientId());
}
这里讲一下pingSender,这是在创建MqttAsyncClient对象时创建的
用于在每个保持活动间隔向 MQTT 代理发送 ping 数据包的对象,也就是心跳时间
public void init(ClientComms comms) {
if (comms == null) {
throw new IllegalArgumentException("ClientComms cannot be null.");
} else {
this.comms = comms;
}
}
ini初始方法,传入了ClientComms,也就是当前客户端的内部状态。
然后分别创建了CommsTokenStore,CommsCallback,ClientState对象,这里大概讲一下三个类的作用
先讲CommsTokenStore,CommsTokenStore提供提供基于“token”的系统,用于跨多个线程存储和跟踪操作(多线程),所关联的Token将会使用saveToken保存,任何一个感兴趣的追踪状态通过getTkoen在wait方法或者使用监听进行回调。
而CommsCallback是Receiver 和外部 API 之间的桥梁。 用来桥接Receiver和外部api进行回调,它将转换MQTT message objects进行最终回调。
最重要的就是ClientState类,它是客户端的核心,它保存待处理和正在进行的消息的状态信息。已接受传递的消息在传递时会在多个对象之间移动。
1) 当客户端不运行时,消息存储在实现 MqttClientPersistent 接口的持久存储中。默认值为 MqttDefaultFilePersistencew,它在故障和系统重新启动时安全地存储消息。如果没有指定持久性,则回退到 MemoryPersistence,它将在实例化 Mqtt 客户端时保存和维护消息。 (CleanSession的实现)
2) 当客户端或特别是 ClientState 被实例化时
- 如果 QoS 2 PUBLISH 或 PUBREL 消息从持久存储将会读取到 outboundqos2 hashtable
- 如果 QoS 1 PUBLISH 消息从持久存储将会读取到 outboundqos1 hashtable
3) 在连接时,根据 messageid 将outbound hashtables中的messages顺序发送到 pendingMessages 或 pendingFlows vector。
1) 初始消息发布进入endingmessages buffer(等待数据队列)
2) PUBREL 进入pendingflows buffer (飞行窗口队列)
4) 发送线程同时从pendingflows 和pendingmessages 缓冲区读取消息。消息从pendingbuffer中删除,但仍保留在outbound hashtable中。
5) 接收线程
- 如果是 QoS 1 消息,则移除持久化已经从 outboundqos1 中删除 消息
- 如果 是QoS 2 PUBREC send PUBREL 消息,使用 PUBREL 更新 outboundqos2 条目并持久化。
- 如果 是QoS 2 PUBCOMP 消息,将会移除持久化和outboundqos2对应消息
流程分析
学习了这些概念后,后续可以再通过debug来从流程的角度进行学习
由于是客户端,主要从五个点来源码学习
- 连接broker
- 创建主题
- 发布主题内容
- 订阅主题
- 接收主题内容
后续会进行具体调试分析