Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of AWS-test by
aws_iot_mqtt_client_subscribe.cpp
00001 /* 00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 // Based on Eclipse Paho. 00017 /******************************************************************************* 00018 * Copyright (c) 2014 IBM Corp. 00019 * 00020 * All rights reserved. This program and the accompanying materials 00021 * are made available under the terms of the Eclipse Public License v1.0 00022 * and Eclipse Distribution License v1.0 which accompany this distribution. 00023 * 00024 * The Eclipse Public License is available at 00025 * http://www.eclipse.org/legal/epl-v10.html 00026 * and the Eclipse Distribution License is available at 00027 * http://www.eclipse.org/org/documents/edl-v10.php. 
00028 * 00029 * Contributors: 00030 * Ian Craggs - initial API and implementation and/or initial documentation 00031 *******************************************************************************/ 00032 00033 /** 00034 * @file aws_iot_mqtt_client_subscribe.c 00035 * @brief MQTT client subscribe API definitions 00036 */ 00037 00038 #ifdef __cplusplus 00039 extern "C" { 00040 #endif 00041 00042 #include "aws_iot_mqtt_client_common_internal.h" 00043 00044 /** 00045 * Serializes the supplied subscribe data into the supplied buffer, ready for sending 00046 * @param pTxBuf the buffer into which the packet will be serialized 00047 * @param txBufLen the length in bytes of the supplied buffer 00048 * @param dup unsigned char - the MQTT dup flag 00049 * @param packetId uint16_t - the MQTT packet identifier 00050 * @param topicCount - number of members in the topicFilters and reqQos arrays 00051 * @param pTopicNameList - array of topic filter names 00052 * @param pTopicNameLenList - array of length of topic filter names 00053 * @param pRequestedQoSs - array of requested QoS 00054 * @param pSerializedLen - the length of the serialized data 00055 * 00056 * @return An IoT Error Type defining successful/failed operation 00057 */ 00058 static IoT_Error_t _aws_iot_mqtt_serialize_subscribe(unsigned char *pTxBuf, size_t txBufLen, 00059 unsigned char dup, uint16_t packetId, uint32_t topicCount, 00060 const char **pTopicNameList, uint16_t *pTopicNameLenList, 00061 QoS *pRequestedQoSs, uint32_t *pSerializedLen) { 00062 unsigned char *ptr; 00063 uint32_t itr, rem_len; 00064 IoT_Error_t rc; 00065 MQTTHeader header = {0}; 00066 00067 FUNC_ENTRY; 00068 if(NULL == pTxBuf || NULL == pSerializedLen) { 00069 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00070 } 00071 00072 ptr = pTxBuf; 00073 rem_len = 2; /* packetId */ 00074 00075 for(itr = 0; itr < topicCount; ++itr) { 00076 rem_len += (uint32_t) (pTopicNameLenList[itr] + 2 + 1); /* topic + length + req_qos */ 00077 } 00078 00079 
if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) { 00080 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00081 } 00082 00083 rc = aws_iot_mqtt_internal_init_header(&header, SUBSCRIBE, QOS1, dup, 0); 00084 if(IOT_SUCCESS != rc) { 00085 FUNC_EXIT_RC(rc); 00086 } 00087 /* write header */ 00088 aws_iot_mqtt_internal_write_char(&ptr, header.byte); 00089 00090 /* write remaining length */ 00091 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len); 00092 00093 aws_iot_mqtt_internal_write_uint_16(&ptr, packetId); 00094 00095 for(itr = 0; itr < topicCount; ++itr) { 00096 aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicNameList[itr], pTopicNameLenList[itr]); 00097 aws_iot_mqtt_internal_write_char(&ptr, (unsigned char) pRequestedQoSs[itr]); 00098 } 00099 00100 *pSerializedLen = (uint32_t) (ptr - pTxBuf); 00101 00102 FUNC_EXIT_RC(IOT_SUCCESS); 00103 } 00104 00105 /** 00106 * Deserializes the supplied (wire) buffer into suback data 00107 * @param pPacketId returned integer - the MQTT packet identifier 00108 * @param maxExpectedQoSCount - the maximum number of members allowed in the grantedQoSs array 00109 * @param pGrantedQoSCount returned uint32_t - number of members in the grantedQoSs array 00110 * @param pGrantedQoSs returned array of QoS type - the granted qualities of service 00111 * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field 00112 * @param rxBufLen the length in bytes of the data in the supplied buffer 00113 * 00114 * @return An IoT Error Type defining successful/failed operation 00115 */ 00116 static IoT_Error_t _aws_iot_mqtt_deserialize_suback(uint16_t *pPacketId, uint32_t maxExpectedQoSCount, 00117 uint32_t *pGrantedQoSCount, QoS *pGrantedQoSs, 00118 unsigned char *pRxBuf, size_t rxBufLen) { 00119 unsigned char *curData, *endData; 00120 uint32_t decodedLen, readBytesLen; 00121 IoT_Error_t decodeRc; 00122 MQTTHeader header = {0}; 00123 00124 FUNC_ENTRY; 00125 
if(NULL == pPacketId || NULL == pGrantedQoSCount || NULL == pGrantedQoSs) { 00126 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00127 } 00128 00129 curData = pRxBuf; 00130 endData = NULL; 00131 decodeRc = IOT_FAILURE; 00132 decodedLen = 0; 00133 readBytesLen = 0; 00134 00135 /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload 00136 * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9 00137 */ 00138 if(5 > rxBufLen) { 00139 FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR); 00140 } 00141 00142 header.byte = aws_iot_mqtt_internal_read_char(&curData); 00143 if(SUBACK != header.bits.type) { 00144 FUNC_EXIT_RC(IOT_FAILURE); 00145 } 00146 00147 /* read remaining length */ 00148 decodeRc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen); 00149 if(IOT_SUCCESS != decodeRc) { 00150 FUNC_EXIT_RC(decodeRc); 00151 } 00152 00153 curData += (readBytesLen); 00154 endData = curData + decodedLen; 00155 if(endData - curData < 2) { 00156 FUNC_EXIT_RC(IOT_FAILURE); 00157 } 00158 00159 *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData); 00160 00161 *pGrantedQoSCount = 0; 00162 while(curData < endData) { 00163 if(*pGrantedQoSCount > maxExpectedQoSCount) { 00164 FUNC_EXIT_RC(IOT_FAILURE); 00165 } 00166 pGrantedQoSs[(*pGrantedQoSCount)++] = (QoS) aws_iot_mqtt_internal_read_char(&curData); 00167 } 00168 00169 FUNC_EXIT_RC(IOT_SUCCESS); 00170 } 00171 00172 /* Returns MAX_MESSAGE_HANDLERS value if no free index is available */ 00173 static uint32_t _aws_iot_mqtt_get_free_message_handler_index(AWS_IoT_Client *pClient) { 00174 uint32_t itr; 00175 00176 FUNC_ENTRY; 00177 00178 for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; itr++) { 00179 if(pClient->clientData.messageHandlers[itr].topicName == NULL) { 00180 break; 00181 } 00182 } 00183 00184 FUNC_EXIT_RC(itr); 00185 } 00186 00187 /** 00188 * @brief Subscribe to an MQTT topic. 
00189 * 00190 * Called to send a subscribe message to the broker requesting a subscription 00191 * to an MQTT topic. This is the internal function which is called by the 00192 * subscribe API to perform the operation. Not meant to be called directly as 00193 * it doesn't do validations or client state changes 00194 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00195 * 00196 * @param pClient Reference to the IoT Client 00197 * @param pTopicName Topic Name to publish to 00198 * @param topicNameLen Length of the topic name 00199 * @param pApplicationHandler_t Reference to the handler function for this subscription 00200 * 00201 * @return An IoT Error Type defining successful/failed subscription 00202 */ 00203 static IoT_Error_t _aws_iot_mqtt_internal_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, 00204 uint16_t topicNameLen, QoS qos, 00205 pApplicationHandler_t pApplicationHandler, 00206 void *pApplicationHandlerData) { 00207 uint16_t txPacketId, rxPacketId; 00208 uint32_t serializedLen, indexOfFreeMessageHandler, count; 00209 IoT_Error_t rc; 00210 TimerAWS timer; 00211 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00212 00213 FUNC_ENTRY; 00214 init_timer(&timer); 00215 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00216 00217 serializedLen = 0; 00218 count = 0; 00219 txPacketId = aws_iot_mqtt_get_next_packet_id(pClient); 00220 rxPacketId = 0; 00221 00222 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00223 txPacketId, 1, &pTopicName, &topicNameLen, &qos, &serializedLen); 00224 if(IOT_SUCCESS != rc) { 00225 FUNC_EXIT_RC(rc); 00226 } 00227 00228 indexOfFreeMessageHandler = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00229 if(AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS <= indexOfFreeMessageHandler) { 00230 FUNC_EXIT_RC(MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR); 00231 } 00232 00233 /* send the subscribe packet */ 00234 rc = 
aws_iot_mqtt_internal_send_packet(pClient, serializedLen, &timer); 00235 if(IOT_SUCCESS != rc) { 00236 FUNC_EXIT_RC(rc); 00237 } 00238 00239 /* wait for suback */ 00240 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00241 if(IOT_SUCCESS != rc) { 00242 FUNC_EXIT_RC(rc); 00243 } 00244 00245 /* Granted QoS can be 0, 1 or 2 */ 00246 rc = _aws_iot_mqtt_deserialize_suback(&rxPacketId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00247 pClient->clientData.readBufSize); 00248 if(IOT_SUCCESS != rc) { 00249 FUNC_EXIT_RC(rc); 00250 } 00251 00252 /* TODO : Figure out how to test this before activating this check */ 00253 //if(txPacketId != rxPacketId) { 00254 /* Different SUBACK received than expected. Return error 00255 * This can cause issues if the request timeout value is too small */ 00256 // return RX_MESSAGE_INVALID_ERROR; 00257 //} 00258 00259 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicName = 00260 pTopicName; 00261 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicNameLen = 00262 topicNameLen; 00263 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandler = 00264 pApplicationHandler; 00265 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandlerData = 00266 pApplicationHandlerData; 00267 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].qos = qos; 00268 00269 FUNC_EXIT_RC(IOT_SUCCESS); 00270 } 00271 00272 /** 00273 * @brief Subscribe to an MQTT topic. 00274 * 00275 * Called to send a subscribe message to the broker requesting a subscription 00276 * to an MQTT topic. This is the outer function which does the validations and 00277 * calls the internal subscribe above to perform the actual operation. 00278 * It is also responsible for client state changes 00279 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 
00280 * 00281 * @param pClient Reference to the IoT Client 00282 * @param pTopicName Topic Name to publish to 00283 * @param topicNameLen Length of the topic name 00284 * @param pApplicationHandler_t Reference to the handler function for this subscription 00285 * 00286 * @return An IoT Error Type defining successful/failed subscription 00287 */ 00288 IoT_Error_t aws_iot_mqtt_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen, 00289 QoS qos, pApplicationHandler_t pApplicationHandler, void *pApplicationHandlerData) { 00290 ClientState clientState; 00291 IoT_Error_t rc, subRc; 00292 00293 FUNC_ENTRY; 00294 00295 if(NULL == pClient || NULL == pTopicName || NULL == pApplicationHandler) { 00296 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00297 } 00298 00299 if(!aws_iot_mqtt_is_client_connected(pClient)) { 00300 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00301 } 00302 00303 clientState = aws_iot_mqtt_get_client_state(pClient); 00304 if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) { 00305 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00306 } 00307 00308 rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS); 00309 if(IOT_SUCCESS != rc) { 00310 FUNC_EXIT_RC(rc); 00311 } 00312 00313 subRc = _aws_iot_mqtt_internal_subscribe(pClient, pTopicName, topicNameLen, qos, 00314 pApplicationHandler, pApplicationHandlerData); 00315 00316 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS, clientState); 00317 if(IOT_SUCCESS == subRc && IOT_SUCCESS != rc) { 00318 subRc = rc; 00319 } 00320 00321 FUNC_EXIT_RC(subRc); 00322 } 00323 00324 /** 00325 * @brief Subscribe to an MQTT topic. 00326 * 00327 * Called to send a subscribe message to the broker requesting a subscription 00328 * to an MQTT topic. 00329 * This is the internal function which is called by the resubscribe API to perform the operation. 
00330 * Not meant to be called directly as it doesn't do validations or client state changes 00331 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00332 * 00333 * @param pClient Reference to the IoT Client 00334 * 00335 * @return An IoT Error Type defining successful/failed subscription 00336 */ 00337 static IoT_Error_t _aws_iot_mqtt_internal_resubscribe(AWS_IoT_Client *pClient) { 00338 uint16_t packetId; 00339 uint32_t len, count, existingSubCount, itr; 00340 IoT_Error_t rc; 00341 TimerAWS timer; 00342 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00343 00344 FUNC_ENTRY; 00345 00346 packetId = 0; 00347 len = 0; 00348 count = 0; 00349 existingSubCount = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00350 00351 for(itr = 0; itr < existingSubCount; itr++) { 00352 init_timer(&timer); 00353 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00354 00355 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00356 aws_iot_mqtt_get_next_packet_id(pClient), 1, 00357 &(pClient->clientData.messageHandlers[itr].topicName), 00358 &(pClient->clientData.messageHandlers[itr].topicNameLen), 00359 &(pClient->clientData.messageHandlers[itr].qos), &len); 00360 if(IOT_SUCCESS != rc) { 00361 FUNC_EXIT_RC(rc); 00362 } 00363 00364 /* send the subscribe packet */ 00365 rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer); 00366 if(IOT_SUCCESS != rc) { 00367 FUNC_EXIT_RC(rc); 00368 } 00369 00370 /* wait for suback */ 00371 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00372 if(IOT_SUCCESS != rc) { 00373 FUNC_EXIT_RC(rc); 00374 } 00375 00376 /* Granted QoS can be 0, 1 or 2 */ 00377 rc = _aws_iot_mqtt_deserialize_suback(&packetId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00378 pClient->clientData.readBufSize); 00379 if(IOT_SUCCESS != rc) { 00380 FUNC_EXIT_RC(rc); 00381 } 00382 } 00383 00384 FUNC_EXIT_RC(IOT_SUCCESS); 00385 } 00386 00387 /** 00388 * 
@brief Subscribe to an MQTT topic. 00389 * 00390 * Called to send a subscribe message to the broker requesting a subscription 00391 * to an MQTT topic. 00392 * This is the outer function which does the validations and calls the internal resubscribe above 00393 * to perform the actual operation. It is also responsible for client state changes 00394 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00395 * 00396 * @param pClient Reference to the IoT Client 00397 * 00398 * @return An IoT Error Type defining successful/failed subscription 00399 */ 00400 IoT_Error_t aws_iot_mqtt_resubscribe(AWS_IoT_Client *pClient) { 00401 IoT_Error_t rc, resubRc; 00402 00403 FUNC_ENTRY; 00404 00405 if(NULL == pClient) { 00406 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00407 } 00408 00409 if(false == aws_iot_mqtt_is_client_connected(pClient)) { 00410 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00411 } 00412 00413 if(CLIENT_STATE_CONNECTED_IDLE != aws_iot_mqtt_get_client_state(pClient)) { 00414 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00415 } 00416 00417 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_IDLE, 00418 CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS); 00419 if(IOT_SUCCESS != rc) { 00420 FUNC_EXIT_RC(rc); 00421 } 00422 00423 resubRc = _aws_iot_mqtt_internal_resubscribe(pClient); 00424 00425 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS, 00426 CLIENT_STATE_CONNECTED_IDLE); 00427 if(IOT_SUCCESS == resubRc && IOT_SUCCESS != rc) { 00428 resubRc = rc; 00429 } 00430 00431 FUNC_EXIT_RC(resubRc); 00432 } 00433 00434 #ifdef __cplusplus 00435 } 00436 #endif 00437
Generated on Tue Jul 12 2022 11:16:37 by
1.7.2
