MQTT协议-Eclipse Paho Java Client源码分析(二)


前文讲到主要从五个点来进行源码学习

  • 连接broker
  • 创建主题
  • 发布主题内容
  • 订阅主题
  • 接收主题内容

连接broker

上文写过的简单例子可以知道首先调用的是MqttClient的connect方法

image-20220517140035502

而aClient是MqttAsyncClient所以调用的是MqttAsyncClient的connect方法,最终指向都是如下(代码太长就不贴图了)

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException {
        if (this.comms.isConnected()) {
            throw ExceptionHelper.createMqttException(32100);
        } else if (this.comms.isConnecting()) {
            throw new MqttException(32110);
        } else if (this.comms.isDisconnecting()) {
            throw new MqttException(32102);
        } else if (this.comms.isClosed()) {
            throw new MqttException(32111);
        } else {
            if (options == null) {
                options = new MqttConnectOptions();
            }

            this.connOpts = options;
            this.userContext = userContext;
            boolean automaticReconnect = options.isAutomaticReconnect();
            log.fine(CLASS_NAME, "connect", "103", new Object[]{options.isCleanSession(), new Integer(options.getConnectionTimeout()), new Integer(options.getKeepAliveInterval()), options.getUserName(), options.getPassword() == null ? "[null]" : "[notnull]", options.getWillMessage() == null ? "[null]" : "[notnull]", userContext, callback});
            this.comms.setNetworkModules(this.createNetworkModules(this.serverURI, options));
            this.comms.setReconnectCallback(new MqttAsyncClient.MqttReconnectCallback(automaticReconnect));
            MqttToken userToken = new MqttToken(this.getClientId());
            ConnectActionListener connectActionListener = new ConnectActionListener(this, this.persistence, this.comms, options, userToken, userContext, callback, this.reconnecting);
            userToken.setActionCallback(connectActionListener);
            userToken.setUserContext(this);
            if (this.mqttCallback instanceof MqttCallbackExtended) {
                connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
            }

            this.comms.setNetworkModuleIndex(0);
            connectActionListener.connect();
            return userToken;
        }
    }

首先调用了MqttAsyncClient中创建的ClientComms对象中isConnected、isConnecting、isDisconnecting、isClosed四个方法(这样的方法存在6个)

image-20220517141018924

分别通过conState的值去判断Clinet当前的状态,如果是连接中,正在连接,正在断开,关闭这几个状态就会进入异常处理

image-20220517141059088

也就是正常连接时ClientComms.conState应该为3

image-20220517142838058

再往下判断options是否为null,如果为null会创建一个MqttConnectOptions对象,下面就会用到

之后再调用MqttConnectOptions#isAutomaticReconnect,返回automaticReconnect,默认为false

private boolean automaticReconnect = false;

image-20220517143241060

这个标志位的作用就是客户端断开是否会自动尝试重新连接,默认不进行重连

NetworkModules

首先是MqttAsyncClient#createNetworkModules方法来创建NetworkModule对象

protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)
            throws MqttException, MqttSecurityException {
        final String methodName = "createNetworkModules";
        // @TRACE 116=URI={0}
        log.fine(CLASS_NAME, methodName, "116", new Object[] { address });

        NetworkModule[] networkModules = null;
        String[] serverURIs = options.getServerURIs();
        String[] array = null;
        if (serverURIs == null) {
            array = new String[] { address };
        } else if (serverURIs.length == 0) {
            array = new String[] { address };
        } else {
            array = serverURIs;
        }

        networkModules = new NetworkModule[array.length];
        for (int i = 0; i < array.length; i++) {
            networkModules[i] = createNetworkModule(array[i], options);
        }

        log.fine(CLASS_NAME, methodName, "108");
        return networkModules;
    }

尝试从MqttConnectOptions中获取serverURL,如果获取不到会使用传入的serverURL,然后new 了一个实现NetworkModule接口的对象

image-20220517144523202

通过循环将serverURL数组中的值调用createNetworkModule然后存入networkModules返回。

看下createNetworkModule

