V.06 11/3

Dependencies:   FT6206 SDFileSystem SPI_TFT_ILI9341 TFT_fonts

Fork of ATT_AWS_IoT_demo by attiot

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.cpp Source File

MQTTClient.cpp

00001 /*******************************************************************************
00002  * Copyright (c) 2014 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *    Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
00015  *******************************************************************************/
00016 
00017 #include "MQTTClient.h"
00018 #include <string.h>
00019 #include "aws_iot_log.h"
00020 
00021 static void MQTTForceDisconnect(Client *c);
00022 
00023 void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) {
00024     md->topicName = aTopicName;
00025     md->message = aMessage;
00026     md->applicationHandler = applicationHandler;
00027 }
00028 
00029 uint16_t getNextPacketId(Client *c) {
00030     return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1));
00031 }
00032 
00033 MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) {
00034     int32_t sentLen = 0;
00035     uint32_t sent = 0, i;
00036 
00037     if(NULL == c || NULL == timer) {
00038         ERROR("...MQTT_NULL_VALUE_ERROR");
00039         return MQTT_NULL_VALUE_ERROR;
00040     }
00041 
00042     if(length >= c->bufSize) {
00043         ERROR("...MQTTPACKET_BUFFER_TOO_SHORT");
00044         return MQTTPACKET_BUFFER_TOO_SHORT;
00045     }
00046     
00047     while(sent < length && !expired(timer)) {
00048         sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer));
00049         if(sentLen < 0) {
00050             /* there was an error writing the data */
00051             ERROR("...there was an error writing the data");
00052             break;
00053         }
00054         sent = sent + (uint32_t)sentLen;
00055     }
00056 
00057     if(sent == length) {
00058         /* record the fact that we have successfully sent the packet */
00059         //countdown(&c->pingTimer, c->keepAliveInterval);
00060         return SUCCESS;
00061     }
00062 
00063     ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length);
00064     return FAILURE;
00065 }
00066 
00067 void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) {
00068     if(NULL == destination || NULL == source) {
00069         return;
00070     }
00071     destination->willFlag = source->willFlag;
00072     destination->MQTTVersion = source->MQTTVersion;
00073     destination->clientID.cstring = source->clientID.cstring;
00074     destination->username.cstring = source->username.cstring;
00075     destination->password.cstring = source->password.cstring;
00076     destination->will.topicName.cstring = source->will.topicName.cstring;
00077     destination->will.message.cstring = source->will.message.cstring;
00078     destination->will.qos = source->will.qos;
00079     destination->will.retained = source->will.retained;
00080     destination->keepAliveInterval = source->keepAliveInterval;
00081     destination->cleansession = source->cleansession;
00082 }
00083 
00084 MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs,
00085                           unsigned char *buf, size_t bufSize, unsigned char *readbuf,
00086                           size_t readBufSize, uint8_t enableAutoReconnect,
00087                           networkInitHandler_t networkInitHandler,
00088                           TLSConnectParams *tlsConnectParams) {
00089     uint32_t i;
00090     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00091 
00092     if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf
00093        || NULL == networkInitHandler) {
00094         return MQTT_NULL_VALUE_ERROR;
00095     }
00096 
00097     for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
00098         c->messageHandlers[i].topicFilter = NULL;
00099         c->messageHandlers[i].fp = NULL;
00100         c->messageHandlers[i].applicationHandler = NULL;
00101         c->messageHandlers[i].qos = (QoS)0;
00102     }
00103 
00104     c->commandTimeoutMs = commandTimeoutMs;
00105     c->buf = buf;
00106     c->bufSize = bufSize;
00107     c->readbuf = readbuf;
00108     c->readBufSize = readBufSize;
00109     c->isConnected = 0;
00110     c->isPingOutstanding = 0;
00111     c->wasManuallyDisconnected = 0;
00112     c->counterNetworkDisconnected = 0;
00113     c->isAutoReconnectEnabled = enableAutoReconnect;
00114     c->defaultMessageHandler = NULL;
00115     c->disconnectHandler = NULL;
00116     copyMQTTConnectData(&(c->options), &default_options);
00117 
00118     c->networkInitHandler = networkInitHandler;
00119     c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort;
00120     c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL;
00121     c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation;
00122     c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation;
00123     c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation;
00124     c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms;
00125     c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag;
00126 
00127     InitTimer(&(c->pingTimer));
00128     InitTimer(&(c->reconnectDelayTimer));
00129 
00130     return SUCCESS;
00131 }
00132 
00133 MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) {
00134     unsigned char i;
00135     uint32_t multiplier = 1;
00136     uint32_t len = 0;
00137     const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00138 
00139     if(NULL == c || NULL == value) {
00140         return MQTT_NULL_VALUE_ERROR;
00141     }
00142 
00143     *value = 0;
00144 
00145     do {
00146         if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
00147             /* bad data */
00148             return MQTTPACKET_READ_ERROR;
00149         }
00150 
00151         if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) {
00152             /* The value argument is the important value. len is just used temporarily
00153              * and never used by the calling function for anything else */
00154             return FAILURE;
00155         }
00156 
00157         *value += ((i & 127) * multiplier);
00158         multiplier *= 128;
00159     }while((i & 128) != 0);
00160 
00161     /* The value argument is the important value. len is just used temporarily
00162      * and never used by the calling function for anything else */
00163     return SUCCESS;
00164 }
00165 
00166 MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) {
00167     MQTTHeader header = {0};
00168     uint32_t len = 0;
00169     uint32_t rem_len = 0;
00170     uint32_t total_bytes_read = 0;
00171     uint32_t bytes_to_be_read = 0;
00172     int32_t ret_val = 0;
00173     MQTTReturnCode rc;
00174 
00175     if(NULL == c || NULL == timer) {
00176         ERROR("readPacket() MQTT_NULL_VALUE_ERROR");
00177         return MQTT_NULL_VALUE_ERROR;
00178     }
00179 
00180     /* 1. read the header byte.  This has the packet type in it */
00181     if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) {
00182         /* If a network disconnect has occurred it would have been caught by keepalive already.
00183          * If nothing is found at this point means there was nothing to read. Not 100% correct,
00184          * but the only way to be sure is to pass proper error codes from the network stack
00185          * which the mbedtls/openssl implementations do not return */
00186         return MQTT_NOTHING_TO_READ;
00187     }
00188 
00189     len = 1;
00190     /* 2. read the remaining length.  This is variable in itself */
00191     rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer));
00192     if(SUCCESS != rc) {
00193         ERROR("readPacket() SUCCESS != rc");
00194         return rc;
00195     }
00196 
00197     /* if the buffer is too short then the message will be dropped silently */
00198     if (rem_len >= c->readBufSize) {
00199         bytes_to_be_read = c->readBufSize;
00200         do {
00201             ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer));
00202             if (ret_val > 0) {
00203                 total_bytes_read += ret_val;
00204                 if((rem_len - total_bytes_read) >= c->readBufSize){
00205                     bytes_to_be_read = c->readBufSize;
00206                 }
00207                 else{
00208                     bytes_to_be_read = rem_len - total_bytes_read;
00209                 }
00210             }
00211         } while (total_bytes_read < rem_len && ret_val > 0);
00212         return MQTTPACKET_BUFFER_TOO_SHORT;
00213     }
00214 
00215     /* put the original remaining length back into the buffer */
00216     len += MQTTPacket_encode(c->readbuf + 1, rem_len);
00217 
00218     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00219     if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) {
00220         ERROR("readPacket() FAILURE");
00221         return FAILURE;
00222     }
00223 
00224     header.byte = c->readbuf[0];
00225     *packet_type = header.bits.type;
00226 
00227     return SUCCESS;
00228 }
00229 
00230 // assume topic filter and name is in correct format
00231 // # can only be at end
00232 // + and # can only be next to separator
00233 char isTopicMatched(char *topicFilter, MQTTString *topicName) {
00234     char *curf = NULL;
00235     char *curn = NULL;
00236     char *curn_end = NULL;
00237 
00238     if(NULL == topicFilter || NULL == topicName) {
00239         return MQTT_NULL_VALUE_ERROR;
00240     }
00241 
00242     curf = topicFilter;
00243     curn = topicName->lenstring.data;
00244     curn_end = curn + topicName->lenstring.len;
00245 
00246     while(*curf && (curn < curn_end)) {
00247         if(*curn == '/' && *curf != '/') {
00248             break;
00249         }
00250         if(*curf != '+' && *curf != '#' && *curf != *curn) {
00251             break;
00252         }
00253         if(*curf == '+') {
00254             /* skip until we meet the next separator, or end of string */
00255             char *nextpos = curn + 1;
00256             while(nextpos < curn_end && *nextpos != '/')
00257                 nextpos = ++curn + 1;
00258         } else if(*curf == '#') {
00259             /* skip until end of string */
00260             curn = curn_end - 1;
00261         }
00262 
00263         curf++;
00264         curn++;
00265     };
00266 
00267     return (curn == curn_end) && (*curf == '\0');
00268 }
00269 
00270 MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) {
00271     uint32_t i;
00272     MessageData md;
00273 
00274     if(NULL == c || NULL == topicName || NULL == message) {
00275         return MQTT_NULL_VALUE_ERROR;
00276     }
00277 
00278     // we have to find the right message handler - indexed by topic
00279     for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
00280         if((c->messageHandlers[i].topicFilter != 0)
00281            && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
00282                 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) {
00283             if(c->messageHandlers[i].fp != NULL) {
00284                 NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler);
00285                 c->messageHandlers[i].fp(&md);
00286                 return SUCCESS;
00287             }
00288         }
00289     }
00290 
00291     if(NULL != c->defaultMessageHandler) {
00292         NewMessageData(&md, topicName, message, NULL);
00293         c->defaultMessageHandler(&md);
00294         return SUCCESS;
00295     }
00296 
00297     /* Message handler not found for topic */
00298     return FAILURE;
00299 }
00300 
00301 MQTTReturnCode handleDisconnect(Client *c) {
00302     MQTTReturnCode rc;
00303 
00304     if(NULL == c) {
00305         return MQTT_NULL_VALUE_ERROR;
00306     }
00307 
00308     rc = MQTTDisconnect(c);
00309     if(rc != SUCCESS){
00310         // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack
00311         MQTTForceDisconnect(c);
00312     }
00313 
00314     if(NULL != c->disconnectHandler) {
00315         c->disconnectHandler();
00316     }
00317 
00318     /* Reset to 0 since this was not a manual disconnect */
00319     c->wasManuallyDisconnected = 0;
00320     return MQTT_NETWORK_DISCONNECTED_ERROR;
00321 }
00322 
00323 MQTTReturnCode MQTTAttemptReconnect(Client *c) {
00324     MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT;
00325 
00326     if(NULL == c) {
00327         return MQTT_NULL_VALUE_ERROR;
00328     }
00329 
00330     if(1 == c->isConnected) {
00331         return MQTT_NETWORK_ALREADY_CONNECTED_ERROR;
00332     }
00333 
00334     /* Ignoring return code. failures expected if network is disconnected */
00335     rc = MQTTConnect(c, NULL);
00336 
00337     /* If still disconnected handle disconnect */
00338     if(0 == c->isConnected) {
00339         return MQTT_ATTEMPTING_RECONNECT;
00340     }
00341 
00342     rc = MQTTResubscribe(c);
00343     if(SUCCESS != rc) {
00344         return rc;
00345     }
00346 
00347     return MQTT_NETWORK_RECONNECTED;
00348 }
00349 
00350 MQTTReturnCode handleReconnect(Client *c) {
00351     int8_t isPhysicalLayerConnected = 1;
00352     MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED;
00353 
00354     if(NULL == c) {
00355         return MQTT_NULL_VALUE_ERROR;
00356     }
00357 
00358     if(!expired(&(c->reconnectDelayTimer))) {
00359         /* Timer has not expired. Not time to attempt reconnect yet.
00360          * Return attempting reconnect */
00361         return MQTT_ATTEMPTING_RECONNECT;
00362     }
00363 
00364     if(NULL != c->networkStack.isConnected) {
00365         isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack));
00366     }
00367 
00368     if(isPhysicalLayerConnected) {
00369         rc = MQTTAttemptReconnect(c);
00370         if(MQTT_NETWORK_RECONNECTED == rc) {
00371             return MQTT_NETWORK_RECONNECTED;
00372         }
00373     }
00374 
00375     c->currentReconnectWaitInterval *= 2;
00376 
00377     if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) {
00378         return MQTT_RECONNECT_TIMED_OUT;
00379     }
00380     countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval);
00381     return rc;
00382 }
00383 
00384 MQTTReturnCode keepalive(Client *c) {
00385     MQTTReturnCode rc = SUCCESS;
00386     Timer timer;
00387     uint32_t serialized_len = 0;
00388 
00389     if(NULL == c) {
00390         return MQTT_NULL_VALUE_ERROR;
00391     }
00392 
00393     if(0 == c->keepAliveInterval) {
00394         return SUCCESS;
00395     }
00396 
00397     if(!expired(&c->pingTimer)) {
00398         return SUCCESS;
00399     }
00400 
00401     if(c->isPingOutstanding) {
00402         return handleDisconnect(c);
00403     }
00404 
00405     /* there is no ping outstanding - send one */
00406     InitTimer(&timer);
00407     countdown_ms(&timer, c->commandTimeoutMs);
00408     rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len);
00409     if(SUCCESS != rc) {
00410         return rc;
00411     }
00412 
00413     /* send the ping packet */
00414     rc = sendPacket(c, serialized_len, &timer);
00415     if(SUCCESS != rc) {
00416         //If sending a PING fails we can no longer determine if we are connected.  In this case we decide we are disconnected and begin reconnection attempts
00417         return handleDisconnect(c);
00418     }
00419 
00420     c->isPingOutstanding = 1;
00421     /* start a timer to wait for PINGRESP from server */
00422     countdown(&c->pingTimer, c->keepAliveInterval / 2);
00423 
00424     return SUCCESS;
00425 }
00426 
00427 MQTTReturnCode handlePublish(Client *c, Timer *timer) {
00428     MQTTString topicName;
00429     MQTTMessage msg;
00430     MQTTReturnCode rc;
00431     uint32_t len = 0;
00432 
00433     rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained,
00434                                  (uint16_t *)&msg.id, &topicName,
00435                                  (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf,
00436                                  c->readBufSize);
00437     if(SUCCESS != rc) {
00438         return rc;
00439     }
00440 
00441     rc = deliverMessage(c, &topicName, &msg);
00442     if(SUCCESS != rc) {
00443         return rc;
00444     }
00445 
00446     if(QOS0 == msg.qos) {
00447         /* No further processing required for QOS0 */
00448         return SUCCESS;
00449     }
00450 
00451     if(QOS1 == msg.qos) {
00452         rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len);
00453     } else { /* Message is not QOS0 or 1 means only option left is QOS2 */
00454         rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len);
00455     }
00456 
00457     if(SUCCESS != rc) {
00458         return rc;
00459     }
00460 
00461     rc = sendPacket(c, len, timer);
00462     if(SUCCESS != rc) {
00463         return rc;
00464     }
00465 
00466     return SUCCESS;
00467 }
00468 
00469 MQTTReturnCode handlePubrec(Client *c, Timer *timer) {
00470     uint16_t packet_id;
00471     unsigned char dup, type;
00472     MQTTReturnCode rc;
00473     uint32_t len;
00474 
00475     rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize);
00476     if(SUCCESS != rc) {
00477         return rc;
00478     }
00479 
00480     rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len);
00481     if(SUCCESS != rc) {
00482         return rc;
00483     }
00484 
00485     /* send the PUBREL packet */
00486     rc = sendPacket(c, len, timer);
00487     if(SUCCESS != rc) {
00488         /* there was a problem */
00489         return rc;
00490     }
00491 
00492     return SUCCESS;
00493 }
00494 
00495 MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) {
00496     MQTTReturnCode rc;
00497     if(NULL == c || NULL == timer) {
00498         ERROR("cycle() MQTT_NULL_VALUE_ERROR");
00499         return MQTT_NULL_VALUE_ERROR;
00500     }
00501 
00502     /* read the socket, see what work is due */
00503     rc = readPacket(c, timer, packet_type);
00504     if(MQTT_NOTHING_TO_READ == rc) {
00505         /* Nothing to read, not a cycle failure */
00506         return SUCCESS;
00507     }
00508     if(SUCCESS != rc) {
00509         ERROR("cycle() SUCCESS != rc");
00510         return rc;
00511     }
00512 
00513     switch(*packet_type) {
00514         case CONNACK:
00515         case PUBACK:
00516         case SUBACK:
00517         case UNSUBACK:
00518             break;
00519         case PUBLISH: {
00520             rc = handlePublish(c, timer);
00521             break;
00522         }
00523         case PUBREC: {
00524             rc = handlePubrec(c, timer);
00525             break;
00526         }
00527         case PUBCOMP:
00528             break;
00529         case PINGRESP: {
00530             c->isPingOutstanding = 0;
00531             countdown(&c->pingTimer, c->keepAliveInterval);
00532             break;
00533         }
00534         default: {
00535             /* Either unknown packet type or Failure occurred
00536              * Should not happen */
00537             ERROR("cycle() Either unknown packet type or Failure occurred");
00538             return MQTT_BUFFER_RX_MESSAGE_INVALID;
00539             break;
00540         }
00541     }
00542 
00543     return rc;
00544 }
00545 
00546 MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) {
00547     MQTTReturnCode rc = SUCCESS;
00548     Timer timer;
00549     uint8_t packet_type;
00550 
00551     if(NULL == c) {
00552         ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR");
00553         return MQTT_NULL_VALUE_ERROR;
00554     }
00555 
00556     /* Check if network was manually disconnected */
00557     if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) {
00558         ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED");
00559         return MQTT_NETWORK_MANUALLY_DISCONNECTED;
00560     }
00561 
00562     /* Check if network is disconnected and auto-reconnect is not enabled */
00563     if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) {
00564         ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR");
00565         return MQTT_NETWORK_DISCONNECTED_ERROR;
00566     }
00567 
00568     InitTimer(&timer);
00569     countdown_ms(&timer, timeout_ms);
00570 
00571     while(!expired(&timer)) {
00572         if(0 == c->isConnected) {
00573             if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) {
00574                 rc = MQTT_RECONNECT_TIMED_OUT;
00575                 ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT");
00576                 break;
00577             }
00578             
00579             rc = handleReconnect(c);
00580             /* Network reconnect attempted, check if yield timer expired before
00581              * doing anything else */
00582             continue;
00583         }
00584 
00585         rc = cycle(c, &timer, &packet_type);
00586         if(SUCCESS != rc) {
00587             ERROR("MQTTYield() SUCCESS != rc");
00588             break;
00589         }
00590 
00591         rc = keepalive(c);
00592         if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) {
00593             c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL;
00594             countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval);
00595             c->counterNetworkDisconnected++;
00596             /* Depending on timer values, it is possible that yield timer has expired
00597              * Set to rc to attempting reconnect to inform client that autoreconnect
00598              * attempt has started */
00599             INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT");
00600             rc = MQTT_ATTEMPTING_RECONNECT;
00601         } else if(SUCCESS != rc) {
00602             ERROR("MQTTYield() SUCCESS != rc");
00603             break;
00604         }
00605     }
00606 
00607     return rc;
00608 }
00609 
00610 /* only used in single-threaded mode where one command at a time is in process */
00611 MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) {
00612     MQTTReturnCode rc = FAILURE;
00613     uint8_t read_packet_type = 0, retry = 2;
00614     
00615     
00616     
00617     if(NULL == c || NULL == timer) {
00618         ERROR("waitfor() MQTT_NULL_VALUE_ERROR");
00619         return MQTT_NULL_VALUE_ERROR;
00620     }
00621 
00622     do {      
00623         if(expired(timer)) {   
00624             /* we timed out */
00625             ERROR("waitfor() timer expired");
00626             break;
00627                 
00628             //countdown_ms(timer, 10);    
00629         }
00630              
00631         rc = cycle(c, timer, &read_packet_type);
00632     }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc  && read_packet_type != packet_type);
00633 
00634     if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) {
00635         ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR");
00636         return FAILURE;
00637     }
00638 
00639     /* Something failed or we didn't receive the expected packet, return error code */
00640     return rc;
00641 }
00642 
00643 MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) {
00644     Timer connect_timer;
00645     MQTTReturnCode connack_rc = FAILURE;
00646     char sessionPresent = 0;
00647     uint32_t len = 0;
00648     MQTTReturnCode rc = FAILURE;
00649 
00650     if(NULL == c) {
00651         return MQTT_NULL_VALUE_ERROR;
00652     }
00653 
00654     DEBUG("...connect_timer");
00655     InitTimer(&connect_timer);
00656     countdown_ms(&connect_timer, c->commandTimeoutMs);
00657 
00658     if(c->isConnected) {
00659         /* Don't send connect packet again if we are already connected */
00660         ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR");
00661         return MQTT_NETWORK_ALREADY_CONNECTED_ERROR;
00662     }
00663 
00664     if(NULL != options) {
00665         /* override default options if new options were supplied */
00666         copyMQTTConnectData(&(c->options), options);
00667     }
00668 
00669     DEBUG("...TLS Connect");   
00670     c->networkInitHandler(&(c->networkStack));
00671     rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams);
00672     if(0 != rc) {
00673         /* TLS Connect failed, return error */
00674         ERROR("...TLS Connect failed, return error");
00675         return FAILURE;
00676     }
00677 
00678     c->keepAliveInterval = c->options.keepAliveInterval;
00679     rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len);
00680     if(SUCCESS != rc || 0 >= len) {
00681         ERROR("...MQTTSerialize_connect FAIL");
00682         return FAILURE;
00683     }
00684 
00685     /* send the connect packet */
00686     rc = sendPacket(c, len, &connect_timer);
00687     if(SUCCESS != rc) {
00688         ERROR("...sendPacket FAIL");
00689         return rc;
00690     }
00691 
00692     /* this will be a blocking call, wait for the CONNACK */
00693     rc = waitfor(c, CONNACK, &connect_timer);
00694     if(SUCCESS != rc) {
00695         ERROR("...waitfor FAIL");
00696         return rc;
00697     }
00698 
00699     /* Received CONNACK, check the return code */
00700     rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize);
00701     if(SUCCESS != rc) {
00702         ERROR("...MQTTDeserialize_connack FAIL");
00703         return rc;
00704     }
00705 
00706     if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) {
00707         ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL");
00708         return connack_rc;
00709     }
00710 
00711     DEBUG("...isConnected");
00712     c->isConnected = 1;
00713     c->wasManuallyDisconnected = 0;
00714     c->isPingOutstanding = 0;
00715     countdown(&c->pingTimer, c->keepAliveInterval);
00716 
00717     return SUCCESS;
00718 }
00719 
00720 /* Return MAX_MESSAGE_HANDLERS value if no free index is available */
00721 uint32_t GetFreeMessageHandlerIndex(Client *c) {
00722     uint32_t itr;
00723     for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) {
00724         if(c->messageHandlers[itr].topicFilter == NULL) {
00725             break;
00726         }
00727     }
00728 
00729     return itr;
00730 }
00731 
00732 MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos,
00733                   messageHandler messageHandler, pApplicationHandler_t applicationHandler) {
00734     MQTTReturnCode rc = FAILURE;
00735     Timer timer;
00736     uint32_t len = 0;
00737     uint32_t indexOfFreeMessageHandler;
00738     uint32_t count = 0;
00739     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00740     uint16_t packetId;
00741     MQTTString topic = MQTTString_initializer;
00742 
00743     if(NULL == c || NULL == topicFilter
00744        || NULL == messageHandler || NULL == applicationHandler) {
00745         ERROR("...MQTT_NULL_VALUE_ERROR FAIL");
00746         return MQTT_NULL_VALUE_ERROR;
00747     }
00748 
00749     if(!c->isConnected) {
00750         ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL");
00751         return MQTT_NETWORK_DISCONNECTED_ERROR;
00752     }
00753 
00754     topic.cstring = (char *)topicFilter;
00755 
00756     InitTimer(&timer);
00757     countdown_ms(&timer, c->commandTimeoutMs);
00758 
00759     rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len);
00760     if(SUCCESS != rc) {
00761         ERROR("...MQTTSerialize_subscribe FAIL");
00762         return rc;
00763     }
00764 
00765     indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c);
00766     if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) {
00767         ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL");
00768         return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR;
00769     }
00770 
00771     /* send the subscribe packet */
00772     rc = sendPacket(c, len, &timer);
00773     if(SUCCESS != rc) {
00774         ERROR("...send the subscribe packet FAIL");
00775         return rc;
00776     }
00777 
00778     /* wait for suback */
00779     rc = waitfor(c, SUBACK, &timer);
00780     if(SUCCESS != rc) {
00781         ERROR("...wait for suback FAIL");
00782         return rc;
00783     }
00784 
00785     /* Granted QoS can be 0, 1 or 2 */
00786     rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize);
00787     if(SUCCESS != rc) {
00788         ERROR("...Granted QoS can be 0, 1 or 2");
00789         return rc;
00790     }
00791 
00792     c->messageHandlers[indexOfFreeMessageHandler].topicFilter =
00793             topicFilter;
00794     c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler;
00795     c->messageHandlers[indexOfFreeMessageHandler].applicationHandler =
00796             applicationHandler;
00797     c->messageHandlers[indexOfFreeMessageHandler].qos = qos;
00798 
00799     DEBUG("...MQTTSubscribe SUCCESS");
00800     return SUCCESS;
00801 }
00802 
00803 MQTTReturnCode MQTTResubscribe(Client *c) {
00804     MQTTReturnCode rc = FAILURE;
00805     Timer timer;
00806     uint32_t len = 0;
00807     uint32_t count = 0;
00808     QoS grantedQoS[3] = {QOS0, QOS0, QOS0};
00809     uint16_t packetId;
00810     uint32_t existingSubCount = 0;
00811     uint32_t itr = 0;
00812 
00813     if(NULL == c) {
00814         return MQTT_NULL_VALUE_ERROR;
00815     }
00816 
00817     if(!c->isConnected) {
00818         return MQTT_NETWORK_DISCONNECTED_ERROR;
00819     }
00820 
00821     existingSubCount = GetFreeMessageHandlerIndex(c);
00822 
00823     for(itr = 0; itr < existingSubCount; itr++) {
00824         MQTTString topic = MQTTString_initializer;
00825         topic.cstring = (char *)c->messageHandlers[itr].topicFilter;
00826 
00827         InitTimer(&timer);
00828         countdown_ms(&timer, c->commandTimeoutMs);
00829 
00830         rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1,
00831                                      &topic, &(c->messageHandlers[itr].qos), &len);
00832         if(SUCCESS != rc) {
00833             return rc;
00834         }
00835 
00836         /* send the subscribe packet */
00837         rc = sendPacket(c, len, &timer);
00838         if(SUCCESS != rc) {
00839             return rc;
00840         }
00841 
00842         /* wait for suback */
00843         rc = waitfor(c, SUBACK, &timer);
00844         if(SUCCESS != rc) {
00845             return rc;
00846         }
00847 
00848         /* Granted QoS can be 0, 1 or 2 */
00849         rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize);
00850         if(SUCCESS != rc) {
00851             return rc;
00852         }
00853     }
00854 
00855     return SUCCESS;
00856 }
00857 
00858 MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) {
00859     MQTTReturnCode rc = FAILURE;
00860     Timer timer;
00861     MQTTString topic = MQTTString_initializer;
00862     uint32_t len = 0;
00863     uint32_t i = 0;
00864     uint16_t packet_id;
00865 
00866     if(NULL == c || NULL == topicFilter) {
00867         return MQTT_NULL_VALUE_ERROR;
00868     }
00869 
00870     topic.cstring = (char *)topicFilter;
00871 
00872     if(!c->isConnected) {
00873         return MQTT_NETWORK_DISCONNECTED_ERROR;
00874     }
00875 
00876     InitTimer(&timer);
00877     countdown_ms(&timer, c->commandTimeoutMs);
00878 
00879     rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len);
00880     if(SUCCESS != rc) {
00881         return rc;
00882     }
00883 
00884     /* send the unsubscribe packet */
00885     rc = sendPacket(c, len, &timer);
00886     if(SUCCESS != rc) {
00887         return rc;
00888     }
00889 
00890     rc = waitfor(c, UNSUBACK, &timer);
00891     if(SUCCESS != rc) {
00892         return rc;
00893     }
00894 
00895     rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize);
00896     if(SUCCESS != rc) {
00897         return rc;
00898     }
00899 
00900     /* Remove from message handler array */
00901     for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) {
00902         if(c->messageHandlers[i].topicFilter != NULL &&
00903             (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) {
00904             c->messageHandlers[i].topicFilter = NULL;
00905             /* We don't want to break here, if the same topic is registered
00906              * with 2 callbacks. Unlikely scenario */
00907         }
00908     }
00909 
00910     return SUCCESS;
00911 }
00912 
00913 MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) {
00914     Timer timer;
00915     MQTTString topic = MQTTString_initializer;
00916     uint32_t len = 0;
00917     uint8_t waitForAck = 0;
00918     uint8_t packetType = PUBACK;
00919     uint16_t packet_id;
00920     unsigned char dup, type;
00921     MQTTReturnCode rc = FAILURE;
00922 
00923     if(NULL == c || NULL == topicName || NULL == message) {
00924         return MQTT_NULL_VALUE_ERROR;
00925     }
00926 
00927     topic.cstring = (char *)topicName;
00928 
00929     if(!c->isConnected) {
00930         return MQTT_NETWORK_DISCONNECTED_ERROR;
00931     }
00932 
00933     InitTimer(&timer);
00934     countdown_ms(&timer, c->commandTimeoutMs);
00935 
00936     if(QOS1 == message->qos || QOS2 == message->qos) {
00937         message->id = getNextPacketId(c);
00938         waitForAck = 1;
00939         if(QOS2 == message->qos) {
00940             packetType = PUBCOMP;
00941         }
00942     }
00943 
00944     rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id,
00945               topic, (unsigned char*)message->payload, message->payloadlen, &len);
00946     if(SUCCESS != rc) {
00947         return rc;
00948     }
00949 
00950     /* send the publish packet */
00951     rc = sendPacket(c, len, &timer);
00952     if(SUCCESS != rc) {
00953         return rc;
00954     }
00955 
00956     /* Wait for ack if QoS1 or QoS2 */
00957     if(1 == waitForAck) {
00958         rc = waitfor(c, packetType, &timer);
00959         if(SUCCESS != rc) {
00960             return rc;
00961         }
00962 
00963         rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize);
00964         if(SUCCESS != rc) {
00965             return rc;
00966         }
00967     }
00968 
00969     return SUCCESS;
00970 }
00971 /**
00972  * This is for the case when the sendPacket Fails.
00973  */
00974 static void MQTTForceDisconnect(Client *c){
00975     c->isConnected = 0;
00976     c->networkStack.disconnect(&(c->networkStack));
00977     c->networkStack.destroy(&(c->networkStack));
00978 }
00979 
00980 MQTTReturnCode MQTTDisconnect(Client *c) {
00981     MQTTReturnCode rc = FAILURE;
00982     /* We might wait for incomplete incoming publishes to complete */
00983     Timer timer;
00984     uint32_t serialized_len = 0;
00985 
00986     if(NULL == c) {
00987         return MQTT_NULL_VALUE_ERROR;
00988     }
00989 
00990     if(0 == c->isConnected) {
00991         /* Network is already disconnected. Do nothing */
00992         return MQTT_NETWORK_DISCONNECTED_ERROR;
00993     }
00994 
00995     rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len);
00996     if(SUCCESS != rc) {
00997         return rc;
00998     }
00999 
01000     InitTimer(&timer);
01001     countdown_ms(&timer, c->commandTimeoutMs);
01002 
01003     /* send the disconnect packet */
01004     if(serialized_len > 0) {
01005         rc = sendPacket(c, serialized_len, &timer);
01006         if(SUCCESS != rc) {
01007             return rc;
01008         }
01009     }
01010 
01011     /* Clean network stack */
01012     c->networkStack.disconnect(&(c->networkStack));
01013     rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack));
01014     if(0 != rc) {
01015         /* TLS Destroy failed, return error */
01016         return FAILURE;
01017     }
01018 
01019     c->isConnected = 0;
01020 
01021     /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */
01022     c->wasManuallyDisconnected = 1;
01023 
01024     return SUCCESS;
01025 }
01026 
01027 uint8_t MQTTIsConnected(Client *c) {
01028     if(NULL == c) {
01029         return 0;
01030     }
01031 
01032     return c->isConnected;
01033 }
01034 
01035 uint8_t MQTTIsAutoReconnectEnabled(Client *c) {
01036     if(NULL == c) {
01037         return 0;
01038     }
01039 
01040     return c->isAutoReconnectEnabled;
01041 }
01042 
01043 MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) {
01044     if(NULL == c || NULL == disconnectHandler) {
01045         return MQTT_NULL_VALUE_ERROR;
01046     }
01047 
01048     c->disconnectHandler = disconnectHandler;
01049     return SUCCESS;
01050 }
01051 
01052 MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) {
01053     if(NULL == c) {
01054         return FAILURE;
01055     }
01056     c->isAutoReconnectEnabled = value;
01057     return SUCCESS;
01058 }
01059 
01060 uint32_t MQTTGetNetworkDisconnectedCount(Client *c) {
01061     return c->counterNetworkDisconnected;
01062 }
01063 
01064 void MQTTResetNetworkDisconnectedCount(Client *c) {
01065     c->counterNetworkDisconnected = 0;
01066 }
01067 
01068