This is a fork that I will modify for STM32.

Fork of AWS-test by Pierre-Marie Ancèle

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers aws_iot_mqtt_client_subscribe.cpp Source File

aws_iot_mqtt_client_subscribe.cpp

00001 /*
00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
00003 *
00004 * Licensed under the Apache License, Version 2.0 (the "License").
00005 * You may not use this file except in compliance with the License.
00006 * A copy of the License is located at
00007 *
00008 * http://aws.amazon.com/apache2.0
00009 *
00010 * or in the "license" file accompanying this file. This file is distributed
00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
00012 * express or implied. See the License for the specific language governing
00013 * permissions and limitations under the License.
00014 */
00015 
00016 // Based on Eclipse Paho.
00017 /*******************************************************************************
00018  * Copyright (c) 2014 IBM Corp.
00019  *
00020  * All rights reserved. This program and the accompanying materials
00021  * are made available under the terms of the Eclipse Public License v1.0
00022  * and Eclipse Distribution License v1.0 which accompany this distribution.
00023  *
00024  * The Eclipse Public License is available at
00025  *    http://www.eclipse.org/legal/epl-v10.html
00026  * and the Eclipse Distribution License is available at
00027  *   http://www.eclipse.org/org/documents/edl-v10.php.
00028  *
00029  * Contributors:
00030  *    Ian Craggs - initial API and implementation and/or initial documentation
00031  *******************************************************************************/
00032 
00033 /**
00034  * @file aws_iot_mqtt_client_subscribe.cpp
00035  * @brief MQTT client subscribe API definitions
00036  */
00037 
00038 #ifdef __cplusplus
00039 extern "C" {
00040 #endif
00041 
00042 #include "aws_iot_mqtt_client_common_internal.h"
00043 
00044 /**
00045   * Serializes the supplied subscribe data into the supplied buffer, ready for sending
00046   * @param pTxBuf the buffer into which the packet will be serialized
00047   * @param txBufLen the length in bytes of the supplied buffer
00048   * @param dup unsigned char - the MQTT dup flag
00049   * @param packetId uint16_t - the MQTT packet identifier
00050   * @param topicCount - number of members in the topicFilters and reqQos arrays
00051   * @param pTopicNameList - array of topic filter names
00052   * @param pTopicNameLenList - array of length of topic filter names
00053   * @param pRequestedQoSs - array of requested QoS
00054   * @param pSerializedLen - the length of the serialized data
00055   *
00056   * @return An IoT Error Type defining successful/failed operation
00057   */
00058 static IoT_Error_t _aws_iot_mqtt_serialize_subscribe(unsigned char *pTxBuf, size_t txBufLen,
00059                                                      unsigned char dup, uint16_t packetId, uint32_t topicCount,
00060                                                      const char **pTopicNameList, uint16_t *pTopicNameLenList,
00061                                                      QoS *pRequestedQoSs, uint32_t *pSerializedLen) {
00062     unsigned char *ptr;
00063     uint32_t itr, rem_len;
00064     IoT_Error_t rc;
00065     MQTTHeader header = {0};
00066 
00067     FUNC_ENTRY;
00068     if(NULL == pTxBuf || NULL == pSerializedLen) {
00069         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00070     }
00071 
00072     ptr = pTxBuf;
00073     rem_len = 2; /* packetId */
00074 
00075     for(itr = 0; itr < topicCount; ++itr) {
00076         rem_len += (uint32_t) (pTopicNameLenList[itr] + 2 + 1); /* topic + length + req_qos */
00077     }
00078 
00079     if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) {
00080         FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR);
00081     }
00082 
00083     rc = aws_iot_mqtt_internal_init_header(&header, SUBSCRIBE, QOS1, dup, 0);
00084     if(IOT_SUCCESS != rc) {
00085         FUNC_EXIT_RC(rc);
00086     }
00087     /* write header */
00088     aws_iot_mqtt_internal_write_char(&ptr, header.byte);
00089 
00090     /* write remaining length */
00091     ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len);
00092 
00093     aws_iot_mqtt_internal_write_uint_16(&ptr, packetId);
00094 
00095     for(itr = 0; itr < topicCount; ++itr) {
00096         aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicNameList[itr], pTopicNameLenList[itr]);
00097         aws_iot_mqtt_internal_write_char(&ptr, (unsigned char) pRequestedQoSs[itr]);
00098     }
00099 
00100     *pSerializedLen = (uint32_t) (ptr - pTxBuf);
00101 
00102     FUNC_EXIT_RC(IOT_SUCCESS);
00103 }
00104 
00105 /**
00106   * Deserializes the supplied (wire) buffer into suback data
00107   * @param pPacketId returned integer - the MQTT packet identifier
00108   * @param maxExpectedQoSCount - the maximum number of members allowed in the grantedQoSs array
00109   * @param pGrantedQoSCount returned uint32_t - number of members in the grantedQoSs array
00110   * @param pGrantedQoSs returned array of QoS type - the granted qualities of service
00111   * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field
00112   * @param rxBufLen the length in bytes of the data in the supplied buffer
00113   *
00114   * @return An IoT Error Type defining successful/failed operation
00115   */
00116 static IoT_Error_t _aws_iot_mqtt_deserialize_suback(uint16_t *pPacketId, uint32_t maxExpectedQoSCount,
00117                                                     uint32_t *pGrantedQoSCount, QoS *pGrantedQoSs,
00118                                                     unsigned char *pRxBuf, size_t rxBufLen) {
00119     unsigned char *curData, *endData;
00120     uint32_t decodedLen, readBytesLen;
00121     IoT_Error_t decodeRc;
00122     MQTTHeader header = {0};
00123 
00124     FUNC_ENTRY;
00125     if(NULL == pPacketId || NULL == pGrantedQoSCount || NULL == pGrantedQoSs) {
00126         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00127     }
00128 
00129     curData = pRxBuf;
00130     endData = NULL;
00131     decodeRc = IOT_FAILURE;
00132     decodedLen = 0;
00133     readBytesLen = 0;
00134 
00135     /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload
00136      * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9
00137      */
00138     if(5 > rxBufLen) {
00139         FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR);
00140     }
00141 
00142     header.byte = aws_iot_mqtt_internal_read_char(&curData);
00143     if(SUBACK != header.bits.type) {
00144         FUNC_EXIT_RC(IOT_FAILURE);
00145     }
00146 
00147     /* read remaining length */
00148     decodeRc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen);
00149     if(IOT_SUCCESS != decodeRc) {
00150         FUNC_EXIT_RC(decodeRc);
00151     }
00152 
00153     curData += (readBytesLen);
00154     endData = curData + decodedLen;
00155     if(endData - curData < 2) {
00156         FUNC_EXIT_RC(IOT_FAILURE);
00157     }
00158 
00159     *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData);
00160 
00161     *pGrantedQoSCount = 0;
00162     while(curData < endData) {
00163         if(*pGrantedQoSCount > maxExpectedQoSCount) {
00164             FUNC_EXIT_RC(IOT_FAILURE);
00165         }
00166         pGrantedQoSs[(*pGrantedQoSCount)++] = (QoS) aws_iot_mqtt_internal_read_char(&curData);
00167     }
00168 
00169     FUNC_EXIT_RC(IOT_SUCCESS);
00170 }
00171 
00172 /* Returns MAX_MESSAGE_HANDLERS value if no free index is available */
00173 static uint32_t _aws_iot_mqtt_get_free_message_handler_index(AWS_IoT_Client *pClient) {
00174     uint32_t itr;
00175 
00176     FUNC_ENTRY;
00177 
00178     for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; itr++) {
00179         if(pClient->clientData.messageHandlers[itr].topicName == NULL) {
00180             break;
00181         }
00182     }
00183 
00184     FUNC_EXIT_RC(itr);
00185 }
00186 
00187 /**
00188  * @brief Subscribe to an MQTT topic.
00189  *
00190  * Called to send a subscribe message to the broker requesting a subscription
00191  * to an MQTT topic. This is the internal function which is called by the
00192  * subscribe API to perform the operation. Not meant to be called directly as
00193  * it doesn't do validations or client state changes
00194  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00195  *
00196  * @param pClient Reference to the IoT Client
00197  * @param pTopicName Topic Name to publish to
00198  * @param topicNameLen Length of the topic name
00199  * @param pApplicationHandler_t Reference to the handler function for this subscription
00200  *
00201  * @return An IoT Error Type defining successful/failed subscription
00202  */
00203 static IoT_Error_t _aws_iot_mqtt_internal_subscribe(AWS_IoT_Client *pClient, const char *pTopicName,
00204                                                     uint16_t topicNameLen, QoS qos,
00205                                                     pApplicationHandler_t pApplicationHandler,
00206                                                     void *pApplicationHandlerData) {
00207     uint16_t txPacketId, rxPacketId;
00208     uint32_t serializedLen, indexOfFreeMessageHandler, count;
00209     IoT_Error_t rc;
00210     TimerAWS timer;
00211     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00212 
00213     FUNC_ENTRY;
00214     init_timer(&timer);
00215     countdown_ms(&timer, pClient->clientData.commandTimeoutMs);
00216 
00217     serializedLen = 0;
00218     count = 0;
00219     txPacketId = aws_iot_mqtt_get_next_packet_id(pClient);
00220     rxPacketId = 0;
00221 
00222     rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0,
00223                                            txPacketId, 1, &pTopicName, &topicNameLen, &qos, &serializedLen);
00224     if(IOT_SUCCESS != rc) {
00225         FUNC_EXIT_RC(rc);
00226     }
00227 
00228     indexOfFreeMessageHandler = _aws_iot_mqtt_get_free_message_handler_index(pClient);
00229     if(AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS <= indexOfFreeMessageHandler) {
00230         FUNC_EXIT_RC(MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR);
00231     }
00232 
00233     /* send the subscribe packet */
00234     rc = aws_iot_mqtt_internal_send_packet(pClient, serializedLen, &timer);
00235     if(IOT_SUCCESS != rc) {
00236         FUNC_EXIT_RC(rc);
00237     }
00238 
00239     /* wait for suback */
00240     rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer);
00241     if(IOT_SUCCESS != rc) {
00242         FUNC_EXIT_RC(rc);
00243     }
00244 
00245     /* Granted QoS can be 0, 1 or 2 */
00246     rc = _aws_iot_mqtt_deserialize_suback(&rxPacketId, 1, &count, grantedQoS, pClient->clientData.readBuf,
00247                                           pClient->clientData.readBufSize);
00248     if(IOT_SUCCESS != rc) {
00249         FUNC_EXIT_RC(rc);
00250     }
00251 
00252     /* TODO : Figure out how to test this before activating this check */
00253     //if(txPacketId != rxPacketId) {
00254     /* Different SUBACK received than expected. Return error
00255      * This can cause issues if the request timeout value is too small */
00256     //  return RX_MESSAGE_INVALID_ERROR;
00257     //}
00258 
00259     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicName =
00260             pTopicName;
00261     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicNameLen =
00262             topicNameLen;
00263     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandler =
00264             pApplicationHandler;
00265     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandlerData =
00266             pApplicationHandlerData;
00267     pClient->clientData.messageHandlers[indexOfFreeMessageHandler].qos = qos;
00268 
00269     FUNC_EXIT_RC(IOT_SUCCESS);
00270 }
00271 
00272 /**
00273  * @brief Subscribe to an MQTT topic.
00274  *
00275  * Called to send a subscribe message to the broker requesting a subscription
00276  * to an MQTT topic. This is the outer function which does the validations and
00277  * calls the internal subscribe above to perform the actual operation.
00278  * It is also responsible for client state changes
00279  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00280  *
00281  * @param pClient Reference to the IoT Client
00282  * @param pTopicName Topic Name to publish to
00283  * @param topicNameLen Length of the topic name
00284  * @param pApplicationHandler_t Reference to the handler function for this subscription
00285  *
00286  * @return An IoT Error Type defining successful/failed subscription
00287  */
00288 IoT_Error_t aws_iot_mqtt_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen,
00289                                    QoS qos, pApplicationHandler_t pApplicationHandler, void *pApplicationHandlerData) {
00290     ClientState clientState;
00291     IoT_Error_t rc, subRc;
00292 
00293     FUNC_ENTRY;
00294 
00295     if(NULL == pClient || NULL == pTopicName || NULL == pApplicationHandler) {
00296         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00297     }
00298 
00299     if(!aws_iot_mqtt_is_client_connected(pClient)) {
00300         FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR);
00301     }
00302 
00303     clientState = aws_iot_mqtt_get_client_state(pClient);
00304     if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) {
00305         FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR);
00306     }
00307 
00308     rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS);
00309     if(IOT_SUCCESS != rc) {
00310         FUNC_EXIT_RC(rc);
00311     }
00312 
00313     subRc = _aws_iot_mqtt_internal_subscribe(pClient, pTopicName, topicNameLen, qos,
00314                                              pApplicationHandler, pApplicationHandlerData);
00315 
00316     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS, clientState);
00317     if(IOT_SUCCESS == subRc && IOT_SUCCESS != rc) {
00318         subRc = rc;
00319     }
00320 
00321     FUNC_EXIT_RC(subRc);
00322 }
00323 
00324 /**
00325  * @brief Subscribe to an MQTT topic.
00326  *
00327  * Called to send a subscribe message to the broker requesting a subscription
00328  * to an MQTT topic.
00329  * This is the internal function which is called by the resubscribe API to perform the operation.
00330  * Not meant to be called directly as it doesn't do validations or client state changes
00331  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00332  *
00333  * @param pClient Reference to the IoT Client
00334  *
00335  * @return An IoT Error Type defining successful/failed subscription
00336  */
00337 static IoT_Error_t _aws_iot_mqtt_internal_resubscribe(AWS_IoT_Client *pClient) {
00338     uint16_t packetId;
00339     uint32_t len, count, existingSubCount, itr;
00340     IoT_Error_t rc;
00341     TimerAWS timer;
00342     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00343 
00344     FUNC_ENTRY;
00345 
00346     packetId = 0;
00347     len = 0;
00348     count = 0;
00349     existingSubCount = _aws_iot_mqtt_get_free_message_handler_index(pClient);
00350 
00351     for(itr = 0; itr < existingSubCount; itr++) {
00352         init_timer(&timer);
00353         countdown_ms(&timer, pClient->clientData.commandTimeoutMs);
00354 
00355         rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0,
00356                                                aws_iot_mqtt_get_next_packet_id(pClient), 1,
00357                                                &(pClient->clientData.messageHandlers[itr].topicName),
00358                                                &(pClient->clientData.messageHandlers[itr].topicNameLen),
00359                                                &(pClient->clientData.messageHandlers[itr].qos), &len);
00360         if(IOT_SUCCESS != rc) {
00361             FUNC_EXIT_RC(rc);
00362         }
00363 
00364         /* send the subscribe packet */
00365         rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer);
00366         if(IOT_SUCCESS != rc) {
00367             FUNC_EXIT_RC(rc);
00368         }
00369 
00370         /* wait for suback */
00371         rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer);
00372         if(IOT_SUCCESS != rc) {
00373             FUNC_EXIT_RC(rc);
00374         }
00375 
00376         /* Granted QoS can be 0, 1 or 2 */
00377         rc = _aws_iot_mqtt_deserialize_suback(&packetId, 1, &count, grantedQoS, pClient->clientData.readBuf,
00378                                               pClient->clientData.readBufSize);
00379         if(IOT_SUCCESS != rc) {
00380             FUNC_EXIT_RC(rc);
00381         }
00382     }
00383 
00384     FUNC_EXIT_RC(IOT_SUCCESS);
00385 }
00386 
00387 /**
00388  * @brief Subscribe to an MQTT topic.
00389  *
00390  * Called to send a subscribe message to the broker requesting a subscription
00391  * to an MQTT topic.
00392  * This is the outer function which does the validations and calls the internal resubscribe above
00393  * to perform the actual operation. It is also responsible for client state changes
00394  * @note Call is blocking.  The call returns after the receipt of the SUBACK control packet.
00395  *
00396  * @param pClient Reference to the IoT Client
00397  *
00398  * @return An IoT Error Type defining successful/failed subscription
00399  */
00400 IoT_Error_t aws_iot_mqtt_resubscribe(AWS_IoT_Client *pClient) {
00401     IoT_Error_t rc, resubRc;
00402 
00403     FUNC_ENTRY;
00404 
00405     if(NULL == pClient) {
00406         FUNC_EXIT_RC(NULL_VALUE_ERROR);
00407     }
00408 
00409     if(false == aws_iot_mqtt_is_client_connected(pClient)) {
00410         FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR);
00411     }
00412 
00413     if(CLIENT_STATE_CONNECTED_IDLE != aws_iot_mqtt_get_client_state(pClient)) {
00414         FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR);
00415     }
00416 
00417     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_IDLE,
00418                                        CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS);
00419     if(IOT_SUCCESS != rc) {
00420         FUNC_EXIT_RC(rc);
00421     }
00422 
00423     resubRc = _aws_iot_mqtt_internal_resubscribe(pClient);
00424 
00425     rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS,
00426                                        CLIENT_STATE_CONNECTED_IDLE);
00427     if(IOT_SUCCESS == resubRc && IOT_SUCCESS != rc) {
00428         resubRc = rc;
00429     }
00430 
00431     FUNC_EXIT_RC(resubRc);
00432 }
00433 
00434 #ifdef __cplusplus
00435 }
00436 #endif
00437