private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
        final String methodName = "createNetworkModule";
        // @TRACE 115=URI={0}
        log.fine(CLASS_NAME,methodName, "115", new Object[] {address});

        NetworkModule netModule;
        SocketFactory factory = options.getSocketFactory();

        int serverURIType = MqttConnectOptions.validateURI(address);

        URI uri;
        try {
            uri = new URI(address);
            // If the returned uri contains no host and the address contains underscores,
            // then it's likely that Java did not parse the URI
            if(uri.getHost() == null && address.contains("_")){
                try {
                    final Field hostField = URI.class.getDeclaredField("host");
                    hostField.setAccessible(true);
                    // Get everything after the scheme://
                    String shortAddress = address.substring(uri.getScheme().length() + 3);
                    hostField.set(uri, getHostName(shortAddress));

                } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
                    throw ExceptionHelper.createMqttException(e.getCause());
                } 

            }
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed URI: " + address + ", " + e.getMessage());
        }

        String host = uri.getHost();
        int port = uri.getPort(); // -1 if not defined

        switch (serverURIType) {
        case MqttConnectOptions.URI_TYPE_TCP :
            if (port == -1) {
                port = 1883;
            }
            if (factory == null) {
                factory = SocketFactory.getDefault();
            }
            else if (factory instanceof SSLSocketFactory) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }
            netModule = new TCPNetworkModule(factory, host, port, clientId);
            ((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
            break;
        case MqttConnectOptions.URI_TYPE_SSL:
            if (port == -1) {
                port = 8883;
            }
            SSLSocketFactoryFactory factoryFactory = null;
            if (factory == null) {
//              try {
                    factoryFactory = new SSLSocketFactoryFactory();
                    Properties sslClientProps = options.getSSLProperties();
                    if (null != sslClientProps)
                        factoryFactory.initialize(sslClientProps, null);
                    factory = factoryFactory.createSocketFactory(null);
//              }
//              catch (MqttDirectException ex) {
//                  throw ExceptionHelper.createMqttException(ex.getCause());
//              }
            }
            else if ((factory instanceof SSLSocketFactory) == false) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }

            // Create the network module...
            netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
            ((SSLNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
            ((SSLNetworkModule)netModule).setSSLHostnameVerifier(options.getSSLHostnameVerifier());
            // Ciphers suites need to be set, if they are available
            if (factoryFactory != null) {
                String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
                if (enabledCiphers != null) {
                    ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
                }
            }
            break;
        case MqttConnectOptions.URI_TYPE_WS:
            if (port == -1) {
                port = 80;
            }
            if (factory == null) {
                factory = SocketFactory.getDefault();
            }
            else if (factory instanceof SSLSocketFactory) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }
            netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
            ((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
            break;
        case MqttConnectOptions.URI_TYPE_WSS:
            if (port == -1) {
                port = 443;
            }
            SSLSocketFactoryFactory wSSFactoryFactory = null;
            if (factory == null) {
                wSSFactoryFactory = new SSLSocketFactoryFactory();
                    Properties sslClientProps = options.getSSLProperties();
                    if (null != sslClientProps)
                        wSSFactoryFactory.initialize(sslClientProps, null);
                    factory = wSSFactoryFactory.createSocketFactory(null);

            }
            else if ((factory instanceof SSLSocketFactory) == false) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
            }

            // Create the network module...
            netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
            ((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
            // Ciphers suites need to be set, if they are available
            if (wSSFactoryFactory != null) {
                String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);
                if (enabledCiphers != null) {
                    ((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
                }
            }
            break;
        default:
            // This shouldn't happen, as long as validateURI() has been called.
            log.fine(CLASS_NAME,methodName, "119", new Object[] {address});
            netModule = null;
        }
        return netModule;
    }

代码很多但是不难读,首先作用是工厂方法创建正确的网络模块。开始通过getSocketFactory获取工厂,默认为null,然后利用之前讲过的MqttConnectOptions#validateURI去判断serverURI的类型

  • tcp->0
  • ssl->1
  • local->2
  • ws->3
  • wss->4

然后创建URL对象

  • 如果address包含_或者URL对象获取Host失败,就会尝试利用反射已经MqttAsyncClient#getHostName方法去解析不正常的address
  • 正常就通过URL对象直接获取Host和Port

在通过不同的serverURL类型来进入不同的处理,这里以tcp为例讲一下(ssl和wss会因为加密多一些步骤)

首先如果port没有给出,会根据不同类型赋一个默认的值

  • tcp->1883
  • ssl->8883
  • ws->80
  • wss->443

然后SocketFactory#getDefault返回环境默认套接字工厂的副本,在利用这个套接字去创建TCP的网络模块TCPNetworkModule对象去返回

然后ClientComms#setNetworkModules获取到TCPNetworkModule并赋值给networkModules

回到org.eclipse.paho.client.mqttv3.MqttAsyncClient#connect

networkModules之后将automaticReconnect值作为参数传入(默认为false)创建MqttReconnectCallback对象,然后赋值给CommsCallback的reconnectInternalCallback参数

ConnectActionListener

通过ClientId创建了MqttToken,MqttToken是提供一种机制来跟踪异步操作的完成情况(这里不着重讲了)。

再往下创建ConnectActionListener对象,用来处理 AsyncClient 到可用 URL 之一的连接。

在创建客户端时作提供单一URL ,或者作为连接选项中的列表提供,尝试连接到列表中的每个 URL,直到连接尝试成功或尝试了所有 URL

此类使用自己的 onSuccess 和 onFailure 回调,而不是用户提供的回调

如果连接成功,则通知用户令牌并调用 用户的onSuccess 回调函数

image-20220613145552470

如果连接失败,则尝试列表中的另一个 URL

image-20220613145804411

如果已经没有URL则通知用户令牌并调用 onFailure 回调

image-20220613145819629

回到org.eclipse.paho.client.mqttv3.MqttAsyncClient#connect

调用了MqttToken#setActionCallback来将ConnectActionListener对象注册为侦听器以在操作完成时收到通知。

再调用MqttToken#setUserContext上下文来存储MqttAsyncClient对象。

后面是一个可选项

 If we are using the MqttCallbackExtended, set it on the
// connectActionListener
if (this.mqttCallback instanceof MqttCallbackExtended) {
            connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
        }

如果使用 MqttCallbackExtended,请将其设置在connectActionListener里,而MqttCallbackExtended是用来扩展MqttCallback以允许新的回调而不破坏现有应用程序的 API实现此接口的类可以在两种类型的客户端上注册:IMqttClient.setCallback(MqttCallback)IMqttAsyncClient.setCallback(MqttCallback)

之后ClientComms#setNetworkModuleIndex设置了网络模块的索引

然后调用ConnectActionListener#connect(这里才是连接的开始),也就是上面创建的ConnectActionListener对象

image-20220613150304274

MqttToken已经说过了,而org.eclipse.paho.client.mqttv3.MqttClientPersistence#open是用来初始化持久存储,在之前MqttClient类对象的创建里指定MqttClient它将用于持久化 QoS 1 和 2 消息。

之后是参数的配置1.是否会话维持2.设置MQTT版本

然后调用了org.eclipse.paho.client.mqttv3.internal.ClientComms#connect,如果失败调用onFailure这里前文讲了

image-20220613151142241

漆面是通过MqttConnectOptions传递进来的参数来创建MqttConnect对象,然后进行一些配置设置,基本上前文都涉及到了。

ConnectBG

这里重点看一下连接的实现org.eclipse.paho.client.mqttv3.internal.ClientComms.ConnectBG#start

ConnectBG是实现了Runnable的类,运行在线程池中

image-20220613152411548

这里看一下线程体的run方法

image-20220613152521599

首先看关键主题内容,启动网络模块,发起网络连接

NetworkModule networkModule = networkModules[networkModuleIndex]; 
networkModule.start(); 

进入到org.eclipse.paho.client.mqttv3.internal.NetworkModule#start

四个实现类

image-20220613152755964

这里我们进入TCP分析

image-20220613152818845

可以看到调用了socket去实现连接

连接完成后,启动receiver和,负责从broker接收消息以及向broker发送消息,两个类都实现了Runnable的类,运行在线程池中,直接看run函数

receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());     
sender.start("MQTT Snd: "+getClient().getClientId(), executorService); 

receiver收到消息后,响应消息的消息类型为MqttAck,由于CONACK数据包是MqttAck类型,且token不为null,故会执行clientState.notifyReceivedAck函数.

public void run() {
        recThread = Thread.currentThread();
        recThread.setName(threadName);
        final String methodName = "run";
        MqttToken token = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }

        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME,methodName,"852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // instanceof checks if message is null
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    if (token!=null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token
                            // This ensures that the send processing can complete  before the
                            // receive processing starts! ( request and ack and ack processing
                            // can occur before request processing is complete if not!
                            clientState.notifyReceivedAck((MqttAck)message);
                        }
                    } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                        //This is an ack for a message we no longer have a ticket for.
                        //This probably means we already received this message and it's being send again
                        //because of timeouts, crashes, disconnects, restarts etc.
                        //It should be safe to ignore these unexpected messages.
                        log.fine(CLASS_NAME, methodName, "857");
                    } else {
                        // It its an ack and there is no token then something is not right.
                        // An ack should always have a token assoicated with it.
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    if (message != null) {
                        // A new message has arrived
                        clientState.notifyReceivedMsg(message);
                    }
                }
            }
            catch (MqttException ex) {
                //@TRACE 856=Stopping, MQttException
                log.fine(CLASS_NAME,methodName,"856",null,ex);
                running = false;
                // Token maybe null but that is handled in shutdown
                clientComms.shutdownConnection(token, ex);
            }
            catch (IOException ioe) {
                //@TRACE 853=Stopping due to IOException
                log.fine(CLASS_NAME,methodName,"853");

                running = false;
                // An EOFException could be raised if the broker processes the
                // DISCONNECT and ends the socket before we complete. As such,
                // only shutdown the connection if we're not already shutting down.
                if (!clientComms.isDisconnecting()) {
                    clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe));
                }
            }
            finally {
                receiving = false;
                runningSemaphore.release();
            }
        }

        //@TRACE 854=<
        log.fine(CLASS_NAME,methodName,"854");
    }

