Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
aws_iot_mqtt_client_subscribe.c
00001 /* 00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 // Based on Eclipse Paho. 00017 /******************************************************************************* 00018 * Copyright (c) 2014 IBM Corp. 00019 * 00020 * All rights reserved. This program and the accompanying materials 00021 * are made available under the terms of the Eclipse Public License v1.0 00022 * and Eclipse Distribution License v1.0 which accompany this distribution. 00023 * 00024 * The Eclipse Public License is available at 00025 * http://www.eclipse.org/legal/epl-v10.html 00026 * and the Eclipse Distribution License is available at 00027 * http://www.eclipse.org/org/documents/edl-v10.php. 
00028 * 00029 * Contributors: 00030 * Ian Craggs - initial API and implementation and/or initial documentation 00031 *******************************************************************************/ 00032 00033 /** 00034 * @file aws_iot_mqtt_client_subscribe.c 00035 * @brief MQTT client subscribe API definitions 00036 */ 00037 00038 #ifdef __cplusplus 00039 extern "C" { 00040 #endif 00041 00042 #include "aws_iot_mqtt_client_common_internal.h" 00043 00044 /** 00045 * Serializes the supplied subscribe data into the supplied buffer, ready for sending 00046 * @param pTxBuf the buffer into which the packet will be serialized 00047 * @param txBufLen the length in bytes of the supplied buffer 00048 * @param dup unsigned char - the MQTT dup flag 00049 * @param packetId uint16_t - the MQTT packet identifier 00050 * @param topicCount - number of members in the topicFilters and reqQos arrays 00051 * @param pTopicNameList - array of topic filter names 00052 * @param pTopicNameLenList - array of length of topic filter names 00053 * @param pRequestedQoSs - array of requested QoS 00054 * @param pSerializedLen - the length of the serialized data 00055 * 00056 * @return An IoT Error Type defining successful/failed operation 00057 */ 00058 static IoT_Error_t _aws_iot_mqtt_serialize_subscribe(unsigned char *pTxBuf, size_t txBufLen, 00059 unsigned char dup, uint16_t packetId, uint32_t topicCount, 00060 const char **pTopicNameList, uint16_t *pTopicNameLenList, 00061 QoS *pRequestedQoSs, uint32_t *pSerializedLen) { 00062 unsigned char *ptr; 00063 uint32_t itr, rem_len; 00064 IoT_Error_t rc; 00065 MQTTHeader header = {0}; 00066 00067 FUNC_ENTRY; 00068 if(NULL == pTxBuf || NULL == pSerializedLen) { 00069 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00070 } 00071 00072 ptr = pTxBuf; 00073 rem_len = 2; /* packetId */ 00074 00075 for(itr = 0; itr < topicCount; ++itr) { 00076 rem_len += (uint32_t) (pTopicNameLenList[itr] + 2 + 1); /* topic + length + req_qos */ 00077 } 00078 00079 
if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) { 00080 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00081 } 00082 00083 rc = aws_iot_mqtt_internal_init_header(&header, SUBSCRIBE, QOS1, dup, 0); 00084 if(AWS_SUCCESS != rc) { 00085 FUNC_EXIT_RC(rc); 00086 } 00087 /* write header */ 00088 aws_iot_mqtt_internal_write_char(&ptr, header.byte); 00089 00090 /* write remaining length */ 00091 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len); 00092 00093 aws_iot_mqtt_internal_write_uint_16(&ptr, packetId); 00094 00095 for(itr = 0; itr < topicCount; ++itr) { 00096 aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicNameList[itr], pTopicNameLenList[itr]); 00097 aws_iot_mqtt_internal_write_char(&ptr, (unsigned char) pRequestedQoSs[itr]); 00098 } 00099 00100 *pSerializedLen = (uint32_t) (ptr - pTxBuf); 00101 00102 FUNC_EXIT_RC(AWS_SUCCESS); 00103 } 00104 00105 /** 00106 * Deserializes the supplied (wire) buffer into suback data 00107 * @param pPacketId returned integer - the MQTT packet identifier 00108 * @param maxExpectedQoSCount - the maximum number of members allowed in the grantedQoSs array 00109 * @param pGrantedQoSCount returned uint32_t - number of members in the grantedQoSs array 00110 * @param pGrantedQoSs returned array of QoS type - the granted qualities of service 00111 * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field 00112 * @param rxBufLen the length in bytes of the data in the supplied buffer 00113 * 00114 * @return An IoT Error Type defining successful/failed operation 00115 */ 00116 static IoT_Error_t _aws_iot_mqtt_deserialize_suback(uint16_t *pPacketId, uint32_t maxExpectedQoSCount, 00117 uint32_t *pGrantedQoSCount, QoS *pGrantedQoSs, 00118 unsigned char *pRxBuf, size_t rxBufLen) { 00119 unsigned char *curData, *endData; 00120 uint32_t decodedLen, readBytesLen; 00121 IoT_Error_t decodeRc; 00122 MQTTHeader header = {0}; 00123 00124 FUNC_ENTRY; 00125 
if(NULL == pPacketId || NULL == pGrantedQoSCount || NULL == pGrantedQoSs) { 00126 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00127 } 00128 00129 curData = pRxBuf; 00130 endData = NULL; 00131 decodeRc = FAILURE; 00132 decodedLen = 0; 00133 readBytesLen = 0; 00134 00135 /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload 00136 * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9 00137 */ 00138 if(5 > rxBufLen) { 00139 FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR); 00140 } 00141 00142 header.byte = aws_iot_mqtt_internal_read_char(&curData); 00143 if(SUBACK != MQTT_HEADER_FIELD_TYPE(header.byte)) { 00144 FUNC_EXIT_RC(FAILURE); 00145 } 00146 00147 /* read remaining length */ 00148 decodeRc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen); 00149 if(AWS_SUCCESS != decodeRc) { 00150 FUNC_EXIT_RC(decodeRc); 00151 } 00152 00153 curData += (readBytesLen); 00154 endData = curData + decodedLen; 00155 if(endData - curData < 2) { 00156 FUNC_EXIT_RC(FAILURE); 00157 } 00158 00159 *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData); 00160 00161 *pGrantedQoSCount = 0; 00162 while(curData < endData) { 00163 if(*pGrantedQoSCount > maxExpectedQoSCount) { 00164 FUNC_EXIT_RC(FAILURE); 00165 } 00166 pGrantedQoSs[(*pGrantedQoSCount)++] = (QoS) aws_iot_mqtt_internal_read_char(&curData); 00167 } 00168 00169 FUNC_EXIT_RC(AWS_SUCCESS); 00170 } 00171 00172 /* Returns MAX_MESSAGE_HANDLERS value if no free index is available */ 00173 static uint32_t _aws_iot_mqtt_get_free_message_handler_index(AWS_IoT_Client *pClient) { 00174 uint32_t itr; 00175 00176 FUNC_ENTRY; 00177 00178 for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; itr++) { 00179 if(pClient->clientData.messageHandlers[itr].topicName == NULL) { 00180 break; 00181 } 00182 } 00183 00184 FUNC_EXIT_RC((int)itr); 00185 } 00186 00187 /** 00188 * @brief Subscribe to an MQTT topic. 
00189 * 00190 * Called to send a subscribe message to the broker requesting a subscription 00191 * to an MQTT topic. This is the internal function which is called by the 00192 * subscribe API to perform the operation. Not meant to be called directly as 00193 * it doesn't do validations or client state changes 00194 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00195 * @warning pTopicName and pApplicationHandlerData need to be static in memory. 00196 * 00197 * @param pClient Reference to the IoT Client 00198 * @param pTopicName Topic Name to publish to. pTopicName needs to be static in memory since 00199 * no malloc are performed by the SDK 00200 * @param topicNameLen Length of the topic name 00201 * @param pApplicationHandler_t Reference to the handler function for this subscription 00202 * @param pApplicationHandlerData Point to data passed to the callback. 00203 * pApplicationHandlerData also needs to be static in memory since no malloc are performed by the SDK 00204 * 00205 * @return An IoT Error Type defining successful/failed subscription 00206 */ 00207 static IoT_Error_t _aws_iot_mqtt_internal_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, 00208 uint16_t topicNameLen, QoS qos, 00209 pApplicationHandler_t pApplicationHandler, 00210 void *pApplicationHandlerData) { 00211 uint16_t txPacketId, rxPacketId; 00212 uint32_t serializedLen, indexOfFreeMessageHandler, count; 00213 IoT_Error_t rc; 00214 awsTimer timer; 00215 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00216 00217 FUNC_ENTRY; 00218 init_timer(&timer); 00219 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00220 00221 serializedLen = 0; 00222 count = 0; 00223 txPacketId = aws_iot_mqtt_get_next_packet_id(pClient); 00224 rxPacketId = 0; 00225 00226 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00227 txPacketId, 1, &pTopicName, &topicNameLen, &qos, &serializedLen); 00228 if(AWS_SUCCESS 
!= rc) { 00229 FUNC_EXIT_RC(rc); 00230 } 00231 00232 indexOfFreeMessageHandler = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00233 if(AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS <= indexOfFreeMessageHandler) { 00234 FUNC_EXIT_RC(MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR); 00235 } 00236 00237 /* send the subscribe packet */ 00238 rc = aws_iot_mqtt_internal_send_packet(pClient, serializedLen, &timer); 00239 if(AWS_SUCCESS != rc) { 00240 FUNC_EXIT_RC(rc); 00241 } 00242 00243 /* wait for suback */ 00244 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00245 if(AWS_SUCCESS != rc) { 00246 FUNC_EXIT_RC(rc); 00247 } 00248 00249 /* Granted QoS can be 0, 1 or 2 */ 00250 rc = _aws_iot_mqtt_deserialize_suback(&rxPacketId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00251 pClient->clientData.readBufSize); 00252 if(AWS_SUCCESS != rc) { 00253 FUNC_EXIT_RC(rc); 00254 } 00255 00256 /* TODO : Figure out how to test this before activating this check */ 00257 //if(txPacketId != rxPacketId) { 00258 /* Different SUBACK received than expected. Return error 00259 * This can cause issues if the request timeout value is too small */ 00260 // return RX_MESSAGE_INVALID_ERROR; 00261 //} 00262 00263 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicName = 00264 pTopicName; 00265 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicNameLen = 00266 topicNameLen; 00267 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandler = 00268 pApplicationHandler; 00269 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandlerData = 00270 pApplicationHandlerData; 00271 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].qos = qos; 00272 00273 FUNC_EXIT_RC(AWS_SUCCESS); 00274 } 00275 00276 /** 00277 * @brief Subscribe to an MQTT topic. 00278 * 00279 * Called to send a subscribe message to the broker requesting a subscription 00280 * to an MQTT topic. 
This is the outer function which does the validations and 00281 * calls the internal subscribe above to perform the actual operation. 00282 * It is also responsible for client state changes 00283 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00284 * @warning pTopicName and pApplicationHandlerData need to be static in memory. 00285 * 00286 * @param pClient Reference to the IoT Client 00287 * @param pTopicName Topic Name to publish to. pTopicName needs to be static in memory since 00288 * no malloc are performed by the SDK 00289 * @param topicNameLen Length of the topic name 00290 * @param pApplicationHandler_t Reference to the handler function for this subscription 00291 * @param pApplicationHandlerData Point to data passed to the callback. 00292 * pApplicationHandlerData also needs to be static in memory since no malloc are performed by the SDK 00293 * 00294 * @return An IoT Error Type defining successful/failed subscription 00295 */ 00296 IoT_Error_t aws_iot_mqtt_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen, 00297 QoS qos, pApplicationHandler_t pApplicationHandler, void *pApplicationHandlerData) { 00298 ClientState clientState; 00299 IoT_Error_t rc, subRc; 00300 00301 FUNC_ENTRY; 00302 00303 if(NULL == pClient || NULL == pTopicName || NULL == pApplicationHandler) { 00304 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00305 } 00306 00307 if(!aws_iot_mqtt_is_client_connected(pClient)) { 00308 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00309 } 00310 00311 clientState = aws_iot_mqtt_get_client_state(pClient); 00312 if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) { 00313 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00314 } 00315 00316 rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS); 00317 if(AWS_SUCCESS != rc) { 00318 FUNC_EXIT_RC(rc); 00319 } 00320 00321 subRc = 
_aws_iot_mqtt_internal_subscribe(pClient, pTopicName, topicNameLen, qos, 00322 pApplicationHandler, pApplicationHandlerData); 00323 00324 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS, clientState); 00325 if(AWS_SUCCESS == subRc && AWS_SUCCESS != rc) { 00326 subRc = rc; 00327 } 00328 00329 FUNC_EXIT_RC(subRc); 00330 } 00331 00332 /** 00333 * @brief Subscribe to an MQTT topic. 00334 * 00335 * Called to send a subscribe message to the broker requesting a subscription 00336 * to an MQTT topic. 00337 * This is the internal function which is called by the resubscribe API to perform the operation. 00338 * Not meant to be called directly as it doesn't do validations or client state changes 00339 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00340 * 00341 * @param pClient Reference to the IoT Client 00342 * 00343 * @return An IoT Error Type defining successful/failed subscription 00344 */ 00345 static IoT_Error_t _aws_iot_mqtt_internal_resubscribe(AWS_IoT_Client *pClient) { 00346 uint16_t packetId; 00347 uint32_t len, count, existingSubCount, itr; 00348 IoT_Error_t rc; 00349 awsTimer timer; 00350 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00351 00352 FUNC_ENTRY; 00353 00354 packetId = 0; 00355 len = 0; 00356 count = 0; 00357 existingSubCount = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00358 00359 for(itr = 0; itr < existingSubCount; itr++) { 00360 if(pClient->clientData.messageHandlers[itr].topicName == NULL) { 00361 continue; 00362 } 00363 00364 init_timer(&timer); 00365 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00366 00367 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00368 aws_iot_mqtt_get_next_packet_id(pClient), 1, 00369 &(pClient->clientData.messageHandlers[itr].topicName), 00370 &(pClient->clientData.messageHandlers[itr].topicNameLen), 00371 &(pClient->clientData.messageHandlers[itr].qos), 
&len); 00372 if(AWS_SUCCESS != rc) { 00373 FUNC_EXIT_RC(rc); 00374 } 00375 00376 /* send the subscribe packet */ 00377 rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer); 00378 if(AWS_SUCCESS != rc) { 00379 FUNC_EXIT_RC(rc); 00380 } 00381 00382 /* wait for suback */ 00383 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00384 if(AWS_SUCCESS != rc) { 00385 FUNC_EXIT_RC(rc); 00386 } 00387 00388 /* Granted QoS can be 0, 1 or 2 */ 00389 rc = _aws_iot_mqtt_deserialize_suback(&packetId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00390 pClient->clientData.readBufSize); 00391 if(AWS_SUCCESS != rc) { 00392 FUNC_EXIT_RC(rc); 00393 } 00394 } 00395 00396 FUNC_EXIT_RC(AWS_SUCCESS); 00397 } 00398 00399 /** 00400 * @brief Subscribe to an MQTT topic. 00401 * 00402 * Called to send a subscribe message to the broker requesting a subscription 00403 * to an MQTT topic. 00404 * This is the outer function which does the validations and calls the internal resubscribe above 00405 * to perform the actual operation. It is also responsible for client state changes 00406 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 
00407 * 00408 * @param pClient Reference to the IoT Client 00409 * 00410 * @return An IoT Error Type defining successful/failed subscription 00411 */ 00412 IoT_Error_t aws_iot_mqtt_resubscribe(AWS_IoT_Client *pClient) { 00413 IoT_Error_t rc, resubRc; 00414 00415 FUNC_ENTRY; 00416 00417 if(NULL == pClient) { 00418 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00419 } 00420 00421 if(false == aws_iot_mqtt_is_client_connected(pClient)) { 00422 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00423 } 00424 00425 if(CLIENT_STATE_CONNECTED_IDLE != aws_iot_mqtt_get_client_state(pClient)) { 00426 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00427 } 00428 00429 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_IDLE, 00430 CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS); 00431 if(AWS_SUCCESS != rc) { 00432 FUNC_EXIT_RC(rc); 00433 } 00434 00435 resubRc = _aws_iot_mqtt_internal_resubscribe(pClient); 00436 00437 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS, 00438 CLIENT_STATE_CONNECTED_IDLE); 00439 if(AWS_SUCCESS == resubRc && AWS_SUCCESS != rc) { 00440 resubRc = rc; 00441 } 00442 00443 FUNC_EXIT_RC(resubRc); 00444 } 00445 00446 #ifdef __cplusplus 00447 } 00448 #endif 00449
Generated on Tue Jul 12 2022 19:02:38 by
1.7.2