Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
aws_iot_mqtt_client_common_internal.c
00001 /* 00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 // Based on Eclipse Paho. 00017 /******************************************************************************* 00018 * Copyright (c) 2014 IBM Corp. 00019 * 00020 * All rights reserved. This program and the accompanying materials 00021 * are made available under the terms of the Eclipse Public License v1.0 00022 * and Eclipse Distribution License v1.0 which accompany this distribution. 00023 * 00024 * The Eclipse Public License is available at 00025 * http://www.eclipse.org/legal/epl-v10.html 00026 * and the Eclipse Distribution License is available at 00027 * http://www.eclipse.org/org/documents/edl-v10.php. 00028 * 00029 * Contributors: 00030 * Ian Craggs - initial API and implementation and/or initial documentation 00031 * Sergio R. 
Caprile - non-blocking packet read functions for stream transport 00032 *******************************************************************************/ 00033 00034 /** 00035 * @file aws_iot_mqtt_client_common_internal.c 00036 * @brief MQTT client internal API definitions 00037 */ 00038 00039 #ifdef __cplusplus 00040 extern "C" { 00041 #endif 00042 00043 #include <aws_iot_mqtt_client.h> 00044 #include "aws_iot_mqtt_client_common_internal.h" 00045 00046 /* Max length of packet header */ 00047 #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 00048 00049 /** 00050 * Encodes the message length according to the MQTT algorithm 00051 * @param buf the buffer into which the encoded data is written 00052 * @param length the length to be encoded 00053 * @return the number of bytes written to buffer 00054 */ 00055 size_t aws_iot_mqtt_internal_write_len_to_buffer(unsigned char *buf, uint32_t length) { 00056 size_t outLen = 0; 00057 unsigned char encodedByte; 00058 00059 FUNC_ENTRY; 00060 do { 00061 encodedByte = (unsigned char) (length % 128); 00062 length /= 128; 00063 /* if there are more digits to encode, set the top bit of this digit */ 00064 if(length > 0) { 00065 encodedByte |= 0x80; 00066 } 00067 buf[outLen++] = encodedByte; 00068 } while(length > 0); 00069 00070 FUNC_EXIT_RC(outLen); 00071 } 00072 00073 /** 00074 * Decodes the message length according to the MQTT algorithm 00075 * @param the buffer containing the message 00076 * @param value the decoded length returned 00077 * @return the number of bytes read from the socket 00078 */ 00079 IoT_Error_t aws_iot_mqtt_internal_decode_remaining_length_from_buffer(unsigned char *buf, uint32_t *decodedLen, 00080 uint32_t *readBytesLen) { 00081 unsigned char encodedByte; 00082 uint32_t multiplier, len; 00083 FUNC_ENTRY; 00084 00085 multiplier = 1; 00086 len = 0; 00087 *decodedLen = 0; 00088 00089 do { 00090 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { 00091 /* bad data */ 00092 FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); 
00093 } 00094 encodedByte = *buf; 00095 buf++; 00096 *decodedLen += (encodedByte & 127) * multiplier; 00097 multiplier *= 128; 00098 } while((encodedByte & 128) != 0); 00099 00100 *readBytesLen = len; 00101 00102 FUNC_EXIT_RC(AWS_SUCCESS); 00103 } 00104 00105 uint32_t aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(uint32_t rem_len) { 00106 rem_len += 1; /* header byte */ 00107 /* now remaining_length field (MQTT 3.1.1 - 2.2.3)*/ 00108 if(rem_len < 128) { 00109 rem_len += 1; 00110 } else if(rem_len < 16384) { 00111 rem_len += 2; 00112 } else if(rem_len < 2097152) { 00113 rem_len += 3; 00114 } else { 00115 rem_len += 4; 00116 } 00117 return rem_len; 00118 } 00119 00120 /** 00121 * Calculates uint16 packet id from two bytes read from the input buffer 00122 * Checks Endianness at runtime 00123 * 00124 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00125 * @return the value calculated 00126 */ 00127 uint16_t aws_iot_mqtt_internal_read_uint16_t(unsigned char **pptr) { 00128 unsigned char *ptr = *pptr; 00129 uint16_t len = 0; 00130 uint8_t firstByte = (uint8_t) (*ptr); 00131 uint8_t secondByte = (uint8_t) (*(ptr + 1)); 00132 len = (uint16_t) (secondByte + (256 * firstByte)); 00133 00134 *pptr += 2; 00135 return len; 00136 } 00137 00138 /** 00139 * Writes an integer as 2 bytes to an output buffer. 00140 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00141 * @param anInt the integer to write 00142 */ 00143 void aws_iot_mqtt_internal_write_uint_16(unsigned char **pptr, uint16_t anInt) { 00144 **pptr = (unsigned char) (anInt / 256); 00145 (*pptr)++; 00146 **pptr = (unsigned char) (anInt % 256); 00147 (*pptr)++; 00148 } 00149 00150 /** 00151 * Reads one character from the input buffer. 
00152 * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned 00153 * @return the character read 00154 */ 00155 unsigned char aws_iot_mqtt_internal_read_char(unsigned char **pptr) { 00156 unsigned char c = **pptr; 00157 (*pptr)++; 00158 return c; 00159 } 00160 00161 /** 00162 * Writes one character to an output buffer. 00163 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00164 * @param c the character to write 00165 */ 00166 void aws_iot_mqtt_internal_write_char(unsigned char **pptr, unsigned char c) { 00167 **pptr = c; 00168 (*pptr)++; 00169 } 00170 00171 void aws_iot_mqtt_internal_write_utf8_string(unsigned char **pptr, const char *string, uint16_t stringLen) { 00172 /* Nothing that calls this function will have a stringLen with a size larger than 2 bytes (MQTT 3.1.1 - 1.5.3) */ 00173 aws_iot_mqtt_internal_write_uint_16(pptr, stringLen); 00174 if(stringLen > 0) { 00175 memcpy(*pptr, string, stringLen); 00176 *pptr += stringLen; 00177 } 00178 } 00179 00180 /** 00181 * Initialize the MQTTHeader structure. Used to ensure that Header bits are 00182 * always initialized using the proper mappings. No Endianness issues here since 00183 * the individual fields are all less than a byte. 
Also generates no warnings since 00184 * all fields are initialized using hex constants 00185 */ 00186 IoT_Error_t aws_iot_mqtt_internal_init_header(MQTTHeader *pHeader, MessageTypes message_type, 00187 QoS qos, uint8_t dup, uint8_t retained) { 00188 FUNC_ENTRY; 00189 00190 if(NULL == pHeader) { 00191 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00192 } 00193 00194 /* Set all bits to zero */ 00195 pHeader->byte = 0; 00196 uint8_t type = 0; 00197 switch(message_type) { 00198 case UNKNOWN: 00199 /* Should never happen */ 00200 return FAILURE; 00201 case CONNECT: 00202 type = 0x01; 00203 break; 00204 case CONNACK: 00205 type = 0x02; 00206 break; 00207 case PUBLISH: 00208 type = 0x03; 00209 break; 00210 case PUBACK: 00211 type = 0x04; 00212 break; 00213 case PUBREC: 00214 type = 0x05; 00215 break; 00216 case PUBREL: 00217 type = 0x06; 00218 break; 00219 case PUBCOMP: 00220 type = 0x07; 00221 break; 00222 case SUBSCRIBE: 00223 type = 0x08; 00224 break; 00225 case SUBACK: 00226 type = 0x09; 00227 break; 00228 case UNSUBSCRIBE: 00229 type = 0x0A; 00230 break; 00231 case UNSUBACK: 00232 type = 0x0B; 00233 break; 00234 case PINGREQ: 00235 type = 0x0C; 00236 break; 00237 case PINGRESP: 00238 type = 0x0D; 00239 break; 00240 case DISCONNECT: 00241 type = 0x0E; 00242 break; 00243 default: 00244 /* Should never happen */ 00245 FUNC_EXIT_RC(FAILURE); 00246 } 00247 00248 pHeader->byte = type << 4; 00249 pHeader->byte |= dup << 3; 00250 00251 switch(qos) { 00252 case QOS0: 00253 break; 00254 case QOS1: 00255 pHeader->byte |= 1 << 1; 00256 break; 00257 default: 00258 /* Using QOS0 as default */ 00259 break; 00260 } 00261 00262 pHeader->byte |= (1 == retained) ? 
0x01 : 0x00; 00263 00264 FUNC_EXIT_RC(AWS_SUCCESS); 00265 } 00266 00267 IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t length, awsTimer *pTimer) { 00268 00269 size_t sentLen, sent; 00270 IoT_Error_t rc; 00271 00272 FUNC_ENTRY; 00273 00274 if(NULL == pClient || NULL == pTimer) { 00275 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00276 } 00277 00278 if(length >= pClient->clientData.writeBufSize) { 00279 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00280 } 00281 00282 #ifdef _ENABLE_THREAD_SUPPORT_ 00283 rc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); 00284 if(AWS_SUCCESS != rc) { 00285 FUNC_EXIT_RC(rc); 00286 } 00287 #endif 00288 00289 sentLen = 0; 00290 sent = 0; 00291 00292 while(sent < length && !has_timer_expired(pTimer)) { 00293 rc = pClient->networkStack.write(&(pClient->networkStack), 00294 &pClient->clientData.writeBuf[sent], 00295 (length - sent), 00296 pTimer, 00297 &sentLen); 00298 if(AWS_SUCCESS != rc) { 00299 /* there was an error writing the data */ 00300 break; 00301 } 00302 sent += sentLen; 00303 } 00304 00305 #ifdef _ENABLE_THREAD_SUPPORT_ 00306 rc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_write_mutex)); 00307 if(AWS_SUCCESS != rc) { 00308 FUNC_EXIT_RC(rc); 00309 } 00310 #endif 00311 00312 if(sent == length) { 00313 /* record the fact that we have successfully sent the packet */ 00314 //countdown_sec(&c->pingTimer, c->clientData.keepAliveInterval); 00315 FUNC_EXIT_RC(AWS_SUCCESS); 00316 } 00317 00318 FUNC_EXIT_RC(rc) 00319 } 00320 00321 static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Client *pClient, 00322 size_t *rem_len, awsTimer *pTimer) { 00323 unsigned char encodedByte; 00324 size_t multiplier, len; 00325 IoT_Error_t rc; 00326 00327 FUNC_ENTRY; 00328 00329 multiplier = 1; 00330 len = 0; 00331 *rem_len = 0; 00332 00333 do { 00334 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { 00335 /* bad data */ 00336 
FUNC_EXIT_RC(MQTT_DECODE_REMAINING_LENGTH_ERROR); 00337 } 00338 00339 rc = pClient->networkStack.read(&(pClient->networkStack), &encodedByte, 1, pTimer, &len); 00340 if(AWS_SUCCESS != rc) { 00341 FUNC_EXIT_RC(rc); 00342 } 00343 00344 *rem_len += ((encodedByte & 127) * multiplier); 00345 multiplier *= 128; 00346 } while((encodedByte & 128) != 0); 00347 00348 FUNC_EXIT_RC(rc); 00349 } 00350 00351 static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { 00352 size_t len, rem_len, total_bytes_read, bytes_to_be_read, read_len; 00353 IoT_Error_t rc; 00354 MQTTHeader header = {0}; 00355 awsTimer packetTimer; 00356 init_timer(&packetTimer); 00357 countdown_ms(&packetTimer, pClient->clientData.packetTimeoutMs); 00358 00359 rem_len = 0; 00360 total_bytes_read = 0; 00361 bytes_to_be_read = 0; 00362 read_len = 0; 00363 00364 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, 1, pTimer, &read_len); 00365 //printf("JMF:%s:%d read %d from networkStack.read\n",__FILE__,__LINE__,rc); 00366 00367 /* 1. read the header byte. This has the packet type in it */ 00368 if(NETWORK_SSL_NOTHING_TO_READ == rc) { 00369 return MQTT_NOTHING_TO_READ; 00370 } else if(AWS_SUCCESS != rc) { 00371 return rc; 00372 } 00373 00374 len = 1; 00375 00376 /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to 00377 * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving 00378 * if the remaining time in pTimer is too short. 00379 */ 00380 pTimer = &packetTimer; 00381 00382 /* 2. read the remaining length. 
This is variable in itself */ 00383 rc = _aws_iot_mqtt_internal_decode_packet_remaining_len(pClient, &rem_len, pTimer); 00384 if(AWS_SUCCESS != rc) { 00385 return rc; 00386 } 00387 00388 /* if the buffer is too short then the message will be dropped silently */ 00389 if(rem_len >= pClient->clientData.readBufSize) { 00390 bytes_to_be_read = pClient->clientData.readBufSize; 00391 do { 00392 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf, bytes_to_be_read, 00393 pTimer, &read_len); 00394 if(AWS_SUCCESS == rc) { 00395 total_bytes_read += read_len; 00396 if((rem_len - total_bytes_read) >= pClient->clientData.readBufSize) { 00397 bytes_to_be_read = pClient->clientData.readBufSize; 00398 } else { 00399 bytes_to_be_read = rem_len - total_bytes_read; 00400 } 00401 } 00402 } while(total_bytes_read < rem_len && AWS_SUCCESS == rc); 00403 return MQTT_RX_BUFFER_TOO_SHORT_ERROR; 00404 } 00405 00406 /* put the original remaining length into the read buffer */ 00407 len += aws_iot_mqtt_internal_write_len_to_buffer(pClient->clientData.readBuf + 1, (uint32_t) rem_len); 00408 00409 /* 3. 
read the rest of the buffer using a callback to supply the rest of the data */ 00410 if(rem_len > 0) { 00411 rc = pClient->networkStack.read(&(pClient->networkStack), pClient->clientData.readBuf + len, rem_len, pTimer, 00412 &read_len); 00413 if(AWS_SUCCESS != rc || read_len != rem_len) { 00414 return FAILURE; 00415 } 00416 } 00417 00418 header.byte = pClient->clientData.readBuf[0]; 00419 *pPacketType = MQTT_HEADER_FIELD_TYPE(header.byte); 00420 00421 FUNC_EXIT_RC(rc); 00422 } 00423 00424 // assume topic filter and name is in correct format 00425 // # can only be at end 00426 // + and # can only be next to separator 00427 static bool _aws_iot_mqtt_internal_is_topic_matched(char *pTopicFilter, char *pTopicName, uint16_t topicNameLen) { 00428 00429 char *curf, *curn, *curn_end; 00430 00431 if(NULL == pTopicFilter || NULL == pTopicName) { 00432 return false; 00433 } 00434 00435 curf = pTopicFilter; 00436 curn = pTopicName; 00437 curn_end = curn + topicNameLen; 00438 00439 while(*curf && (curn < curn_end)) { 00440 if(*curn == '/' && *curf != '/') { 00441 break; 00442 } 00443 if(*curf != '+' && *curf != '#' && *curf != *curn) { 00444 break; 00445 } 00446 if(*curf == '+') { 00447 /* skip until we meet the next separator, or end of string */ 00448 char *nextpos = curn + 1; 00449 while(nextpos < curn_end && *nextpos != '/') 00450 nextpos = ++curn + 1; 00451 } else if(*curf == '#') { 00452 /* skip until end of string */ 00453 curn = curn_end - 1; 00454 } 00455 00456 curf++; 00457 curn++; 00458 }; 00459 00460 return (curn == curn_end) && (*curf == '\0'); 00461 } 00462 00463 static IoT_Error_t _aws_iot_mqtt_internal_deliver_message(AWS_IoT_Client *pClient, char *pTopicName, 00464 uint16_t topicNameLen, 00465 IoT_Publish_Message_Params *pMessageParams) { 00466 uint32_t itr; 00467 IoT_Error_t rc; 00468 ClientState clientState; 00469 00470 FUNC_ENTRY; 00471 00472 if(NULL == pTopicName) { 00473 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00474 } 00475 00476 /* This function can be called 
from all MQTT APIs 00477 * But while callback return is in progress, Yield should not be called. 00478 * The state for CB_RETURN accomplishes that, as yield cannot be called while in that state */ 00479 clientState = aws_iot_mqtt_get_client_state(pClient); 00480 aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN); 00481 00482 /* Find the right message handler - indexed by topic */ 00483 for(itr = 0; itr < AWS_IOT_MQTT_NUM_SUBSCRIBE_HANDLERS; ++itr) { 00484 if(NULL != pClient->clientData.messageHandlers[itr].topicName) { 00485 if(((topicNameLen == pClient->clientData.messageHandlers[itr].topicNameLen) 00486 && 00487 (strncmp(pTopicName, (char *) pClient->clientData.messageHandlers[itr].topicName, topicNameLen) == 0)) 00488 || _aws_iot_mqtt_internal_is_topic_matched((char *) pClient->clientData.messageHandlers[itr].topicName, 00489 pTopicName, topicNameLen)) { 00490 if(NULL != pClient->clientData.messageHandlers[itr].pApplicationHandler) { 00491 pClient->clientData.messageHandlers[itr].pApplicationHandler(pClient, pTopicName, topicNameLen, 00492 pMessageParams, 00493 pClient->clientData.messageHandlers[itr].pApplicationHandlerData); 00494 } 00495 } 00496 } 00497 } 00498 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN, clientState); 00499 00500 FUNC_EXIT_RC(rc); 00501 } 00502 00503 static IoT_Error_t _aws_iot_mqtt_internal_handle_publish(AWS_IoT_Client *pClient, awsTimer *pTimer) { 00504 char *topicName; 00505 uint16_t topicNameLen; 00506 uint32_t len; 00507 IoT_Error_t rc; 00508 IoT_Publish_Message_Params msg; 00509 00510 FUNC_ENTRY; 00511 00512 topicName = NULL; 00513 topicNameLen = 0; 00514 len = 0; 00515 00516 rc = aws_iot_mqtt_internal_deserialize_publish(&msg.isDup, &msg.qos, &msg.isRetained, 00517 &msg.id, &topicName, &topicNameLen, 00518 (unsigned char **) &msg.payload, &msg.payloadLen, 00519 pClient->clientData.readBuf, 00520 pClient->clientData.readBufSize); 00521 00522 
if(AWS_SUCCESS != rc) { 00523 FUNC_EXIT_RC(rc); 00524 } 00525 00526 rc = _aws_iot_mqtt_internal_deliver_message(pClient, topicName, topicNameLen, &msg); 00527 if(AWS_SUCCESS != rc) { 00528 FUNC_EXIT_RC(rc); 00529 } 00530 00531 if(QOS0 == msg.qos) { 00532 /* No further processing required for QoS0 */ 00533 FUNC_EXIT_RC(AWS_SUCCESS); 00534 } 00535 00536 /* Message assumed to be QoS1 since we do not support QoS2 at this time */ 00537 rc = aws_iot_mqtt_internal_serialize_ack(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 00538 PUBACK, 0, msg.id, &len); 00539 00540 if(AWS_SUCCESS != rc) { 00541 FUNC_EXIT_RC(rc); 00542 } 00543 00544 rc = aws_iot_mqtt_internal_send_packet(pClient, len, pTimer); 00545 if(AWS_SUCCESS != rc) { 00546 FUNC_EXIT_RC(rc); 00547 } 00548 00549 FUNC_EXIT_RC(AWS_SUCCESS); 00550 } 00551 00552 IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, awsTimer *pTimer, uint8_t *pPacketType) { 00553 IoT_Error_t rc; 00554 //printf("JMF! in aws_iot_mqtt_internal_cycle_read\n"); 00555 #ifdef _ENABLE_THREAD_SUPPORT_ 00556 IoT_Error_t threadRc; 00557 #endif 00558 00559 if(NULL == pClient || NULL == pTimer) { 00560 return NULL_VALUE_ERROR; 00561 } 00562 00563 #ifdef _ENABLE_THREAD_SUPPORT_ 00564 threadRc = aws_iot_mqtt_client_lock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); 00565 if(AWS_SUCCESS != threadRc) { 00566 FUNC_EXIT_RC(threadRc); 00567 } 00568 #endif 00569 00570 /* read the socket, see what work is due */ 00571 rc = _aws_iot_mqtt_internal_read_packet(pClient, pTimer, pPacketType); 00572 //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); 00573 00574 #ifdef _ENABLE_THREAD_SUPPORT_ 00575 threadRc = aws_iot_mqtt_client_unlock_mutex(pClient, &(pClient->clientData.tls_read_mutex)); 00576 if(AWS_SUCCESS != threadRc && (MQTT_NOTHING_TO_READ == rc || AWS_SUCCESS == rc)) { 00577 return threadRc; 00578 } 00579 #endif 00580 00581 if(MQTT_NOTHING_TO_READ == rc) { 00582 /* Nothing to read, not a cycle failure */ 00583 
//printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); 00584 return AWS_SUCCESS; 00585 } else if(AWS_SUCCESS != rc) { 00586 //printf("JMF:%s:%d rc=%d\n",__FILE__,__LINE__,rc); 00587 return rc; 00588 } 00589 //printf("JMF: cycle_read got a char, switch on %d\n",*pPacketType); 00590 00591 switch(*pPacketType) { 00592 case CONNACK: 00593 case PUBACK: 00594 case SUBACK: 00595 case UNSUBACK: 00596 /* SDK is blocking, these responses will be forwarded to calling function to process */ 00597 break; 00598 case PUBLISH: { 00599 rc = _aws_iot_mqtt_internal_handle_publish(pClient, pTimer); 00600 break; 00601 } 00602 case PUBREC: 00603 case PUBCOMP: 00604 /* QoS2 not supported at this time */ 00605 break; 00606 case PINGRESP: { 00607 pClient->clientStatus.isPingOutstanding = 0; 00608 countdown_sec(&pClient->pingTimer, pClient->clientData.keepAliveInterval); 00609 break; 00610 } 00611 default: { 00612 /* Either unknown packet type or Failure occurred 00613 * Should not happen */ 00614 rc = MQTT_RX_MESSAGE_PACKET_TYPE_INVALID_ERROR; 00615 break; 00616 } 00617 } 00618 00619 //printf("JMF:%s:%d EXITING rc=%d\n",__FILE__,__LINE__,rc); 00620 return rc; 00621 } 00622 00623 /* only used in single-threaded mode where one command at a time is in process */ 00624 IoT_Error_t aws_iot_mqtt_internal_wait_for_read(AWS_IoT_Client *pClient, uint8_t packetType, awsTimer *pTimer) { 00625 IoT_Error_t rc; 00626 uint8_t read_packet_type; 00627 00628 FUNC_ENTRY; 00629 if(NULL == pClient || NULL == pTimer) { 00630 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00631 } 00632 00633 read_packet_type = 0; 00634 do { 00635 if(has_timer_expired(pTimer)) { 00636 /* we timed out */ 00637 rc = MQTT_REQUEST_TIMEOUT_ERROR; 00638 break; 00639 } 00640 rc = aws_iot_mqtt_internal_cycle_read(pClient, pTimer, &read_packet_type); 00641 } while(((AWS_SUCCESS == rc) || (MQTT_NOTHING_TO_READ == rc)) && (read_packet_type != packetType)); 00642 00643 /* If rc is AWS_SUCCESS, we have received the expected 00644 * MQTT packet. 
Otherwise rc tells the error. */ 00645 FUNC_EXIT_RC(rc); 00646 } 00647 00648 /** 00649 * Serializes a 0-length packet into the supplied buffer, ready for writing to a socket 00650 * @param buf the buffer into which the packet will be serialized 00651 * @param buflen the length in bytes of the supplied buffer, to avoid overruns 00652 * @param packettype the message type 00653 * @param serialized length 00654 * @return IoT_Error_t indicating function execution status 00655 */ 00656 IoT_Error_t aws_iot_mqtt_internal_serialize_zero(unsigned char *pTxBuf, size_t txBufLen, MessageTypes packetType, 00657 size_t *pSerializedLength) { 00658 unsigned char *ptr; 00659 IoT_Error_t rc; 00660 MQTTHeader header = {0}; 00661 00662 FUNC_ENTRY; 00663 if(NULL == pTxBuf || NULL == pSerializedLength) { 00664 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00665 } 00666 00667 /* Buffer should have at least 2 bytes for the header */ 00668 if(4 > txBufLen) { 00669 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00670 } 00671 00672 ptr = pTxBuf; 00673 00674 rc = aws_iot_mqtt_internal_init_header(&header, packetType, QOS0, 0, 0); 00675 if(AWS_SUCCESS != rc) { 00676 FUNC_EXIT_RC(rc); 00677 } 00678 00679 /* write header */ 00680 aws_iot_mqtt_internal_write_char(&ptr, header.byte); 00681 00682 /* write remaining length */ 00683 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 0); 00684 *pSerializedLength = (uint32_t) (ptr - pTxBuf); 00685 00686 FUNC_EXIT_RC(AWS_SUCCESS); 00687 } 00688 00689 #ifdef __cplusplus 00690 } 00691 #endif
Generated on Tue Jul 12 2022 19:02:38 by
1.7.2