notifyReceivedAck函数中,处理各种broker返回消息,而连接消息处理最后会到connected()连接完成的方法中,该方法设置连接完成状态以及开始发送心跳。

protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
        final String methodName = "notifyReceivedMsg";
        this.lastInboundActivity = System.currentTimeMillis();

        // @TRACE 651=received key={0} message={1}
        log.fine(CLASS_NAME, methodName, "651", new Object[] {
                new Integer(message.getMessageId()), message });

        if (!quiescing) {
            if (message instanceof MqttPublish) {
                MqttPublish send = (MqttPublish) message;
                switch (send.getMessage().getQos()) {
                case 0:
                case 1:
                    if (callback != null) {
                        callback.messageArrived(send);
                    }
                    break;
                case 2:
                    persistence.put(getReceivedPersistenceKey(message),
                            (MqttPublish) message);
                    inboundQoS2.put(new Integer(send.getMessageId()), send);
                    this.send(new MqttPubRec(send), null);
                    break;

                default:
                    //should NOT reach here
                }
            } else if (message instanceof MqttPubRel) {
                MqttPublish sendMsg = (MqttPublish) inboundQoS2
                        .get(new Integer(message.getMessageId()));
                if (sendMsg != null) {
                    if (callback != null) {
                        callback.messageArrived(sendMsg);
                    }
                } else {
                    // Original publish has already been delivered.
                    MqttPubComp pubComp = new MqttPubComp(message
                            .getMessageId());
                    this.send(pubComp, null);
                }
            }
        }
    }

