Demo application for using the AT&T IoT Starter Kit Powered by AWS.

Dependencies:   SDFileSystem

Fork of ATT_AWS_IoT_demo by Anthony Phillips

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSubscribeClient.cpp Source File

MQTTSubscribeClient.cpp

00001 /*******************************************************************************
00002  * Copyright (c) 2014 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Ian Craggs - initial API and implementation and/or initial documentation
00015  *******************************************************************************/
00016 
00017 #include "MQTTPacket.h"
00018 #include "StackTrace.h"
00019 
00020 #include <string.h>
00021 
00022 /**
00023   * Determines the length of the MQTT subscribe packet that would be produced using the supplied parameters
00024   * @param count the number of topic filter strings in topicFilters
00025   * @param topicFilters the array of topic filter strings to be used in the publish
00026   * @return the length of buffer needed to contain the serialized version of the packet
00027   */
00028 size_t MQTTSerialize_GetSubscribePacketLength(uint32_t count, MQTTString topicFilters[]) {
00029     size_t i;
00030     size_t len = 2; /* packetid */
00031 
00032     for(i = 0; i < count; ++i) {
00033         len += 2 + MQTTstrlen(topicFilters[i]) + 1; /* length + topic + req_qos */
00034     }
00035 
00036     return len;
00037 }
00038 
00039 /**
00040   * Serializes the supplied subscribe data into the supplied buffer, ready for sending
00041   * @param buf the buffer into which the packet will be serialized
00042   * @param buflen the length in bytes of the supplied bufferr
00043   * @param dup integer - the MQTT dup flag
00044   * @param packetid integer - the MQTT packet identifier
00045   * @param count - number of members in the topicFilters and reqQos arrays
00046   * @param topicFilters - array of topic filter names
00047   * @param requestedQoSs - array of requested QoS
00048   * @return the length of the serialized data.  <= 0 indicates error
00049   */
00050 MQTTReturnCode MQTTSerialize_subscribe(unsigned char *buf, size_t buflen,
00051                                        unsigned char dup, uint16_t packetid, uint32_t count,
00052                                        MQTTString topicFilters[], QoS requestedQoSs[],
00053                                        uint32_t *serialized_len) {
00054         unsigned char *ptr = buf;
00055         MQTTHeader header = {0};
00056         size_t rem_len = 0;
00057         uint32_t i = 0;
00058         MQTTReturnCode rc = MQTTPacket_InitHeader(&header, SUBSCRIBE, (QoS)1, dup, 0);
00059     FUNC_ENTRY;
00060     if(NULL == buf || NULL == serialized_len) {
00061         FUNC_EXIT_RC(MQTT_NULL_VALUE_ERROR);
00062         return MQTT_NULL_VALUE_ERROR;
00063     }
00064 
00065     if(MQTTPacket_len(rem_len = MQTTSerialize_GetSubscribePacketLength(count, topicFilters)) > buflen) {
00066         FUNC_EXIT_RC(MQTTPACKET_BUFFER_TOO_SHORT);
00067         return MQTTPACKET_BUFFER_TOO_SHORT;
00068     }
00069 
00070     if(SUCCESS != rc) {
00071         FUNC_EXIT_RC(rc);
00072         return rc;
00073     }
00074     /* write header */
00075     writeChar(&ptr, header.byte);
00076 
00077     /* write remaining length */
00078     ptr += MQTTPacket_encode(ptr, rem_len);
00079 
00080     writePacketId(&ptr, packetid);
00081 
00082     for(i = 0; i < count; ++i) {
00083         writeMQTTString(&ptr, topicFilters[i]);
00084         writeChar(&ptr, (unsigned char)requestedQoSs[i]);
00085     }
00086 
00087     *serialized_len = (uint32_t)(ptr - buf);
00088 
00089     FUNC_EXIT_RC(SUCCESS);
00090     return SUCCESS;
00091 }
00092 
00093 /**
00094   * Deserializes the supplied (wire) buffer into suback data
00095   * @param packetid returned integer - the MQTT packet identifier
00096   * @param maxcount - the maximum number of members allowed in the grantedQoSs array
00097   * @param count returned integer - number of members in the grantedQoSs array
00098   * @param grantedQoSs returned array of integers - the granted qualities of service
00099   * @param buf the raw buffer data, of the correct length determined by the remaining length field
00100   * @param buflen the length in bytes of the data in the supplied buffer
00101   * @return error code.  1 is success, 0 is failure
00102   */
00103 MQTTReturnCode MQTTDeserialize_suback(uint16_t *packetid, uint32_t maxcount,
00104                                       uint32_t *count, QoS grantedQoSs[],
00105                                       unsigned char *buf, size_t buflen) {
00106         MQTTHeader header = {0};
00107         unsigned char *curdata = buf;
00108         unsigned char *enddata = NULL;
00109         MQTTReturnCode decodeRc = FAILURE;
00110         uint32_t decodedLen = 0;
00111         uint32_t readBytesLen = 0;
00112 
00113     FUNC_ENTRY;
00114     if(NULL == packetid || NULL == count || NULL == grantedQoSs) {
00115         FUNC_EXIT_RC(MQTT_NULL_VALUE_ERROR);
00116         return MQTT_NULL_VALUE_ERROR;
00117     }
00118 
00119     /* SUBACK header size is 4 bytes for header and at least one byte for QoS payload
00120      * Need at least a 5 bytes buffer. MQTT3.1.1 specification 3.9
00121      */
00122     if(5 > buflen) {
00123         FUNC_EXIT_RC(MQTTPACKET_BUFFER_TOO_SHORT);
00124         return MQTTPACKET_BUFFER_TOO_SHORT;
00125     }
00126 
00127     header.byte = readChar(&curdata);
00128     if (header.bits.type != SUBACK) {
00129         FUNC_EXIT_RC(FAILURE);
00130         return FAILURE;
00131     }
00132 
00133     /* read remaining length */
00134     decodeRc = MQTTPacket_decodeBuf(curdata, &decodedLen, &readBytesLen);
00135     if(decodeRc != SUCCESS) {
00136         return decodeRc;
00137     }
00138 
00139     curdata += (readBytesLen);
00140     enddata = curdata + decodedLen;
00141     if (enddata - curdata < 2) {
00142         FUNC_EXIT_RC(FAILURE);
00143         return FAILURE;
00144     }
00145 
00146     *packetid = readPacketId(&curdata);
00147 
00148     *count = 0;
00149     while(curdata < enddata) {
00150         if(*count > maxcount) {
00151             FUNC_EXIT_RC(FAILURE);
00152             return FAILURE;
00153         }
00154         grantedQoSs[(*count)++] = (QoS)readChar(&curdata);
00155     }
00156 
00157     FUNC_EXIT_RC(SUCCESS);
00158     return SUCCESS;
00159 }
00160 
00161