Jeon byungchul
/
aws-iot-example
This is a fork that I will modify for STM32.
Fork of AWS-test by
Embed:
(wiki syntax)
Show/hide line numbers
aws_iot_mqtt_client_subscribe.cpp
00001 /* 00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 // Based on Eclipse Paho. 00017 /******************************************************************************* 00018 * Copyright (c) 2014 IBM Corp. 00019 * 00020 * All rights reserved. This program and the accompanying materials 00021 * are made available under the terms of the Eclipse Public License v1.0 00022 * and Eclipse Distribution License v1.0 which accompany this distribution. 00023 * 00024 * The Eclipse Public License is available at 00025 * http://www.eclipse.org/legal/epl-v10.html 00026 * and the Eclipse Distribution License is available at 00027 * http://www.eclipse.org/org/documents/edl-v10.php. 
00028 * 00029 * Contributors: 00030 * Ian Craggs - initial API and implementation and/or initial documentation 00031 *******************************************************************************/ 00032 00033 /** 00034 * @file aws_iot_mqtt_client_subscribe.c 00035 * @brief MQTT client subscribe API definitions 00036 */ 00037 00038 #ifdef __cplusplus 00039 extern "C" { 00040 #endif 00041 00042 #include "aws_iot_mqtt_client_common_internal.h" 00043 00044 /** 00045 * Serializes the supplied subscribe data into the supplied buffer, ready for sending 00046 * @param pTxBuf the buffer into which the packet will be serialized 00047 * @param txBufLen the length in bytes of the supplied buffer 00048 * @param dup unsigned char - the MQTT dup flag 00049 * @param packetId uint16_t - the MQTT packet identifier 00050 * @param topicCount - number of members in the topicFilters and reqQos arrays 00051 * @param pTopicNameList - array of topic filter names 00052 * @param pTopicNameLenList - array of length of topic filter names 00053 * @param pRequestedQoSs - array of requested QoS 00054 * @param pSerializedLen - the length of the serialized data 00055 * 00056 * @return An IoT Error Type defining successful/failed operation 00057 */ 00058 static IoT_Error_t _aws_iot_mqtt_serialize_subscribe(unsigned char *pTxBuf, size_t txBufLen, 00059 unsigned char dup, uint16_t packetId, uint32_t topicCount, 00060 const char **pTopicNameList, uint16_t *pTopicNameLenList, 00061 QoS *pRequestedQoSs, uint32_t *pSerializedLen) { 00062 unsigned char *ptr; 00063 uint32_t itr, rem_len; 00064 IoT_Error_t rc; 00065 MQTTHeader header = {0}; 00066 00067 FUNC_ENTRY; 00068 if(NULL == pTxBuf || NULL == pSerializedLen) { 00069 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00070 } 00071 00072 ptr = pTxBuf; 00073 rem_len = 2; /* packetId */ 00074 00075 for(itr = 0; itr < topicCount; ++itr) { 00076 rem_len += (uint32_t) (pTopicNameLenList[itr] + 2 + 1); /* topic + length + req_qos */ 00077 } 00078 00079 
if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) { 00080 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00081 } 00082 00083 rc = aws_iot_mqtt_internal_init_header(&header, SUBSCRIBE, QOS1, dup, 0); 00084 if(IOT_SUCCESS != rc) { 00085 FUNC_EXIT_RC(rc); 00086 } 00087 /* write header */ 00088 aws_iot_mqtt_internal_write_char(&ptr, header.byte); 00089 00090 /* write remaining length */ 00091 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len); 00092 00093 aws_iot_mqtt_internal_write_uint_16(&ptr, packetId); 00094 00095 for(itr = 0; itr < topicCount; ++itr) { 00096 aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicNameList[itr], pTopicNameLenList[itr]); 00097 aws_iot_mqtt_internal_write_char(&ptr, (unsigned char) pRequestedQoSs[itr]); 00098 } 00099 00100 *pSerializedLen = (uint32_t) (ptr - pTxBuf); 00101 00102 FUNC_EXIT_RC(IOT_SUCCESS); 00103 } 00104 00105 /** 00106 * Deserializes the supplied (wire) buffer into suback data 00107 * @param pPacketId returned integer - the MQTT packet identifier 00108 * @param maxExpectedQoSCount - the maximum number of members allowed in the grantedQoSs array 00109 * @param pGrantedQoSCount returned uint32_t - number of members in the grantedQoSs array 00110 * @param pGrantedQoSs returned array of QoS type - the granted qualities of service 00111 * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field 00112 * @param rxBufLen the length in bytes of the data in the supplied buffer 00113 * 00114 * @return An IoT Error Type defining successful/failed operation 00115 */ 00116 static IoT_Error_t _aws_iot_mqtt_deserialize_suback(uint16_t *pPacketId, uint32_t maxExpectedQoSCount, 00117 uint32_t *pGrantedQoSCount, QoS *pGrantedQoSs, 00118 unsigned char *pRxBuf, size_t rxBufLen) { 00119 unsigned char *curData, *endData; 00120 uint32_t decodedLen, readBytesLen; 00121 IoT_Error_t decodeRc; 00122 MQTTHeader header = {0}; 00123 00124 FUNC_ENTRY; 00125 
if(NULL == pPacketId || NULL == pGrantedQoSCount || NULL == pGrantedQoSs) { 00126 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00127 } 00128 00129 curData = pRxBuf; 00130 endData = NULL; 00131 decodeRc = IOT_FAILURE; 00132 decodedLen = 0; 00133 readBytesLen = 0; 00134 00135 /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload 00136 * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9 00137 */ 00138 if(5 > rxBufLen) { 00139 FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR); 00140 } 00141 00142 header.byte = aws_iot_mqtt_internal_read_char(&curData); 00143 if(SUBACK != header.bits.type) { 00144 FUNC_EXIT_RC(IOT_FAILURE); 00145 } 00146 00147 /* read remaining length */ 00148 decodeRc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen); 00149 if(IOT_SUCCESS != decodeRc) { 00150 FUNC_EXIT_RC(decodeRc); 00151 } 00152 00153 curData += (readBytesLen); 00154 endData = curData + decodedLen; 00155 if(endData - curData < 2) { 00156 FUNC_EXIT_RC(IOT_FAILURE); 00157 } 00158 00159 *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData); 00160 00161 *pGrantedQoSCount = 0; 00162 while(curData < endData) { 00163 if(*pGrantedQoSCount > maxExpectedQoSCount) { 00164 FUNC_EXIT_RC(IOT_FAILURE); 00165 } 00166 pGrantedQoSs[(*pGrantedQoSCount)++] = (QoS) aws_iot_mqtt_internal_read_char(&curData); 00167 } 00168 00169 FUNC_EXIT_RC(IOT_SUCCESS); 00170 } 00171 00172 /* Returns MAX_MESSAGE_HANDLERS value if no free index is available */ 00173 static uint32_t _aws_iot_mqtt_get_free_message_handler_index(AWS_IoT_Client *pClient) { 00174 uint32_t itr; 00175 00176 FUNC_ENTRY; 00177 00178 for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; itr++) { 00179 if(pClient->clientData.messageHandlers[itr].topicName == NULL) { 00180 break; 00181 } 00182 } 00183 00184 FUNC_EXIT_RC(itr); 00185 } 00186 00187 /** 00188 * @brief Subscribe to an MQTT topic. 
00189 * 00190 * Called to send a subscribe message to the broker requesting a subscription 00191 * to an MQTT topic. This is the internal function which is called by the 00192 * subscribe API to perform the operation. Not meant to be called directly as 00193 * it doesn't do validations or client state changes 00194 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00195 * 00196 * @param pClient Reference to the IoT Client 00197 * @param pTopicName Topic Name to publish to 00198 * @param topicNameLen Length of the topic name 00199 * @param pApplicationHandler_t Reference to the handler function for this subscription 00200 * 00201 * @return An IoT Error Type defining successful/failed subscription 00202 */ 00203 static IoT_Error_t _aws_iot_mqtt_internal_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, 00204 uint16_t topicNameLen, QoS qos, 00205 pApplicationHandler_t pApplicationHandler, 00206 void *pApplicationHandlerData) { 00207 uint16_t txPacketId, rxPacketId; 00208 uint32_t serializedLen, indexOfFreeMessageHandler, count; 00209 IoT_Error_t rc; 00210 TimerAWS timer; 00211 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00212 00213 FUNC_ENTRY; 00214 init_timer(&timer); 00215 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00216 00217 serializedLen = 0; 00218 count = 0; 00219 txPacketId = aws_iot_mqtt_get_next_packet_id(pClient); 00220 rxPacketId = 0; 00221 00222 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00223 txPacketId, 1, &pTopicName, &topicNameLen, &qos, &serializedLen); 00224 if(IOT_SUCCESS != rc) { 00225 FUNC_EXIT_RC(rc); 00226 } 00227 00228 indexOfFreeMessageHandler = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00229 if(AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS <= indexOfFreeMessageHandler) { 00230 FUNC_EXIT_RC(MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR); 00231 } 00232 00233 /* send the subscribe packet */ 00234 rc = 
aws_iot_mqtt_internal_send_packet(pClient, serializedLen, &timer); 00235 if(IOT_SUCCESS != rc) { 00236 FUNC_EXIT_RC(rc); 00237 } 00238 00239 /* wait for suback */ 00240 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00241 if(IOT_SUCCESS != rc) { 00242 FUNC_EXIT_RC(rc); 00243 } 00244 00245 /* Granted QoS can be 0, 1 or 2 */ 00246 rc = _aws_iot_mqtt_deserialize_suback(&rxPacketId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00247 pClient->clientData.readBufSize); 00248 if(IOT_SUCCESS != rc) { 00249 FUNC_EXIT_RC(rc); 00250 } 00251 00252 /* TODO : Figure out how to test this before activating this check */ 00253 //if(txPacketId != rxPacketId) { 00254 /* Different SUBACK received than expected. Return error 00255 * This can cause issues if the request timeout value is too small */ 00256 // return RX_MESSAGE_INVALID_ERROR; 00257 //} 00258 00259 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicName = 00260 pTopicName; 00261 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].topicNameLen = 00262 topicNameLen; 00263 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandler = 00264 pApplicationHandler; 00265 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].pApplicationHandlerData = 00266 pApplicationHandlerData; 00267 pClient->clientData.messageHandlers[indexOfFreeMessageHandler].qos = qos; 00268 00269 FUNC_EXIT_RC(IOT_SUCCESS); 00270 } 00271 00272 /** 00273 * @brief Subscribe to an MQTT topic. 00274 * 00275 * Called to send a subscribe message to the broker requesting a subscription 00276 * to an MQTT topic. This is the outer function which does the validations and 00277 * calls the internal subscribe above to perform the actual operation. 00278 * It is also responsible for client state changes 00279 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 
00280 * 00281 * @param pClient Reference to the IoT Client 00282 * @param pTopicName Topic Name to publish to 00283 * @param topicNameLen Length of the topic name 00284 * @param pApplicationHandler_t Reference to the handler function for this subscription 00285 * 00286 * @return An IoT Error Type defining successful/failed subscription 00287 */ 00288 IoT_Error_t aws_iot_mqtt_subscribe(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen, 00289 QoS qos, pApplicationHandler_t pApplicationHandler, void *pApplicationHandlerData) { 00290 ClientState clientState; 00291 IoT_Error_t rc, subRc; 00292 00293 FUNC_ENTRY; 00294 00295 if(NULL == pClient || NULL == pTopicName || NULL == pApplicationHandler) { 00296 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00297 } 00298 00299 if(!aws_iot_mqtt_is_client_connected(pClient)) { 00300 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00301 } 00302 00303 clientState = aws_iot_mqtt_get_client_state(pClient); 00304 if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) { 00305 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00306 } 00307 00308 rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS); 00309 if(IOT_SUCCESS != rc) { 00310 FUNC_EXIT_RC(rc); 00311 } 00312 00313 subRc = _aws_iot_mqtt_internal_subscribe(pClient, pTopicName, topicNameLen, qos, 00314 pApplicationHandler, pApplicationHandlerData); 00315 00316 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_SUBSCRIBE_IN_PROGRESS, clientState); 00317 if(IOT_SUCCESS == subRc && IOT_SUCCESS != rc) { 00318 subRc = rc; 00319 } 00320 00321 FUNC_EXIT_RC(subRc); 00322 } 00323 00324 /** 00325 * @brief Subscribe to an MQTT topic. 00326 * 00327 * Called to send a subscribe message to the broker requesting a subscription 00328 * to an MQTT topic. 00329 * This is the internal function which is called by the resubscribe API to perform the operation. 
00330 * Not meant to be called directly as it doesn't do validations or client state changes 00331 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00332 * 00333 * @param pClient Reference to the IoT Client 00334 * 00335 * @return An IoT Error Type defining successful/failed subscription 00336 */ 00337 static IoT_Error_t _aws_iot_mqtt_internal_resubscribe(AWS_IoT_Client *pClient) { 00338 uint16_t packetId; 00339 uint32_t len, count, existingSubCount, itr; 00340 IoT_Error_t rc; 00341 TimerAWS timer; 00342 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00343 00344 FUNC_ENTRY; 00345 00346 packetId = 0; 00347 len = 0; 00348 count = 0; 00349 existingSubCount = _aws_iot_mqtt_get_free_message_handler_index(pClient); 00350 00351 for(itr = 0; itr < existingSubCount; itr++) { 00352 init_timer(&timer); 00353 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00354 00355 rc = _aws_iot_mqtt_serialize_subscribe(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00356 aws_iot_mqtt_get_next_packet_id(pClient), 1, 00357 &(pClient->clientData.messageHandlers[itr].topicName), 00358 &(pClient->clientData.messageHandlers[itr].topicNameLen), 00359 &(pClient->clientData.messageHandlers[itr].qos), &len); 00360 if(IOT_SUCCESS != rc) { 00361 FUNC_EXIT_RC(rc); 00362 } 00363 00364 /* send the subscribe packet */ 00365 rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer); 00366 if(IOT_SUCCESS != rc) { 00367 FUNC_EXIT_RC(rc); 00368 } 00369 00370 /* wait for suback */ 00371 rc = aws_iot_mqtt_internal_wait_for_read(pClient, SUBACK, &timer); 00372 if(IOT_SUCCESS != rc) { 00373 FUNC_EXIT_RC(rc); 00374 } 00375 00376 /* Granted QoS can be 0, 1 or 2 */ 00377 rc = _aws_iot_mqtt_deserialize_suback(&packetId, 1, &count, grantedQoS, pClient->clientData.readBuf, 00378 pClient->clientData.readBufSize); 00379 if(IOT_SUCCESS != rc) { 00380 FUNC_EXIT_RC(rc); 00381 } 00382 } 00383 00384 FUNC_EXIT_RC(IOT_SUCCESS); 00385 } 00386 00387 /** 00388 * 
@brief Subscribe to an MQTT topic. 00389 * 00390 * Called to send a subscribe message to the broker requesting a subscription 00391 * to an MQTT topic. 00392 * This is the outer function which does the validations and calls the internal resubscribe above 00393 * to perform the actual operation. It is also responsible for client state changes 00394 * @note Call is blocking. The call returns after the receipt of the SUBACK control packet. 00395 * 00396 * @param pClient Reference to the IoT Client 00397 * 00398 * @return An IoT Error Type defining successful/failed subscription 00399 */ 00400 IoT_Error_t aws_iot_mqtt_resubscribe(AWS_IoT_Client *pClient) { 00401 IoT_Error_t rc, resubRc; 00402 00403 FUNC_ENTRY; 00404 00405 if(NULL == pClient) { 00406 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00407 } 00408 00409 if(false == aws_iot_mqtt_is_client_connected(pClient)) { 00410 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00411 } 00412 00413 if(CLIENT_STATE_CONNECTED_IDLE != aws_iot_mqtt_get_client_state(pClient)) { 00414 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00415 } 00416 00417 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_IDLE, 00418 CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS); 00419 if(IOT_SUCCESS != rc) { 00420 FUNC_EXIT_RC(rc); 00421 } 00422 00423 resubRc = _aws_iot_mqtt_internal_resubscribe(pClient); 00424 00425 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_RESUBSCRIBE_IN_PROGRESS, 00426 CLIENT_STATE_CONNECTED_IDLE); 00427 if(IOT_SUCCESS == resubRc && IOT_SUCCESS != rc) { 00428 resubRc = rc; 00429 } 00430 00431 FUNC_EXIT_RC(resubRc); 00432 } 00433 00434 #ifdef __cplusplus 00435 } 00436 #endif 00437
Generated on Tue Jul 12 2022 11:16:37 by 1.7.2