sender不断循环从clientState获取待发送的消息然后通过org.eclipse.paho.client.mqttv3.internal.ClientState#notifySent将 MQTT连接的消息发送出去

public void run() {
        sendThread = Thread.currentThread();
        sendThread.setName(threadName);
        final String methodName = "run";
        MqttWireMessage message = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }

        try {
            while (running && (out != null)) {
                try {
                    message = clientState.get();
                    if (message != null) {
                        //@TRACE 802=network send key={0} msg={1}
                        log.fine(CLASS_NAME,methodName,"802", new Object[] {message.getKey(),message});

                        if (message instanceof MqttAck) {
                            out.write(message);
                            out.flush();
                        } else {
                            MqttToken token = tokenStore.getToken(message);
                            // While quiescing the tokenstore can be cleared so need
                            // to check for null for the case where clear occurs
                            // while trying to send a message.
                            if (token != null) {
                                synchronized (token) {
                                    out.write(message);
                                    try {
                                        out.flush();
                                    } catch (IOException ex) {
                                        // The flush has been seen to fail on disconnect of a SSL socket
                                        // as disconnect is in progress this should not be treated as an error
                                        if (!(message instanceof MqttDisconnect)) {
                                            throw ex;
                                        }
                                    }
                                    clientState.notifySent(message);
                                }
                            }
                        }
                    } else { // null message
                        //@TRACE 803=get message returned null, stopping}
                        log.fine(CLASS_NAME,methodName,"803");

                        running = false;
                    }
                } catch (MqttException me) {
                    handleRunException(message, me);
                } catch (Exception ex) {
                    handleRunException(message, ex);
                }
            } // end while
        } finally {
            running = false;
            runningSemaphore.release();
        }

        //@TRACE 805=<
        log.fine(CLASS_NAME, methodName,"805");

    }

