Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of AWS-test by
aws_iot_mqtt_client_publish.cpp
00001 /* 00002 * Copyright 2015-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. 00003 * 00004 * Licensed under the Apache License, Version 2.0 (the "License"). 00005 * You may not use this file except in compliance with the License. 00006 * A copy of the License is located at 00007 * 00008 * http://aws.amazon.com/apache2.0 00009 * 00010 * or in the "license" file accompanying this file. This file is distributed 00011 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 00012 * express or implied. See the License for the specific language governing 00013 * permissions and limitations under the License. 00014 */ 00015 00016 // Based on Eclipse Paho. 00017 /******************************************************************************* 00018 * Copyright (c) 2014 IBM Corp. 00019 * 00020 * All rights reserved. This program and the accompanying materials 00021 * are made available under the terms of the Eclipse Public License v1.0 00022 * and Eclipse Distribution License v1.0 which accompany this distribution. 00023 * 00024 * The Eclipse Public License is available at 00025 * http://www.eclipse.org/legal/epl-v10.html 00026 * and the Eclipse Distribution License is available at 00027 * http://www.eclipse.org/org/documents/edl-v10.php. 00028 * 00029 * Contributors: 00030 * Ian Craggs - initial API and implementation and/or initial documentation 00031 * Ian Craggs - fix for https://bugs.eclipse.org/bugs/show_bug.cgi?id=453144 00032 *******************************************************************************/ 00033 00034 /** 00035 * @file aws_iot_mqtt_client_publish.c 00036 * @brief MQTT client publish API definitions 00037 */ 00038 00039 #ifdef __cplusplus 00040 extern "C" { 00041 #endif 00042 00043 #include "aws_iot_mqtt_client_common_internal.h" 00044 00045 /** 00046 * @param stringVar pointer to the String into which the data is to be read 00047 * @param stringLen pointer to variable which has the length of the string 00048 * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned 00049 * @param enddata pointer to the end of the data: do not read beyond 00050 * @return IOT_SUCCESS if successful, IOT_FAILURE if not 00051 */ 00052 static IoT_Error_t _aws_iot_mqtt_read_string_with_len(char **stringVar, uint16_t *stringLen, 00053 unsigned char **pptr, unsigned char *enddata) { 00054 IoT_Error_t rc = IOT_FAILURE; 00055 00056 FUNC_ENTRY; 00057 /* the first two bytes are the length of the string */ 00058 /* enough length to read the integer? */ 00059 if(enddata - (*pptr) > 1) { 00060 *stringLen = aws_iot_mqtt_internal_read_uint16_t(pptr); /* increments pptr to point past length */ 00061 if(&(*pptr)[*stringLen] <= enddata) { 00062 *stringVar = (char *) *pptr; 00063 *pptr += *stringLen; 00064 rc = IOT_SUCCESS; 00065 } 00066 } 00067 00068 FUNC_EXIT_RC(rc); 00069 } 00070 00071 /** 00072 * Serializes the supplied publish data into the supplied buffer, ready for sending 00073 * @param pTxBuf the buffer into which the packet will be serialized 00074 * @param txBufLen the length in bytes of the supplied buffer 00075 * @param dup uint8_t - the MQTT dup flag 00076 * @param qos QoS - the MQTT QoS value 00077 * @param retained uint8_t - the MQTT retained flag 00078 * @param packetId uint16_t - the MQTT packet identifier 00079 * @param pTopicName char * - the MQTT topic in the publish 00080 * @param topicNameLen uint16_t - the length of the Topic Name 00081 * @param pPayload byte buffer - the MQTT publish payload 00082 * @param payloadLen size_t - the length of the MQTT payload 00083 * @param pSerializedLen uint32_t - pointer to the variable that stores serialized len 00084 * 00085 * @return An IoT Error Type defining successful/failed call 00086 */ 00087 static IoT_Error_t _aws_iot_mqtt_internal_serialize_publish(unsigned char *pTxBuf, size_t txBufLen, uint8_t dup, 00088 QoS qos, uint8_t retained, uint16_t packetId, 00089 const char *pTopicName, uint16_t topicNameLen, 00090 const unsigned char *pPayload, size_t payloadLen, 00091 uint32_t *pSerializedLen) { 00092 unsigned char *ptr; 00093 uint32_t rem_len; 00094 IoT_Error_t rc; 00095 MQTTHeader header = {0}; 00096 00097 FUNC_ENTRY; 00098 if(NULL == pTxBuf || NULL == pPayload || NULL == pSerializedLen) { 00099 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00100 } 00101 00102 ptr = pTxBuf; 00103 rem_len = 0; 00104 00105 rem_len += (uint32_t) (topicNameLen + payloadLen + 2); 00106 if(qos > 0) { 00107 rem_len += 2; /* packetId */ 00108 } 00109 if(aws_iot_mqtt_internal_get_final_packet_length_from_remaining_length(rem_len) > txBufLen) { 00110 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00111 } 00112 00113 rc = aws_iot_mqtt_internal_init_header(&header, PUBLISH, qos, dup, retained); 00114 if(IOT_SUCCESS != rc) { 00115 FUNC_EXIT_RC(rc); 00116 } 00117 aws_iot_mqtt_internal_write_char(&ptr, header.byte); /* write header */ 00118 00119 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, rem_len); /* write remaining length */; 00120 00121 aws_iot_mqtt_internal_write_utf8_string(&ptr, pTopicName, topicNameLen); 00122 00123 if(qos > 0) { 00124 aws_iot_mqtt_internal_write_uint_16(&ptr, packetId); 00125 } 00126 00127 memcpy(ptr, pPayload, payloadLen); 00128 ptr += payloadLen; 00129 00130 *pSerializedLen = (uint32_t) (ptr - pTxBuf); 00131 00132 FUNC_EXIT_RC(IOT_SUCCESS); 00133 } 00134 00135 /** 00136 * Serializes the ack packet into the supplied buffer. 00137 * @param pTxBuf the buffer into which the packet will be serialized 00138 * @param txBufLen the length in bytes of the supplied buffer 00139 * @param msgType the MQTT packet type 00140 * @param dup the MQTT dup flag 00141 * @param packetId the MQTT packet identifier 00142 * @param pSerializedLen uint32_t - pointer to the variable that stores serialized len 00143 * 00144 * @return An IoT Error Type defining successful/failed call 00145 */ 00146 IoT_Error_t aws_iot_mqtt_internal_serialize_ack(unsigned char *pTxBuf, size_t txBufLen, 00147 MessageTypes msgType, uint8_t dup, uint16_t packetId, 00148 uint32_t *pSerializedLen) { 00149 unsigned char *ptr; 00150 QoS requestQoS; 00151 IoT_Error_t rc; 00152 MQTTHeader header = {0}; 00153 FUNC_ENTRY; 00154 if(NULL == pTxBuf || pSerializedLen == NULL) { 00155 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00156 } 00157 00158 ptr = pTxBuf; 00159 00160 /* Minimum byte length required by ACK headers is 00161 * 2 for fixed and 2 for variable part */ 00162 if(4 > txBufLen) { 00163 FUNC_EXIT_RC(MQTT_TX_BUFFER_TOO_SHORT_ERROR); 00164 } 00165 00166 requestQoS = (PUBREL == msgType) ? QOS1 : QOS0; 00167 rc = aws_iot_mqtt_internal_init_header(&header, msgType, requestQoS, dup, 0); 00168 if(IOT_SUCCESS != rc) { 00169 FUNC_EXIT_RC(rc); 00170 } 00171 aws_iot_mqtt_internal_write_char(&ptr, header.byte); /* write header */ 00172 00173 ptr += aws_iot_mqtt_internal_write_len_to_buffer(ptr, 2); /* write remaining length */ 00174 aws_iot_mqtt_internal_write_uint_16(&ptr, packetId); 00175 *pSerializedLen = (uint32_t) (ptr - pTxBuf); 00176 00177 FUNC_EXIT_RC(IOT_SUCCESS); 00178 } 00179 00180 /** 00181 * @brief Publish an MQTT message on a topic 00182 * 00183 * Called to publish an MQTT message on a topic. 00184 * @note Call is blocking. In the case of a QoS 0 message the function returns 00185 * after the message was successfully passed to the TLS layer. In the case of QoS 1 00186 * the function returns after the receipt of the PUBACK control packet. 00187 * This is the internal function which is called by the publish API to perform the operation. 00188 * Not meant to be called directly as it doesn't do validations or client state changes 00189 * 00190 * @param pClient Reference to the IoT Client 00191 * @param pTopicName Topic Name to publish to 00192 * @param topicNameLen Length of the topic name 00193 * @param pParams Pointer to Publish Message parameters 00194 * 00195 * @return An IoT Error Type defining successful/failed publish 00196 */ 00197 static IoT_Error_t _aws_iot_mqtt_internal_publish(AWS_IoT_Client *pClient, const char *pTopicName, 00198 uint16_t topicNameLen, IoT_Publish_Message_Params *pParams) { 00199 TimerAWS timer; 00200 uint32_t len = 0; 00201 uint16_t packet_id; 00202 unsigned char dup, type; 00203 IoT_Error_t rc; 00204 00205 FUNC_ENTRY; 00206 00207 init_timer(&timer); 00208 countdown_ms(&timer, pClient->clientData.commandTimeoutMs); 00209 00210 if(QOS1 == pParams->qos) { 00211 pParams->id = aws_iot_mqtt_get_next_packet_id(pClient); 00212 } 00213 00214 rc = _aws_iot_mqtt_internal_serialize_publish(pClient->clientData.writeBuf, pClient->clientData.writeBufSize, 0, 00215 pParams->qos, pParams->isRetained, pParams->id, pTopicName, 00216 topicNameLen, (unsigned char *) pParams->payload, 00217 pParams->payloadLen, &len); 00218 if(IOT_SUCCESS != rc) { 00219 FUNC_EXIT_RC(rc); 00220 } 00221 00222 /* send the publish packet */ 00223 rc = aws_iot_mqtt_internal_send_packet(pClient, len, &timer); 00224 if(IOT_SUCCESS != rc) { 00225 FUNC_EXIT_RC(rc); 00226 } 00227 00228 /* Wait for ack if QoS1 */ 00229 if(QOS1 == pParams->qos) { 00230 rc = aws_iot_mqtt_internal_wait_for_read(pClient, PUBACK, &timer); 00231 if(IOT_SUCCESS != rc) { 00232 FUNC_EXIT_RC(rc); 00233 } 00234 00235 rc = aws_iot_mqtt_internal_deserialize_ack(&type, &dup, &packet_id, pClient->clientData.readBuf, 00236 pClient->clientData.readBufSize); 00237 if(IOT_SUCCESS != rc) { 00238 FUNC_EXIT_RC(rc); 00239 } 00240 } 00241 00242 FUNC_EXIT_RC(IOT_SUCCESS); 00243 } 00244 00245 /** 00246 * @brief Publish an MQTT message on a topic 00247 * 00248 * Called to publish an MQTT message on a topic. 00249 * @note Call is blocking. In the case of a QoS 0 message the function returns 00250 * after the message was successfully passed to the TLS layer. In the case of QoS 1 00251 * the function returns after the receipt of the PUBACK control packet. 00252 * This is the outer function which does the validations and calls the internal publish above 00253 * to perform the actual operation. It is also responsible for client state changes 00254 * 00255 * @param pClient Reference to the IoT Client 00256 * @param pTopicName Topic Name to publish to 00257 * @param topicNameLen Length of the topic name 00258 * @param pParams Pointer to Publish Message parameters 00259 * 00260 * @return An IoT Error Type defining successful/failed publish 00261 */ 00262 IoT_Error_t aws_iot_mqtt_publish(AWS_IoT_Client *pClient, const char *pTopicName, uint16_t topicNameLen, 00263 IoT_Publish_Message_Params *pParams) { 00264 IoT_Error_t rc, pubRc; 00265 ClientState clientState; 00266 00267 FUNC_ENTRY; 00268 00269 if(NULL == pClient || NULL == pTopicName || 0 == topicNameLen || NULL == pParams) { 00270 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00271 } 00272 00273 if(!aws_iot_mqtt_is_client_connected(pClient)) { 00274 FUNC_EXIT_RC(NETWORK_DISCONNECTED_ERROR); 00275 } 00276 00277 clientState = aws_iot_mqtt_get_client_state(pClient); 00278 if(CLIENT_STATE_CONNECTED_IDLE != clientState && CLIENT_STATE_CONNECTED_WAIT_FOR_CB_RETURN != clientState) { 00279 FUNC_EXIT_RC(MQTT_CLIENT_NOT_IDLE_ERROR); 00280 } 00281 00282 rc = aws_iot_mqtt_set_client_state(pClient, clientState, CLIENT_STATE_CONNECTED_PUBLISH_IN_PROGRESS); 00283 if(IOT_SUCCESS != rc) { 00284 FUNC_EXIT_RC(rc); 00285 } 00286 00287 pubRc = _aws_iot_mqtt_internal_publish(pClient, pTopicName, topicNameLen, pParams); 00288 00289 rc = aws_iot_mqtt_set_client_state(pClient, CLIENT_STATE_CONNECTED_PUBLISH_IN_PROGRESS, clientState); 00290 if(IOT_SUCCESS == pubRc && IOT_SUCCESS != rc) { 00291 pubRc = rc; 00292 } 00293 00294 FUNC_EXIT_RC(pubRc); 00295 } 00296 00297 /** 00298 * Deserializes the supplied (wire) buffer into publish data 00299 * @param dup returned uint8_t - the MQTT dup flag 00300 * @param qos returned QoS type - the MQTT QoS value 00301 * @param retained returned uint8_t - the MQTT retained flag 00302 * @param pPacketId returned uint16_t - the MQTT packet identifier 00303 * @param pTopicName returned String - the MQTT topic in the publish 00304 * @param topicNameLen returned uint16_t - the length of the MQTT topic in the publish 00305 * @param payload returned byte buffer - the MQTT publish payload 00306 * @param payloadlen returned size_t - the length of the MQTT payload 00307 * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field 00308 * @param rxBufLen the length in bytes of the data in the supplied buffer 00309 * 00310 * @return An IoT Error Type defining successful/failed call 00311 */ 00312 IoT_Error_t aws_iot_mqtt_internal_deserialize_publish(uint8_t *dup, QoS *qos, 00313 uint8_t *retained, uint16_t *pPacketId, 00314 char **pTopicName, uint16_t *topicNameLen, 00315 unsigned char **payload, size_t *payloadLen, 00316 unsigned char *pRxBuf, size_t rxBufLen) { 00317 unsigned char *curData = pRxBuf; 00318 unsigned char *endData = NULL; 00319 IoT_Error_t rc = IOT_FAILURE; 00320 uint32_t decodedLen = 0; 00321 uint32_t readBytesLen = 0; 00322 MQTTHeader header = {0}; 00323 00324 FUNC_ENTRY; 00325 00326 if(NULL == dup || NULL == qos || NULL == retained || NULL == pPacketId) { 00327 FUNC_EXIT_RC(IOT_FAILURE); 00328 } 00329 00330 /* Publish header size is at least four bytes. 00331 * Fixed header is two bytes. 00332 * Variable header size depends on QoS And Topic Name. 00333 * QoS level 0 doesn't have a message identifier (0 - 2 bytes) 00334 * Topic Name length fields decide size of topic name field (at least 2 bytes) 00335 * MQTT v3.1.1 Specification 3.3.1 */ 00336 if(4 > rxBufLen) { 00337 FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR); 00338 } 00339 00340 header.byte = aws_iot_mqtt_internal_read_char(&curData); 00341 if(PUBLISH != header.bits.type) { 00342 FUNC_EXIT_RC(IOT_FAILURE); 00343 } 00344 00345 *dup = header.bits.dup; 00346 *qos = (QoS) header.bits.qos; 00347 *retained = header.bits.retain; 00348 00349 /* read remaining length */ 00350 rc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curData, &decodedLen, &readBytesLen); 00351 if(IOT_SUCCESS != rc) { 00352 FUNC_EXIT_RC(rc); 00353 return rc; 00354 } 00355 curData += (readBytesLen); 00356 endData = curData + decodedLen; 00357 00358 /* do we have enough data to read the protocol version byte? */ 00359 if(IOT_SUCCESS != _aws_iot_mqtt_read_string_with_len(pTopicName, topicNameLen, &curData, endData) 00360 || (0 > (endData - curData))) { 00361 FUNC_EXIT_RC(IOT_FAILURE); 00362 } 00363 00364 if(QOS0 != *qos) { 00365 *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curData); 00366 } 00367 00368 *payloadLen = (size_t) (endData - curData); 00369 *payload = curData; 00370 00371 FUNC_EXIT_RC(IOT_SUCCESS); 00372 } 00373 00374 /** 00375 * Deserializes the supplied (wire) buffer into an ack 00376 * @param pPacketType returned integer - the MQTT packet type 00377 * @param dup returned integer - the MQTT dup flag 00378 * @param pPacketId returned integer - the MQTT packet identifier 00379 * @param pRxBuf the raw buffer data, of the correct length determined by the remaining length field 00380 * @param rxBuflen the length in bytes of the data in the supplied buffer 00381 * 00382 * @return An IoT Error Type defining successful/failed call 00383 */ 00384 IoT_Error_t aws_iot_mqtt_internal_deserialize_ack(unsigned char *pPacketType, unsigned char *dup, 00385 uint16_t *pPacketId, unsigned char *pRxBuf, 00386 size_t rxBuflen) { 00387 IoT_Error_t rc = IOT_FAILURE; 00388 unsigned char *curdata = pRxBuf; 00389 unsigned char *enddata = NULL; 00390 uint32_t decodedLen = 0; 00391 uint32_t readBytesLen = 0; 00392 MQTTHeader header = {0}; 00393 00394 FUNC_ENTRY; 00395 00396 if(NULL == pPacketType || NULL == dup || NULL == pPacketId || NULL == pRxBuf) { 00397 FUNC_EXIT_RC(NULL_VALUE_ERROR); 00398 } 00399 00400 /* PUBACK fixed header size is two bytes, variable header is 2 bytes, MQTT v3.1.1 Specification 3.4.1 */ 00401 if(4 > rxBuflen) { 00402 FUNC_EXIT_RC(MQTT_RX_BUFFER_TOO_SHORT_ERROR); 00403 } 00404 00405 00406 header.byte = aws_iot_mqtt_internal_read_char(&curdata); 00407 *dup = header.bits.dup; 00408 *pPacketType = header.bits.type; 00409 00410 /* read remaining length */ 00411 rc = aws_iot_mqtt_internal_decode_remaining_length_from_buffer(curdata, &decodedLen, &readBytesLen); 00412 if(IOT_SUCCESS != rc) { 00413 FUNC_EXIT_RC(rc); 00414 } 00415 curdata += (readBytesLen); 00416 enddata = curdata + decodedLen; 00417 00418 if(enddata - curdata < 2) { 00419 FUNC_EXIT_RC(IOT_FAILURE); 00420 } 00421 00422 *pPacketId = aws_iot_mqtt_internal_read_uint16_t(&curdata); 00423 00424 FUNC_EXIT_RC(IOT_SUCCESS); 00425 } 00426 00427 #ifdef __cplusplus 00428 } 00429 #endif
Generated on Tue Jul 12 2022 11:16:37 by
1.7.2
