Jack Hansdampf / mbed-mqtt-GSOE1

Dependents:   ESP8266MQTT

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTClient.c Source File

MQTTClient.c

00001 /*******************************************************************************
00002  * Copyright (c) 2014, 2017 IBM Corp.
00003  *
00004  * All rights reserved. This program and the accompanying materials
00005  * are made available under the terms of the Eclipse Public License v1.0
00006  * and Eclipse Distribution License v1.0 which accompany this distribution.
00007  *
00008  * The Eclipse Public License is available at
00009  *    http://www.eclipse.org/legal/epl-v10.html
00010  * and the Eclipse Distribution License is available at
00011  *   http://www.eclipse.org/org/documents/edl-v10.php.
00012  *
00013  * Contributors:
00014  *   Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
00015  *   Ian Craggs - fix for #96 - check rem_len in readPacket
00016  *   Ian Craggs - add ability to set message handler separately #6
00017  *******************************************************************************/
00018 #include "MQTTClient.h"
00019 
00020 #include <stdio.h>
00021 #include <string.h>
00022 
00023 static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
00024     md->topicName = aTopicName;
00025     md->message = aMessage;
00026 }
00027 
00028 
00029 static int getNextPacketId(MQTTClient *c) {
00030     return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
00031 }
00032 
00033 
00034 static int sendPacket(MQTTClient* c, int length, Timer* timer)
00035 {
00036     int rc = FAILURE,
00037         sent = 0;
00038 
00039     while (sent < length && !TimerIsExpired(timer))
00040     {
00041         rc = c->ipstack->mqttwrite(c->ipstack, &c->buf[sent], length, TimerLeftMS(timer));
00042         if (rc < 0)  // there was an error writing the data
00043             break;
00044         sent += rc;
00045     }
00046     if (sent == length)
00047     {
00048         TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
00049         rc = SUCCESS;
00050     }
00051     else
00052         rc = FAILURE;
00053     return rc;
00054 }
00055 
00056 
00057 void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
00058         unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
00059 {
00060     int i;
00061     c->ipstack = network;
00062 
00063     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00064         c->messageHandlers[i].topicFilter = 0;
00065     c->command_timeout_ms = command_timeout_ms;
00066     c->buf = sendbuf;
00067     c->buf_size = sendbuf_size;
00068     c->readbuf = readbuf;
00069     c->readbuf_size = readbuf_size;
00070     c->isconnected = 0;
00071     c->cleansession = 0;
00072     c->ping_outstanding = 0;
00073     c->defaultMessageHandler = NULL;
00074       c->next_packetid = 1;
00075     TimerInit(&c->last_sent);
00076     TimerInit(&c->last_received);
00077 #if defined(MQTT_TASK)
00078       MutexInit(&c->mutex);
00079 #endif
00080 }
00081 
00082 
00083 static int decodePacket(MQTTClient* c, int* value, int timeout)
00084 {
00085     unsigned char i;
00086     int multiplier = 1;
00087     int len = 0;
00088     const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
00089 
00090     *value = 0;
00091     do
00092     {
00093         int rc = MQTTPACKET_READ_ERROR;
00094 
00095         if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
00096         {
00097             rc = MQTTPACKET_READ_ERROR; /* bad data */
00098             goto exit;
00099         }
00100         rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
00101         if (rc != 1)
00102             goto exit;
00103         *value += (i & 127) * multiplier;
00104         multiplier *= 128;
00105     } while ((i & 128) != 0);
00106 exit:
00107     return len;
00108 }
00109 
00110 
00111 static int readPacket(MQTTClient* c, Timer* timer)
00112 {
00113     MQTTHeader header = {0};
00114     int len = 0;
00115     int rem_len = 0;
00116 
00117     /* 1. read the header byte.  This has the packet type in it */
00118     int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
00119     if (rc != 1)
00120         goto exit;
00121 
00122     len = 1;
00123     /* 2. read the remaining length.  This is variable in itself */
00124     decodePacket(c, &rem_len, TimerLeftMS(timer));
00125     len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
00126 
00127     if (rem_len > (c->readbuf_size - len))
00128     {
00129         rc = BUFFER_OVERFLOW;
00130         goto exit;
00131     }
00132 
00133     /* 3. read the rest of the buffer using a callback to supply the rest of the data */
00134     if (rem_len > 0 && (rc = c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
00135         rc = 0;
00136         goto exit;
00137     }
00138 
00139     header.byte = c->readbuf[0];
00140     rc = header.bits.type;
00141     if (c->keepAliveInterval > 0)
00142         TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
00143 exit:
00144     return rc;
00145 }
00146 
00147 
00148 // assume topic filter and name is in correct format
00149 // # can only be at end
00150 // + and # can only be next to separator
00151 static char isTopicMatched(char* topicFilter, MQTTString* topicName)
00152 {
00153     char* curf = topicFilter;
00154     char* curn = topicName->lenstring.data;
00155     char* curn_end = curn + topicName->lenstring.len;
00156 
00157     while (*curf && curn < curn_end)
00158     {
00159         if (*curn == '/' && *curf != '/')
00160             break;
00161         if (*curf != '+' && *curf != '#' && *curf != *curn)
00162             break;
00163         if (*curf == '+')
00164         {   // skip until we meet the next separator, or end of string
00165             char* nextpos = curn + 1;
00166             while (nextpos < curn_end && *nextpos != '/')
00167                 nextpos = ++curn + 1;
00168         }
00169         else if (*curf == '#')
00170             curn = curn_end - 1;    // skip until end of string
00171         curf++;
00172         curn++;
00173     };
00174 
00175     return (curn == curn_end) && (*curf == '\0');
00176 }
00177 
00178 
00179 int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
00180 {
00181     int i;
00182     int rc = FAILURE;
00183 
00184     // we have to find the right message handler - indexed by topic
00185     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00186     {
00187         if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
00188                 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
00189         {
00190             if (c->messageHandlers[i].fp != NULL)
00191             {
00192                 MessageData md;
00193                 NewMessageData(&md, topicName, message);
00194                 c->messageHandlers[i].fp(&md);
00195                 rc = SUCCESS;
00196             }
00197         }
00198     }
00199 
00200     if (rc == FAILURE && c->defaultMessageHandler != NULL)
00201     {
00202         MessageData md;
00203         NewMessageData(&md, topicName, message);
00204         c->defaultMessageHandler(&md);
00205         rc = SUCCESS;
00206     }
00207 
00208     return rc;
00209 }
00210 
00211 
00212 int keepalive(MQTTClient* c)
00213 {
00214     int rc = SUCCESS;
00215 
00216     if (c->keepAliveInterval == 0)
00217         goto exit;
00218 
00219     if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
00220     {
00221         if (c->ping_outstanding)
00222             rc = FAILURE; /* PINGRESP not received in keepalive interval */
00223         else
00224         {
00225             Timer timer;
00226             TimerInit(&timer);
00227             TimerCountdownMS(&timer, 1000);
00228             int len = MQTTSerialize_pingreq(c->buf, c->buf_size);
00229             if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
00230                 c->ping_outstanding = 1;
00231         }
00232     }
00233 
00234 exit:
00235     return rc;
00236 }
00237 
00238 
00239 void MQTTCleanSession(MQTTClient* c)
00240 {
00241     int i = 0;
00242 
00243     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00244         c->messageHandlers[i].topicFilter = NULL;
00245 }
00246 
00247 
00248 void MQTTCloseSession(MQTTClient* c)
00249 {
00250     c->ping_outstanding = 0;
00251     c->isconnected = 0;
00252     if (c->cleansession)
00253         MQTTCleanSession(c);
00254 }
00255 
00256 
00257 int cycle(MQTTClient* c, Timer* timer)
00258 {
00259     int len = 0,
00260         rc = SUCCESS;
00261 
00262     int packet_type = readPacket(c, timer);     /* read the socket, see what work is due */
00263 
00264     switch (packet_type)
00265     {
00266         default:
00267             /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
00268             rc = packet_type;
00269             goto exit;
00270         case 0: /* timed out reading packet */
00271             break;
00272         case CONNACK:
00273         case PUBACK:
00274         case SUBACK:
00275         case UNSUBACK:
00276             break;
00277         case PUBLISH:
00278         {
00279             MQTTString topicName;
00280             MQTTMessage msg;
00281             int intQoS;
00282             msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
00283             if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
00284                (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
00285                 goto exit;
00286             msg.qos = (enum QoS)intQoS;
00287             deliverMessage(c, &topicName, &msg);
00288             if (msg.qos != QOS0)
00289             {
00290                 if (msg.qos == QOS1)
00291                     len = MQTTSerialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id);
00292                 else if (msg.qos == QOS2)
00293                     len = MQTTSerialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id);
00294                 if (len <= 0)
00295                     rc = FAILURE;
00296                 else
00297                     rc = sendPacket(c, len, timer);
00298                 if (rc == FAILURE)
00299                     goto exit; // there was a problem
00300             }
00301             break;
00302         }
00303         case PUBREC:
00304         case PUBREL:
00305         {
00306             unsigned short mypacketid;
00307             unsigned char dup, type;
00308             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
00309                 rc = FAILURE;
00310             else if ((len = MQTTSerialize_ack(c->buf, c->buf_size,
00311                 (packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
00312                 rc = FAILURE;
00313             else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
00314                 rc = FAILURE; // there was a problem
00315             if (rc == FAILURE)
00316                 goto exit; // there was a problem
00317             break;
00318         }
00319 
00320         case PUBCOMP:
00321             break;
00322         case PINGRESP:
00323             c->ping_outstanding = 0;
00324             break;
00325     }
00326 
00327     if (keepalive(c) != SUCCESS) {
00328         //check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
00329         rc = FAILURE;
00330     }
00331 
00332 exit:
00333     if (rc == SUCCESS)
00334         rc = packet_type;
00335     else if (c->isconnected)
00336         MQTTCloseSession(c);
00337     return rc;
00338 }
00339 
00340 
00341 int MQTTYield(MQTTClient* c, int timeout_ms)
00342 {
00343     int rc = SUCCESS;
00344     Timer timer;
00345 
00346     TimerInit(&timer);
00347     TimerCountdownMS(&timer, timeout_ms);
00348 
00349       do
00350     {
00351         if (cycle(c, &timer) < 0)
00352         {
00353             rc = FAILURE;
00354             break;
00355         }
00356     } while (!TimerIsExpired(&timer));
00357 
00358     return rc;
00359 }
00360 
00361 int MQTTIsConnected(MQTTClient* client)
00362 {
00363   return client->isconnected;
00364 }
00365 
00366 void MQTTRun(void* parm)
00367 {
00368     Timer timer;
00369     MQTTClient* c = (MQTTClient*)parm;
00370 
00371     TimerInit(&timer);
00372 
00373     while (1)
00374     {
00375 #if defined(MQTT_TASK)
00376         MutexLock(&c->mutex);
00377 #endif
00378         TimerCountdownMS(&timer, 500); /* Don't wait too long if no traffic is incoming */
00379         cycle(c, &timer);
00380 #if defined(MQTT_TASK)
00381         MutexUnlock(&c->mutex);
00382 #endif
00383     }
00384 }
00385 
00386 
#if defined(MQTT_TASK)
/**
 * Start a thread that runs MQTTRun() for the given client.
 * Only available when MQTT_TASK is defined.
 *
 * @param client  client to service; also passed to MQTTRun as its argument
 * @return whatever ThreadStart returns
 */
int MQTTStartTask(MQTTClient* client)
{
    int started = ThreadStart(&client->thread, &MQTTRun, client);

    return started;
}
#endif
00393 
00394 
00395 int waitfor(MQTTClient* c, int packet_type, Timer* timer)
00396 {
00397     int rc = FAILURE;
00398 
00399     do
00400     {
00401         if (TimerIsExpired(timer))
00402             break; // we timed out
00403         rc = cycle(c, timer);
00404     }
00405     while (rc != packet_type && rc >= 0);
00406 
00407     return rc;
00408 }
00409 
00410 
00411 
00412 
00413 int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
00414 {
00415     Timer connect_timer;
00416     int rc = FAILURE;
00417     MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
00418     int len = 0;
00419 
00420 #if defined(MQTT_TASK)
00421       MutexLock(&c->mutex);
00422 #endif
00423       if (c->isconnected) /* don't send connect packet again if we are already connected */
00424           goto exit;
00425 
00426     TimerInit(&connect_timer);
00427     TimerCountdownMS(&connect_timer, c->command_timeout_ms);
00428 
00429     if (options == 0)
00430         options = &default_options; /* set default options if none were supplied */
00431 
00432     c->keepAliveInterval = options->keepAliveInterval;
00433     c->cleansession = options->cleansession;
00434     TimerCountdown(&c->last_received, c->keepAliveInterval);
00435     if ((len = MQTTSerialize_connect(c->buf, c->buf_size, options)) <= 0)
00436         goto exit;
00437     if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS)  // send the connect packet
00438         goto exit; // there was a problem
00439 
00440     // this will be a blocking call, wait for the connack
00441     if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
00442     {
00443         data->rc = 0;
00444         data->sessionPresent = 0;
00445         if (MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
00446             rc = data->rc;
00447         else
00448             rc = FAILURE;
00449     }
00450     else
00451         rc = FAILURE;
00452 
00453 exit:
00454     if (rc == SUCCESS)
00455     {
00456         c->isconnected = 1;
00457         c->ping_outstanding = 0;
00458     }
00459 
00460 #if defined(MQTT_TASK)
00461       MutexUnlock(&c->mutex);
00462 #endif
00463 
00464     return rc;
00465 }
00466 
00467 
00468 int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
00469 {
00470     MQTTConnackData data;
00471     return MQTTConnectWithResults(c, options, &data);
00472 }
00473 
00474 
00475 int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
00476 {
00477     int rc = FAILURE;
00478     int i = -1;
00479 
00480     /* first check for an existing matching slot */
00481     for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00482     {
00483         if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
00484         {
00485             if (messageHandler == NULL) /* remove existing */
00486             {
00487                 c->messageHandlers[i].topicFilter = NULL;
00488                 c->messageHandlers[i].fp = NULL;
00489             }
00490             rc = SUCCESS; /* return i when adding new subscription */
00491             break;
00492         }
00493     }
00494     /* if no existing, look for empty slot (unless we are removing) */
00495     if (messageHandler != NULL) {
00496         if (rc == FAILURE)
00497         {
00498             for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
00499             {
00500                 if (c->messageHandlers[i].topicFilter == NULL)
00501                 {
00502                     rc = SUCCESS;
00503                     break;
00504                 }
00505             }
00506         }
00507         if (i < MAX_MESSAGE_HANDLERS)
00508         {
00509             c->messageHandlers[i].topicFilter = topicFilter;
00510             c->messageHandlers[i].fp = messageHandler;
00511         }
00512     }
00513     return rc;
00514 }
00515 
00516 
00517 int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
00518        messageHandler messageHandler, MQTTSubackData* data)
00519 {
00520     int rc = FAILURE;
00521     Timer timer;
00522     int len = 0;
00523     MQTTString topic = MQTTString_initializer;
00524     topic.cstring = (char *)topicFilter;
00525 
00526 #if defined(MQTT_TASK)
00527       MutexLock(&c->mutex);
00528 #endif
00529       if (!c->isconnected)
00530             goto exit;
00531 
00532     TimerInit(&timer);
00533     TimerCountdownMS(&timer, c->command_timeout_ms);
00534 
00535     len = MQTTSerialize_subscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic, (int*)&qos);
00536     if (len <= 0)
00537         goto exit;
00538     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
00539         goto exit;             // there was a problem
00540 
00541     if (waitfor(c, SUBACK, &timer) == SUBACK)      // wait for suback
00542     {
00543         int count = 0;
00544         unsigned short mypacketid;
00545         data->grantedQoS = QOS0;
00546         if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&data->grantedQoS, c->readbuf, c->readbuf_size) == 1)
00547         {
00548             if (data->grantedQoS != 0x80)
00549                 rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
00550         }
00551     }
00552     else
00553         rc = FAILURE;
00554 
00555 exit:
00556     if (rc == FAILURE)
00557         MQTTCloseSession(c);
00558 #if defined(MQTT_TASK)
00559       MutexUnlock(&c->mutex);
00560 #endif
00561     return rc;
00562 }
00563 
00564 
00565 int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
00566        messageHandler messageHandler)
00567 {
00568     MQTTSubackData data;
00569     return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
00570 }
00571 
00572 
00573 int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
00574 {
00575     int rc = FAILURE;
00576     Timer timer;
00577     MQTTString topic = MQTTString_initializer;
00578     topic.cstring = (char *)topicFilter;
00579     int len = 0;
00580 
00581 #if defined(MQTT_TASK)
00582       MutexLock(&c->mutex);
00583 #endif
00584       if (!c->isconnected)
00585           goto exit;
00586 
00587     TimerInit(&timer);
00588     TimerCountdownMS(&timer, c->command_timeout_ms);
00589 
00590     if ((len = MQTTSerialize_unsubscribe(c->buf, c->buf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
00591         goto exit;
00592     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
00593         goto exit; // there was a problem
00594 
00595     if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
00596     {
00597         unsigned short mypacketid;  // should be the same as the packetid above
00598         if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
00599         {
00600             /* remove the subscription message handler associated with this topic, if there is one */
00601             MQTTSetMessageHandler(c, topicFilter, NULL);
00602         }
00603     }
00604     else
00605         rc = FAILURE;
00606 
00607 exit:
00608     if (rc == FAILURE)
00609         MQTTCloseSession(c);
00610 #if defined(MQTT_TASK)
00611       MutexUnlock(&c->mutex);
00612 #endif
00613     return rc;
00614 }
00615 
00616 
00617 int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
00618 {
00619     int rc = FAILURE;
00620     Timer timer;
00621     MQTTString topic = MQTTString_initializer;
00622     topic.cstring = (char *)topicName;
00623     int len = 0;
00624 
00625 #if defined(MQTT_TASK)
00626       MutexLock(&c->mutex);
00627 #endif
00628       if (!c->isconnected)
00629             goto exit;
00630 
00631     TimerInit(&timer);
00632     TimerCountdownMS(&timer, c->command_timeout_ms);
00633 
00634     if (message->qos == QOS1 || message->qos == QOS2)
00635         message->id = getNextPacketId(c);
00636 
00637     len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
00638               topic, (unsigned char*)message->payload, message->payloadlen);
00639     if (len <= 0)
00640         goto exit;
00641     if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
00642         goto exit; // there was a problem
00643 
00644     if (message->qos == QOS1)
00645     {
00646         if (waitfor(c, PUBACK, &timer) == PUBACK)
00647         {
00648             unsigned short mypacketid;
00649             unsigned char dup, type;
00650             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
00651                 rc = FAILURE;
00652         }
00653         else
00654             rc = FAILURE;
00655     }
00656     else if (message->qos == QOS2)
00657     {
00658         if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
00659         {
00660             unsigned short mypacketid;
00661             unsigned char dup, type;
00662             if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
00663                 rc = FAILURE;
00664         }
00665         else
00666             rc = FAILURE;
00667     }
00668 
00669 exit:
00670     if (rc == FAILURE)
00671         MQTTCloseSession(c);
00672 #if defined(MQTT_TASK)
00673       MutexUnlock(&c->mutex);
00674 #endif
00675     return rc;
00676 }
00677 
00678 
00679 int MQTTDisconnect(MQTTClient* c)
00680 {
00681     int rc = FAILURE;
00682     Timer timer;     // we might wait for incomplete incoming publishes to complete
00683     int len = 0;
00684 
00685 #if defined(MQTT_TASK)
00686     MutexLock(&c->mutex);
00687 #endif
00688     TimerInit(&timer);
00689     TimerCountdownMS(&timer, c->command_timeout_ms);
00690 
00691       len = MQTTSerialize_disconnect(c->buf, c->buf_size);
00692     if (len > 0)
00693         rc = sendPacket(c, len, &timer);            // send the disconnect packet
00694     MQTTCloseSession(c);
00695 
00696 #if defined(MQTT_TASK)
00697       MutexUnlock(&c->mutex);
00698 #endif
00699     return rc;
00700 }