前文讲到主要从五个点来进行源码学习
- 连接broker
- 创建主题
- 发布主题内容
- 订阅主题
- 接收主题内容
连接broker
上文写过的简单例子可以知道首先调用的是MqttClient的connect方法
而aClient是MqttAsyncClient所以调用的是MqttAsyncClient的connect方法,最终指向都是如下(代码太长就不贴图了)
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException {
if (this.comms.isConnected()) {
throw ExceptionHelper.createMqttException(32100);
} else if (this.comms.isConnecting()) {
throw new MqttException(32110);
} else if (this.comms.isDisconnecting()) {
throw new MqttException(32102);
} else if (this.comms.isClosed()) {
throw new MqttException(32111);
} else {
if (options == null) {
options = new MqttConnectOptions();
}
this.connOpts = options;
this.userContext = userContext;
boolean automaticReconnect = options.isAutomaticReconnect();
log.fine(CLASS_NAME, "connect", "103", new Object[]{options.isCleanSession(), new Integer(options.getConnectionTimeout()), new Integer(options.getKeepAliveInterval()), options.getUserName(), options.getPassword() == null ? "[null]" : "[notnull]", options.getWillMessage() == null ? "[null]" : "[notnull]", userContext, callback});
this.comms.setNetworkModules(this.createNetworkModules(this.serverURI, options));
this.comms.setReconnectCallback(new MqttAsyncClient.MqttReconnectCallback(automaticReconnect));
MqttToken userToken = new MqttToken(this.getClientId());
ConnectActionListener connectActionListener = new ConnectActionListener(this, this.persistence, this.comms, options, userToken, userContext, callback, this.reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
if (this.mqttCallback instanceof MqttCallbackExtended) {
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
}
this.comms.setNetworkModuleIndex(0);
connectActionListener.connect();
return userToken;
}
}
首先调用了MqttAsyncClient中创建的ClientComms对象中isConnected、isConnecting、isDisconnecting、isClosed四个方法(这样的方法存在6个)
分别通过conState的值去判断Clinet当前的状态,如果是连接中,正在连接,正在断开,关闭这几个状态就会进入异常处理
也就是正常连接时ClientComms.conState应该为3
再往下判断options是否为null,如果为null会创建一个MqttConnectOptions对象,下面就会用到
之后再调用MqttConnectOptions#isAutomaticReconnect,返回automaticReconnect,默认为false
private boolean automaticReconnect = false;
这个标志位的作用就是客户端断开是否会自动尝试重新连接,默认不进行重连
NetworkModules
首先是MqttAsyncClient#createNetworkModules方法来创建NetworkModule对象
protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)
throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModules";
// @TRACE 116=URI={0}
log.fine(CLASS_NAME, methodName, "116", new Object[] { address });
NetworkModule[] networkModules = null;
String[] serverURIs = options.getServerURIs();
String[] array = null;
if (serverURIs == null) {
array = new String[] { address };
} else if (serverURIs.length == 0) {
array = new String[] { address };
} else {
array = serverURIs;
}
networkModules = new NetworkModule[array.length];
for (int i = 0; i < array.length; i++) {
networkModules[i] = createNetworkModule(array[i], options);
}
log.fine(CLASS_NAME, methodName, "108");
return networkModules;
}
尝试从MqttConnectOptions中获取serverURL,如果获取不到会使用传入的serverURL,然后new 了一个实现NetworkModule接口的对象
通过循环将serverURL数组中的值调用createNetworkModule然后存入networkModules返回。
看下createNetworkModule
private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModule";
// @TRACE 115=URI={0}
log.fine(CLASS_NAME,methodName, "115", new Object[] {address});
NetworkModule netModule;
SocketFactory factory = options.getSocketFactory();
int serverURIType = MqttConnectOptions.validateURI(address);
URI uri;
try {
uri = new URI(address);
// If the returned uri contains no host and the address contains underscores,
// then it's likely that Java did not parse the URI
if(uri.getHost() == null && address.contains("_")){
try {
final Field hostField = URI.class.getDeclaredField("host");
hostField.setAccessible(true);
// Get everything after the scheme://
String shortAddress = address.substring(uri.getScheme().length() + 3);
hostField.set(uri, getHostName(shortAddress));
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
throw ExceptionHelper.createMqttException(e.getCause());
}
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Malformed URI: " + address + ", " + e.getMessage());
}
String host = uri.getHost();
int port = uri.getPort(); // -1 if not defined
switch (serverURIType) {
case MqttConnectOptions.URI_TYPE_TCP :
if (port == -1) {
port = 1883;
}
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new TCPNetworkModule(factory, host, port, clientId);
((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_SSL:
if (port == -1) {
port = 8883;
}
SSLSocketFactoryFactory factoryFactory = null;
if (factory == null) {
// try {
factoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
factoryFactory.initialize(sslClientProps, null);
factory = factoryFactory.createSocketFactory(null);
// }
// catch (MqttDirectException ex) {
// throw ExceptionHelper.createMqttException(ex.getCause());
// }
}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
// Create the network module...
netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
((SSLNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
((SSLNetworkModule)netModule).setSSLHostnameVerifier(options.getSSLHostnameVerifier());
// Ciphers suites need to be set, if they are available
if (factoryFactory != null) {
String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
case MqttConnectOptions.URI_TYPE_WS:
if (port == -1) {
port = 80;
}
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_WSS:
if (port == -1) {
port = 443;
}
SSLSocketFactoryFactory wSSFactoryFactory = null;
if (factory == null) {
wSSFactoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
wSSFactoryFactory.initialize(sslClientProps, null);
factory = wSSFactoryFactory.createSocketFactory(null);
}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
// Create the network module...
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
// Ciphers suites need to be set, if they are available
if (wSSFactoryFactory != null) {
String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
default:
// This shouldn't happen, as long as validateURI() has been called.
log.fine(CLASS_NAME,methodName, "119", new Object[] {address});
netModule = null;
}
return netModule;
}
代码很多但是不难读,首先作用是工厂方法创建正确的网络模块。开始通过getSocketFactory获取工厂,默认为null,然后利用之前讲过的MqttConnectOptions#validateURI去判断serverURI的类型
- tcp->0
- ssl->1
- local->2
- ws->3
- wss->4
然后创建URL对象
- 如果address包含_或者URL对象获取Host失败,就会尝试利用反射已经MqttAsyncClient#getHostName方法去解析不正常的address
- 正常就通过URL对象直接获取Host和Port
在通过不同的serverURL类型来进入不同的处理,这里以tcp为例讲一下(ssl和wss会因为加密多一些步骤)
首先如果port没有给出,会根据不同类型赋一个默认的值
- tcp->1883
- ssl->8883
- ws->80
- wss->443
然后SocketFactory#getDefault返回环境默认套接字工厂的副本,在利用这个套接字去创建TCP的网络模块TCPNetworkModule对象去返回
然后ClientComms#setNetworkModules获取到TCPNetworkModule并赋值给networkModules
回到org.eclipse.paho.client.mqttv3.MqttAsyncClient#connect
networkModules之后将automaticReconnect值作为参数传入(默认为false)创建MqttReconnectCallback对象,然后赋值给CommsCallback的reconnectInternalCallback参数
ConnectActionListener
通过ClientId创建了MqttToken,MqttToken是提供一种机制来跟踪异步操作的完成情况(这里不着重讲了)。
再往下创建ConnectActionListener对象,用来处理 AsyncClient 到可用 URL 之一的连接。
在创建客户端时作提供单一URL ,或者作为连接选项中的列表提供,尝试连接到列表中的每个 URL,直到连接尝试成功或尝试了所有 URL
此类使用自己的 onSuccess 和 onFailure 回调,而不是用户提供的回调
如果连接成功,则通知用户令牌并调用 用户的onSuccess 回调函数
如果连接失败,则尝试列表中的另一个 URL
如果已经没有URL则通知用户令牌并调用 onFailure 回调
回到org.eclipse.paho.client.mqttv3.MqttAsyncClient#connect
调用了MqttToken#setActionCallback来将ConnectActionListener对象注册为侦听器以在操作完成时收到通知。
再调用MqttToken#setUserContext上下文来存储MqttAsyncClient对象。
后面是一个可选项
If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
}
如果使用 MqttCallbackExtended,请将其设置在connectActionListener里,而MqttCallbackExtended是用来扩展MqttCallback
以允许新的回调而不破坏现有应用程序的 API实现此接口的类可以在两种类型的客户端上注册:IMqttClient.setCallback(MqttCallback)
和IMqttAsyncClient.setCallback(MqttCallback)
之后ClientComms#setNetworkModuleIndex设置了网络模块的索引
然后调用ConnectActionListener#connect(这里才是连接的开始),也就是上面创建的ConnectActionListener对象
MqttToken已经说过了,而org.eclipse.paho.client.mqttv3.MqttClientPersistence#open是用来初始化持久存储,在之前MqttClient类对象的创建里指定MqttClient
它将用于持久化 QoS 1 和 2 消息。
之后是参数的配置1.是否会话维持2.设置MQTT版本
然后调用了org.eclipse.paho.client.mqttv3.internal.ClientComms#connect,如果失败调用onFailure这里前文讲了
漆面是通过MqttConnectOptions传递进来的参数来创建MqttConnect对象,然后进行一些配置设置,基本上前文都涉及到了。
ConnectBG
这里重点看一下连接的实现org.eclipse.paho.client.mqttv3.internal.ClientComms.ConnectBG#start
ConnectBG是实现了Runnable的类,运行在线程池中
这里看一下线程体的run方法
首先看关键主题内容,启动网络模块,发起网络连接
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
进入到org.eclipse.paho.client.mqttv3.internal.NetworkModule#start
四个实现类
这里我们进入TCP分析
可以看到调用了socket去实现连接
连接完成后,启动receiver和,负责从broker接收消息以及向broker发送消息,两个类都实现了Runnable的类,运行在线程池中,直接看run函数
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
receiver收到消息后,响应消息的消息类型为MqttAck,由于CONACK数据包是MqttAck类型,且token不为null,故会执行clientState.notifyReceivedAck函数.
public void run() {
recThread = Thread.currentThread();
recThread.setName(threadName);
final String methodName = "run";
MqttToken token = null;
try {
runningSemaphore.acquire();
} catch (InterruptedException e) {
running = false;
return;
}
while (running && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME,methodName,"852");
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
// instanceof checks if message is null
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
if (token!=null) {
synchronized (token) {
// Ensure the notify processing is done under a lock on the token
// This ensures that the send processing can complete before the
// receive processing starts! ( request and ack and ack processing
// can occur before request processing is complete if not!
clientState.notifyReceivedAck((MqttAck)message);
}
} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
//This is an ack for a message we no longer have a ticket for.
//This probably means we already received this message and it's being send again
//because of timeouts, crashes, disconnects, restarts etc.
//It should be safe to ignore these unexpected messages.
log.fine(CLASS_NAME, methodName, "857");
} else {
// It its an ack and there is no token then something is not right.
// An ack should always have a token assoicated with it.
throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
} else {
if (message != null) {
// A new message has arrived
clientState.notifyReceivedMsg(message);
}
}
}
catch (MqttException ex) {
//@TRACE 856=Stopping, MQttException
log.fine(CLASS_NAME,methodName,"856",null,ex);
running = false;
// Token maybe null but that is handled in shutdown
clientComms.shutdownConnection(token, ex);
}
catch (IOException ioe) {
//@TRACE 853=Stopping due to IOException
log.fine(CLASS_NAME,methodName,"853");
running = false;
// An EOFException could be raised if the broker processes the
// DISCONNECT and ends the socket before we complete. As such,
// only shutdown the connection if we're not already shutting down.
if (!clientComms.isDisconnecting()) {
clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
}
}
finally {
receiving = false;
runningSemaphore.release();
}
}
//@TRACE 854=<
log.fine(CLASS_NAME,methodName,"854");
}
notifyReceivedAck函数中,处理各种broker返回消息,而连接消息处理最后会到connected()连接完成的方法中,该方法设置连接完成状态以及开始发送心跳。
protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
final String methodName = "notifyReceivedMsg";
this.lastInboundActivity = System.currentTimeMillis();
// @TRACE 651=received key={0} message={1}
log.fine(CLASS_NAME, methodName, "651", new Object[] {
new Integer(message.getMessageId()), message });
if (!quiescing) {
if (message instanceof MqttPublish) {
MqttPublish send = (MqttPublish) message;
switch (send.getMessage().getQos()) {
case 0:
case 1:
if (callback != null) {
callback.messageArrived(send);
}
break;
case 2:
persistence.put(getReceivedPersistenceKey(message),
(MqttPublish) message);
inboundQoS2.put(new Integer(send.getMessageId()), send);
this.send(new MqttPubRec(send), null);
break;
default:
//should NOT reach here
}
} else if (message instanceof MqttPubRel) {
MqttPublish sendMsg = (MqttPublish) inboundQoS2
.get(new Integer(message.getMessageId()));
if (sendMsg != null) {
if (callback != null) {
callback.messageArrived(sendMsg);
}
} else {
// Original publish has already been delivered.
MqttPubComp pubComp = new MqttPubComp(message
.getMessageId());
this.send(pubComp, null);
}
}
}
}
sender不断循环从clientState获取待发送的消息然后通过org.eclipse.paho.client.mqttv3.internal.ClientState#notifySent将 MQTT连接的消息发送出去
public void run() {
sendThread = Thread.currentThread();
sendThread.setName(threadName);
final String methodName = "run";
MqttWireMessage message = null;
try {
runningSemaphore.acquire();
} catch (InterruptedException e) {
running = false;
return;
}
try {
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
//@TRACE 802=network send key={0} msg={1}
log.fine(CLASS_NAME,methodName,"802", new Object[] {message.getKey(),message});
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
// While quiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
}
} else { // null message
//@TRACE 803=get message returned null, stopping}
log.fine(CLASS_NAME,methodName,"803");
running = false;
}
} catch (MqttException me) {
handleRunException(message, me);
} catch (Exception ex) {
handleRunException(message, ex);
}
} // end while
} finally {
running = false;
runningSemaphore.release();
}
//@TRACE 805=<
log.fine(CLASS_NAME, methodName,"805");
}
然后启动回调监听
callback.start("MQTT Call: "+getClient().getClientId(), executorService);
最后调用internalSend发送mqtt的CONNECT数据包
internalSend(conPacket, conToken);
org.eclipse.paho.client.mqttv3.internal.ClientState#send来进行发送
单单连接broker的流程分析就十分繁琐,剩下的流程分析创建主题和发布主题内容是一个入口,订阅主题和接收主题内容也是一个入口,这里给出流程的入口,感兴趣可以一步一步调试分析
- 创建主题&发布主题内容
- 入口函数:org.eclipse.paho.client.mqttv3.MqttAsyncClient#publish()
- 订阅主题&接收主题内容
- 入口函数:org.eclipse.paho.client.mqttv3.MqttAsyncClient#subscribe()
MQTT安全
直接通过实例来了解一下MQTT安全。由于MQTT运用也比较新,而java库Eclipse Paho Java Client相关漏洞就更少,对于他本身并没有什么漏洞,而是有两个MQTT信息接收之后存储输出不当的例子。
分别是hivemq(CVE-2020-13821)和apache-artemis(CVE-2020-13932)
这里使用mosquitto作为客户端去实现
hivemq(CVE-2020-13821)
这里是利用clintid来进行攻击
连接broker
刷新snapshot,没有过滤直接打印了客户端的clintid导致xss
apache-artemis(CVE-2020-13932)
访问
构造恶意的clientid或topics名字来攻击控制台
<img src=x onerror=alert(1)>
这里通过主题名称来进行攻击,在Diagram模块将未过滤的主题名称直接打印导致xss
上面两个例子虽然在MQTT协议中都不属于客户端而是Broker,但是也可以看到在MQTT中web漏洞的重点是信息的交互
如果应用了客户端,用来连接broker以及接收和发送主题信息
- 发送是自己控制所以不可控的就是接收的主题信息,就要做好过滤以及减少可能的危险操作
如果是应用的broker,用来处理连接的信息以及客户端之间交互的主题信息
- 那么连接信息和客户端之间交互的信息都是不可控的,更要做好充足的过滤以及防护手段
在web应用层对于MQTT协议来说,更多的漏洞是在于你对信息的处理不当
除了以上两个项目还有很多使用mqtt客户端或broker的物联网项目如
- wso2
- thingsboard
也可以根据信息获取之后的利用来进行漏洞挖掘
参考
- mqttv3文档:https://www.eclipse.org/paho/files/javadoc/index.html
- mqttv3项目:https://github.com/eclipse/paho.mqtt.java
- apache-artemis漏洞环境:https://www.apache.org/dyn/closer.cgi?filename=activemq/activemq-artemis/2.22.0/apache-artemis-2.22.0-bin.tar.gz&action=download
- hivemq漏洞环境:docker pull hivemq/hivemq4:4.3.2