Jim Flynn / Mbed OS aws-iot-device-sdk-mbed-c
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers aws_iot_mqtt_client_subscribe.c Source File

aws_iot_mqtt_client_subscribe.c

Go to the documentation of this file.
00001 /*
00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
00003 *
00004 * Licensed under the Apache License, Version 2.0 (the "License").
00005 * You may not use this file except in compliance with the License.
00006 * A copy of the License is located at
00007 *
00008 * http://aws.amazon.com/apache2.0
00009 *
00010 * or in the "license" file accompanying this file. This file is distributed
00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
00012 * express or implied. See the License for the specific language governing
00013 * permissions and limitations under the License.
00014 */
00015 
00016 // Based on Eclipse Paho.
00017 /*******************************************************************************
00018  * Copyright (c) 2014 IBM Corp.
00019  *
00020  * All rights reserved. This program and the accompanying materials
00021  * are made available under the terms of the Eclipse Public License v1.0
00022  * and Eclipse Distribution License v1.0 which accompany this distribution.
00023  *
00024  * The Eclipse Public License is available at
00025  *    http://www.eclipse.org/legal/epl-v10.html
00026  * and the Eclipse Distribution License is available at
00027  *   http://www.eclipse.org/org/documents/edl-v10.php.
00028  *
00029  * Contributors:
00030  *    Ian Craggs - initial API and implementation and/or initial documentation
00031  *******************************************************************************/
00032 
00033 /**
00034  * @file aws_iot_mqtt_client_subscribe.c
00035  * @brief MQTT client subscribe API definitions
00036  */
00037 
00038 #ifdef __cplusplus
00039 extern "C" {
00040 #endif
00041 
00042 #include "aws_iot_mqtt_client_common_internal.h"
00043 
00044 /**
00045   * Serializes the supplied subscribe data into the supplied buffer, ready for sending
00046   * @param pTxBuf the buffer into which the packet will be serialized
00047   * @param txBufLen the length in bytes of the supplied buffer
00048   * @param dup unsigned char - the MQTT dup flag
00049   * @param packetId uint16_t - the MQTT packet identifier
00050   * @param topicCount - number of members in the topicFilters and reqQos arrays
00051   * @param pTopicNameList - array of topic filter names
00052   * @param pTopicNameLenList - array of length of topic filter names
00053   * @param pRequestedQoSs - array of requested QoS
00054   * @param pSerializedLen - the length of the serialized data
00055   *
00056   * @return An IoT Error Type defining successful/failed operation
00057   */
00058 static IoT_Error_t _aws_iot_mqtt_serialize_subscribe(unsigned char *pTxBuf, size_t txBufLen,
00059                                                      unsigned char dup, uint16_t packetId, uint32_t topicCount,
00060                                                      const char **pTopicNameList, uint16_t *pTopicNameLenList,
00061                                                      QoS *pRequestedQoSs, uint32_t *pSerializedLen) {
00062     unsigned char *ptr;
00063     uint32_t itr, rem_len;
00064     IoT_Error_t rc;
00065     MQTTHeader header = {0};
00066 
00067     FUNC_ENTRY;
00068     if(NULL == pTxBuf || NULL == pSerializedLen) {
00069         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00070     }
00071 
00072     ptr = pTxBuf;
00073     rem_len = 2; /* packetId */
00074 
00075     for(itr = 0; itr < topicCount; ++itr) {
00076         rem_len += (uint32_t) (pTopicNameLenList[itr] + 2 + 1); /* topic + length + req_qos */
00077     }
00078 
00079     if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) {
00080         FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR);
00081     }
00082 
00083     rc = aws_iot_mqtt_internal_init_header(&header, SUBSCRIBE, QOS1, dup, 0);
00084     if(AWS_SUCCESS != rc) {
00085         FUNC_EXIT_RC(rc);
00086     }
00087     /* write header */
00088     aws_iot_mqtt_internal_write_char(&ptr, header.byte);
00089 
00090     /* write remaining length */
00091     ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len);
00092 
00093     aws_iot_mqtt_internal_write_uint_16(&ptr, packetId);
00094 
00095     for(itr = 0; itr < topicCount; ++itr) {
00096         aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicNameList[itr], pTopicNameLenList[itr]);
00097         aws_iot_mqtt_internal_write_char(&ptr, (unsigned char) pRequestedQoSs[itr]);
00098     }
00099 
00100     *pSerializedLen = (uint32_t) (ptr - pTxBuf);
00101 
00102     FUNC_EXIT_RC(AWS_SUCCESS);
00103 }
00104 
00105 /**
00106   * Deserializes the supplied (wire) buffer into suback data
00107   * @param pPacketId returned integer - the MQTT packet identifier
00108   * @param maxExpectedQoSCount - the maximum number of members allowed in the grantedQoSs array
00109   * @param pGrantedQoSCount returned uint32_t - number of members in the grantedQoSs array
00110   * @param pGrantedQoSs returned array of QoS type - the granted qualities of service
00111   * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field
00112   * @param rxBufLen the length in bytes of the data in the supplied buffer
00113   *
00114   * @return An IoT Error Type defining successful/failed operation
00115   */
00116 static IoT_Error_t _aws_iot_mqtt_deserialize_suback(uint16_t *pPacketId, uint32_t maxExpectedQoSCount,
00117                                                     uint32_t *pGrantedQoSCount, QoS *pGrantedQoSs,
00118                                                     unsigned char *pRxBuf, size_t rxBufLen) {
00119     unsigned char *curData, *endData;
00120     uint32_t decodedLen, readBytesLen;
00121     IoT_Error_t decodeRc;
00122     MQTTHeader header = {0};
00123 
00124     FUNC_ENTRY;
00125     if(NULL == pPacketId || NULL == pGrantedQoSCount || NULL == pGrantedQoSs) {
00126         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00127     }
00128 
00129     curData = pRxBuf;
00130     endData = NULL;
00131     decodeRc = FAILURE;
00132     decodedLen = 0;
00133     readBytesLen = 0;
00134 
00135     /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload
00136      * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9
00137      */
00138     if(5 > rxBufLen) {
00139         FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR);
00140     }
00141 
00142     header.byte = aws_iot_mqtt_internal_read_char(&curData);
00143     if(SUBACK != MQTT_HEADER_FIELD_TYPE(header.byte)) {
00144         FUNC_EXIT_RC(FAILURE);
00145     }
00146 
00147     /* read remaining length */
00148     decodeRc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen);
00149     if(AWS_SUCCESS != decodeRc) {
00150         FUNC_EXIT_RC(decodeRc);
00151     }
00152 
00153     curData += (readBytesLen);
00154     endData = curData + decodedLen;
00155     if(endData - curData < 2) {
00156         FUNC_EXIT_RC(FAILURE);
00157     }
00158 
00159     *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData);
00160 
00161     *pGrantedQoSCount = 0;
00162     while(curData < endData) {
00163         if(*pGrantedQoSCount > maxExpectedQoSCount) {
00164             FUNC_EXIT_RC(FAILURE);
00165         }
00166         pGrantedQoSs[(*pGrantedQoSCount)++] = (QoS) aws_iot_mqtt_internal_read_char(&curData);
00167     }
00168 
00169     FUNC_EXIT_RC(AWS_SUCCESS);
00170 }
00171 
00172 /* Returns MAX_MESSAGE_HANDLERS value if no free index is available */
00173 static uint32_t _aws_iot_mqtt_get_free_message_handler_index(AWS_IoT_Client *pClient) {
00174     uint32_t itr;
00175 
00176     FUNC_ENTRY;
00177 
00178     for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; itr++) {
00179         if(pClient->clientData.messageHandlers[itr].topicName == NULL) {
00180             break;
00181         }
00182     }
00183 
00184     FUNC_EXIT_RC((int)itr);
00185 }
00186 
00187 /**
00188  * @brief Subscribe to an MQTT topic.
00189  *
00190  * Called to send a subscribe message to the broker requesting a subscription
00191  * to an MQTT topic. This is the internal function which is called by the
00192  * subscribe API to perform the operation. Not meant to be called directly as
00193  * it doesn't do validations or client state changes
00194  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00195  * @warning pTopicName and pApplicationHandlerData need to be static in memory.
00196  *
00197  * @param pClient Reference to the IoT Client
00198  * @param pTopicName Topic Name to publish to. pTopicName needs to be static in memory since 
00199  *     no malloc are performed by the SDK
00200  * @param topicNameLen Length of the topic name
00201  * @param pApplicationHandler_t Reference to the handler function for this subscription
00202  * @param pApplicationHandlerData Point to data passed to the callback. 
00203  *    pApplicationHandlerData also needs to be static in memory  since no malloc are performed by the SDK
00204  *
00205  * @return An IoT Error Type defining successful/failed subscription
00206  */
00207 static IoT_Error_t _aws_iot_mqtt_internal_subscribe(AWS_IoT_Client *pClient, const char *pTopicName,
00208                                                     uint16_t topicNameLen, QoS qos,
00209                                                     pApplicationHandler_t pApplicationHandler,
00210                                                     void *pApplicationHandlerData) {
00211     uint16_t txPacketId, rxPacketId;
00212     uint32_t serializedLen, indexOfFreeMessageHandler, count;
00213     IoT_Error_t rc;
00214     awsTimer timer;
00215     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00216 
00217     FUNC_ENTRY;
00218     init_timer(&timer);
00219     countdown_ms(&timer, pClient->clientData.commandTimeoutMs);
00220 
00221     serializedLen = 0;
00222     count = 0;
00223     txPacketId = aws_iot_mqtt_get_next_packet_id(pClient);
00224     rxPacketId = 0;
00225 
00226     rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0,
00227                                            txPacketId, 1, &pTopicName, &topicNameLen, &qos, &serializedLen);
00228     if(AWS_SUCCESS != rc) {
00229         FUNC_EXIT_RC(rc);
00230     }
00231 
00232     indexOfFreeMessageHandler = _aws_iot_mqtt_get_free_message_handler_index(pClient);
00233     if(AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS <= indexOfFreeMessageHandler) {
00234         FUNC_EXIT_RC(MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR);
00235     }
00236 
00237     /* send the subscribe packet */
00238     rc = aws_iot_mqtt_internal_send_packet(pClient, serializedLen, &timer);
00239     if(AWS_SUCCESS != rc) {
00240         FUNC_EXIT_RC(rc);
00241     }
00242 
00243     /* wait for suback */
00244     rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer);
00245     if(AWS_SUCCESS != rc) {
00246         FUNC_EXIT_RC(rc);
00247     }
00248 
00249     /* Granted QoS can be 0, 1 or 2 */
00250     rc = _aws_iot_mqtt_deserialize_suback(&rxPacketId, 1, &count, grantedQoS, pClient->clientData.readBuf,
00251                                           pClient->clientData.readBufSize);
00252     if(AWS_SUCCESS != rc) {
00253         FUNC_EXIT_RC(rc);
00254     }
00255 
00256     /* TODO : Figure out how to test this before activating this check */
00257     //if(txPacketId != rxPacketId) {
00258     /* Different SUBACK received than expected. Return error
00259      * This can cause issues if the request timeout value is too small */
00260     //  return RX_MESSAGE_INVALID_ERROR;
00261     //}
00262 
00263     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicName =
00264             pTopicName;
00265     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicNameLen =
00266             topicNameLen;
00267     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandler =
00268             pApplicationHandler;
00269     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandlerData =
00270             pApplicationHandlerData;
00271     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].qos = qos;
00272 
00273     FUNC_EXIT_RC(AWS_SUCCESS);
00274 }
00275 
00276 /**
00277  * @brief Subscribe to an MQTT topic.
00278  *
00279  * Called to send a subscribe message to the broker requesting a subscription
00280  * to an MQTT topic. This is the outer function which does the validations and
00281  * calls the internal subscribe above to perform the actual operation.
00282  * It is also responsible for client state changes
00283  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00284  * @warning pTopicName and pApplicationHandlerData need to be static in memory.
00285  *
00286  * @param pClient Reference to the IoT Client
00287  * @param pTopicName Topic Name to publish to. pTopicName needs to be static in memory since 
00288  *     no malloc are performed by the SDK
00289  * @param topicNameLen Length of the topic name
00290  * @param pApplicationHandler_t Reference to the handler function for this subscription
00291  * @param pApplicationHandlerData Point to data passed to the callback. 
00292  *    pApplicationHandlerData also needs to be static in memory  since no malloc are performed by the SDK
00293  *
00294  * @return An IoT Error Type defining successful/failed subscription
00295  */
00296 IoT_Error_t aws_iot_mqtt_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen,
00297                                    QoS qos, pApplicationHandler_t pApplicationHandler, void *pApplicationHandlerData) {
00298     ClientState clientState;
00299     IoT_Error_t rc, subRc;
00300 
00301     FUNC_ENTRY;
00302 
00303     if(NULL == pClient || NULL == pTopicName || NULL == pApplicationHandler) {
00304         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00305     }
00306 
00307     if(!aws_iot_mqtt_is_client_connected(pClient)) {
00308         FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR);
00309     }
00310 
00311     clientState = aws_iot_mqtt_get_client_state(pClient);
00312     if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) {
00313         FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR);
00314     }
00315 
00316     rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS);
00317     if(AWS_SUCCESS != rc) {
00318         FUNC_EXIT_RC(rc);
00319     }
00320 
00321     subRc = _aws_iot_mqtt_internal_subscribe(pClient, pTopicName, topicNameLen, qos,
00322                                              pApplicationHandler, pApplicationHandlerData);
00323 
00324     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS, clientState);
00325     if(AWS_SUCCESS == subRc && AWS_SUCCESS != rc) {
00326         subRc = rc;
00327     }
00328 
00329     FUNC_EXIT_RC(subRc);
00330 }
00331 
00332 /**
00333  * @brief Subscribe to an MQTT topic.
00334  *
00335  * Called to send a subscribe message to the broker requesting a subscription
00336  * to an MQTT topic.
00337  * This is the internal function which is called by the resubscribe API to perform the operation.
00338  * Not meant to be called directly as it doesn't do validations or client state changes
00339  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00340  *
00341  * @param pClient Reference to the IoT Client
00342  *
00343  * @return An IoT Error Type defining successful/failed subscription
00344  */
00345 static IoT_Error_t _aws_iot_mqtt_internal_resubscribe(AWS_IoT_Client *pClient) {
00346     uint16_t packetId;
00347     uint32_t len, count, existingSubCount, itr;
00348     IoT_Error_t rc;
00349     awsTimer timer;
00350     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00351 
00352     FUNC_ENTRY;
00353 
00354     packetId = 0;
00355     len = 0;
00356     count = 0;
00357     existingSubCount = _aws_iot_mqtt_get_free_message_handler_index(pClient);
00358 
00359     for(itr = 0; itr < existingSubCount; itr++) {
00360         if(pClient->clientData.messageHandlers[itr].topicName == NULL) {
00361             continue;
00362         }
00363 
00364         init_timer(&timer);
00365         countdown_ms(&timer, pClient->clientData.commandTimeoutMs);
00366 
00367         rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0,
00368                                                aws_iot_mqtt_get_next_packet_id(pClient), 1,
00369                                                &(pClient->clientData.messageHandlers[itr].topicName),
00370                                                &(pClient->clientData.messageHandlers[itr].topicNameLen),
00371                                                &(pClient->clientData.messageHandlers[itr].qos), &len);
00372         if(AWS_SUCCESS != rc) {
00373             FUNC_EXIT_RC(rc);
00374         }
00375 
00376         /* send the subscribe packet */
00377         rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer);
00378         if(AWS_SUCCESS != rc) {
00379             FUNC_EXIT_RC(rc);
00380         }
00381 
00382         /* wait for suback */
00383         rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer);
00384         if(AWS_SUCCESS != rc) {
00385             FUNC_EXIT_RC(rc);
00386         }
00387 
00388         /* Granted QoS can be 0, 1 or 2 */
00389         rc = _aws_iot_mqtt_deserialize_suback(&packetId, 1, &count, grantedQoS, pClient->clientData.readBuf,
00390                                               pClient->clientData.readBufSize);
00391         if(AWS_SUCCESS != rc) {
00392             FUNC_EXIT_RC(rc);
00393         }
00394     }
00395 
00396     FUNC_EXIT_RC(AWS_SUCCESS);
00397 }
00398 
00399 /**
00400  * @brief Subscribe to an MQTT topic.
00401  *
00402  * Called to send a subscribe message to the broker requesting a subscription
00403  * to an MQTT topic.
00404  * This is the outer function which does the validations and calls the internal resubscribe above
00405  * to perform the actual operation. It is also responsible for client state changes
00406  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00407  *
00408  * @param pClient Reference to the IoT Client
00409  *
00410  * @return An IoT Error Type defining successful/failed subscription
00411  */
00412 IoT_Error_t aws_iot_mqtt_resubscribe(AWS_IoT_Client *pClient) {
00413     IoT_Error_t rc, resubRc;
00414 
00415     FUNC_ENTRY;
00416 
00417     if(NULL == pClient) {
00418         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00419     }
00420 
00421     if(false == aws_iot_mqtt_is_client_connected(pClient)) {
00422         FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR);
00423     }
00424 
00425     if(CLIENT_STATE_CONNECTED_IDLE != aws_iot_mqtt_get_client_state(pClient)) {
00426         FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR);
00427     }
00428 
00429     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_IDLE,
00430                                        CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS);
00431     if(AWS_SUCCESS != rc) {
00432         FUNC_EXIT_RC(rc);
00433     }
00434 
00435     resubRc = _aws_iot_mqtt_internal_resubscribe(pClient);
00436 
00437     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS,
00438                                        CLIENT_STATE_CONNECTED_IDLE);
00439     if(AWS_SUCCESS == resubRc && AWS_SUCCESS != rc) {
00440         resubRc = rc;
00441     }
00442 
00443     FUNC_EXIT_RC(resubRc);
00444 }
00445 
00446 #ifdef __cplusplus
00447 }
00448 #endif
00449