然后启动回调监听

callback.start("MQTT Call: "+getClient().getClientId(), executorService);

最后调用internalSend发送mqtt的CONNECT数据包

internalSend(conPacket, conToken);

image-20220613154539660

org.eclipse.paho.client.mqttv3.internal.ClientState#send来进行发送

单单连接broker的流程分析就十分繁琐,剩下的流程分析创建主题和发布主题内容是一个入口,订阅主题和接收主题内容也是一个入口,这里给出流程的入口,感兴趣可以一步一步调试分析

  • 创建主题&发布主题内容
  • 入口函数:org.eclipse.paho.client.mqttv3.MqttAsyncClient#publish()
  • 订阅主题&接收主题内容
  • 入口函数:org.eclipse.paho.client.mqttv3.MqttAsyncClient#subscribe()

MQTT安全

直接通过实例来了解一下MQTT安全。由于MQTT运用也比较新,而java库Eclipse Paho Java Client相关漏洞就更少,对于他本身并没有什么漏洞,而是有两个MQTT信息接收之后存储输出不当的例子。

分别是hivemq(CVE-2020-13821)和apache-artemis(CVE-2020-13932)

这里使用mosquitto作为客户端去实现

hivemq(CVE-2020-13821)

这里是利用clintid来进行攻击

image-20220509155307161

连接broker

image-20220509155402873

刷新snapshot,没有过滤直接打印了客户端的clintid导致xss

image-20220509155428340

apache-artemis(CVE-2020-13932)

访问

image-20220509170516354

构造恶意的clientid或topics名字来攻击控制台

<img src=x onerror=alert(1)>

image-20220509172749949

这里通过主题名称来进行攻击,在Diagram模块将未过滤的主题名称直接打印导致xss

image-20220509172734304

上面两个例子虽然在MQTT协议中都不属于客户端而是Broker,但是也可以看到在MQTT中web漏洞的重点是信息的交互

如果应用了客户端,用来连接broker以及接收和发送主题信息

  • 发送是自己控制所以不可控的就是接收的主题信息,就要做好过滤以及减少可能的危险操作

如果是应用的broker,用来处理连接的信息以及客户端之间交互的主题信息

  • 那么连接信息和客户端之间交互的信息都是不可控的,更要做好充足的过滤以及防护手段

在web应用层对于MQTT协议来说,更多的漏洞是在于你对信息的处理不当

除了以上两个项目还有很多使用mqtt客户端或broker的物联网项目如

  • wso2
  • thingsboard

也可以根据信息获取之后的利用来进行漏洞挖掘

参考

评论

6right

这个人很懒,没有留下任何介绍

随机分类

其他 文章:95 篇
Web安全 文章:248 篇
CTF 文章:62 篇
网络协议 文章:18 篇
逆向安全 文章:70 篇

扫码关注公众号

WeChat Offical Account QRCode

最新评论

Yukong

🐮皮

H

HHHeey

好的,谢谢师傅的解答

Article_kelp

a类中的变量secret_class_var = "secret"是在merge

H

HHHeey

secret_var = 1 def test(): pass

H

hgsmonkey

tql!!!

目录