V.06 11/3
Dependencies: FT6206 SDFileSystem SPI_TFT_ILI9341 TFT_fonts
Fork of ATT_AWS_IoT_demo by
MQTTClient.cpp
00001 /******************************************************************************* 00002 * Copyright (c) 2014 IBM Corp. 00003 * 00004 * All rights reserved. This program and the accompanying materials 00005 * are made available under the terms of the Eclipse Public License v1.0 00006 * and Eclipse Distribution License v1.0 which accompany this distribution. 00007 * 00008 * The Eclipse Public License is available at 00009 * http://www.eclipse.org/legal/epl-v10.html 00010 * and the Eclipse Distribution License is available at 00011 * http://www.eclipse.org/org/documents/edl-v10.php. 00012 * 00013 * Contributors: 00014 * Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation 00015 *******************************************************************************/ 00016 00017 #include "MQTTClient.h" 00018 #include <string.h> 00019 #include "aws_iot_log.h" 00020 00021 static void MQTTForceDisconnect(Client *c); 00022 00023 void NewMessageData(MessageData *md, MQTTString *aTopicName, MQTTMessage *aMessage, pApplicationHandler_t applicationHandler) { 00024 md->topicName = aTopicName; 00025 md->message = aMessage; 00026 md->applicationHandler = applicationHandler; 00027 } 00028 00029 uint16_t getNextPacketId(Client *c) { 00030 return c->nextPacketId = (uint16_t)((MAX_PACKET_ID == c->nextPacketId) ? 1 : (c->nextPacketId + 1)); 00031 } 00032 00033 MQTTReturnCode sendPacket(Client *c, uint32_t length, Timer *timer) { 00034 int32_t sentLen = 0; 00035 uint32_t sent = 0, i; 00036 00037 if(NULL == c || NULL == timer) { 00038 ERROR("...MQTT_NULL_VALUE_ERROR"); 00039 return MQTT_NULL_VALUE_ERROR; 00040 } 00041 00042 if(length >= c->bufSize) { 00043 ERROR("...MQTTPACKET_BUFFER_TOO_SHORT"); 00044 return MQTTPACKET_BUFFER_TOO_SHORT; 00045 } 00046 00047 while(sent < length && !expired(timer)) { 00048 sentLen = c->networkStack.mqttwrite(&(c->networkStack), &c->buf[sent], (int)length, left_ms(timer)); 00049 if(sentLen < 0) { 00050 /* there was an error writing the data */ 00051 ERROR("...there was an error writing the data"); 00052 break; 00053 } 00054 sent = sent + (uint32_t)sentLen; 00055 } 00056 00057 if(sent == length) { 00058 /* record the fact that we have successfully sent the packet */ 00059 //countdown(&c->pingTimer, c->keepAliveInterval); 00060 return SUCCESS; 00061 } 00062 00063 ERROR("...sendPacket FAILURE, sent = %d, length = %d", sent, length); 00064 return FAILURE; 00065 } 00066 00067 void copyMQTTConnectData(MQTTPacket_connectData *destination, MQTTPacket_connectData *source) { 00068 if(NULL == destination || NULL == source) { 00069 return; 00070 } 00071 destination->willFlag = source->willFlag; 00072 destination->MQTTVersion = source->MQTTVersion; 00073 destination->clientID.cstring = source->clientID.cstring; 00074 destination->username.cstring = source->username.cstring; 00075 destination->password.cstring = source->password.cstring; 00076 destination->will.topicName.cstring = source->will.topicName.cstring; 00077 destination->will.message.cstring = source->will.message.cstring; 00078 destination->will.qos = source->will.qos; 00079 destination->will.retained = source->will.retained; 00080 destination->keepAliveInterval = source->keepAliveInterval; 00081 destination->cleansession = source->cleansession; 00082 } 00083 00084 MQTTReturnCode MQTTClient(Client *c, uint32_t commandTimeoutMs, 00085 unsigned char *buf, size_t bufSize, unsigned char *readbuf, 00086 size_t readBufSize, uint8_t enableAutoReconnect, 00087 networkInitHandler_t networkInitHandler, 00088 TLSConnectParams *tlsConnectParams) { 00089 uint32_t i; 00090 MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer; 00091 00092 if(NULL == c || NULL == tlsConnectParams || NULL == buf || NULL == readbuf 00093 || NULL == networkInitHandler) { 00094 return MQTT_NULL_VALUE_ERROR; 00095 } 00096 00097 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { 00098 c->messageHandlers[i].topicFilter = NULL; 00099 c->messageHandlers[i].fp = NULL; 00100 c->messageHandlers[i].applicationHandler = NULL; 00101 c->messageHandlers[i].qos = (QoS)0; 00102 } 00103 00104 c->commandTimeoutMs = commandTimeoutMs; 00105 c->buf = buf; 00106 c->bufSize = bufSize; 00107 c->readbuf = readbuf; 00108 c->readBufSize = readBufSize; 00109 c->isConnected = 0; 00110 c->isPingOutstanding = 0; 00111 c->wasManuallyDisconnected = 0; 00112 c->counterNetworkDisconnected = 0; 00113 c->isAutoReconnectEnabled = enableAutoReconnect; 00114 c->defaultMessageHandler = NULL; 00115 c->disconnectHandler = NULL; 00116 copyMQTTConnectData(&(c->options), &default_options); 00117 00118 c->networkInitHandler = networkInitHandler; 00119 c->tlsConnectParams.DestinationPort = tlsConnectParams->DestinationPort; 00120 c->tlsConnectParams.pDestinationURL = tlsConnectParams->pDestinationURL; 00121 c->tlsConnectParams.pDeviceCertLocation = tlsConnectParams->pDeviceCertLocation; 00122 c->tlsConnectParams.pDevicePrivateKeyLocation = tlsConnectParams->pDevicePrivateKeyLocation; 00123 c->tlsConnectParams.pRootCALocation = tlsConnectParams->pRootCALocation; 00124 c->tlsConnectParams.timeout_ms = tlsConnectParams->timeout_ms; 00125 c->tlsConnectParams.ServerVerificationFlag = tlsConnectParams->ServerVerificationFlag; 00126 00127 InitTimer(&(c->pingTimer)); 00128 InitTimer(&(c->reconnectDelayTimer)); 00129 00130 return SUCCESS; 00131 } 00132 00133 MQTTReturnCode decodePacket(Client *c, uint32_t *value, uint32_t timeout) { 00134 unsigned char i; 00135 uint32_t multiplier = 1; 00136 uint32_t len = 0; 00137 const uint32_t MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; 00138 00139 if(NULL == c || NULL == value) { 00140 return MQTT_NULL_VALUE_ERROR; 00141 } 00142 00143 *value = 0; 00144 00145 do { 00146 if(++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { 00147 /* bad data */ 00148 return MQTTPACKET_READ_ERROR; 00149 } 00150 00151 if((c->networkStack.mqttread(&(c->networkStack), &i, 1, (int)timeout)) != 1) { 00152 /* The value argument is the important value. len is just used temporarily 00153 * and never used by the calling function for anything else */ 00154 return FAILURE; 00155 } 00156 00157 *value += ((i & 127) * multiplier); 00158 multiplier *= 128; 00159 }while((i & 128) != 0); 00160 00161 /* The value argument is the important value. len is just used temporarily 00162 * and never used by the calling function for anything else */ 00163 return SUCCESS; 00164 } 00165 00166 MQTTReturnCode readPacket(Client *c, Timer *timer, uint8_t *packet_type) { 00167 MQTTHeader header = {0}; 00168 uint32_t len = 0; 00169 uint32_t rem_len = 0; 00170 uint32_t total_bytes_read = 0; 00171 uint32_t bytes_to_be_read = 0; 00172 int32_t ret_val = 0; 00173 MQTTReturnCode rc; 00174 00175 if(NULL == c || NULL == timer) { 00176 ERROR("readPacket() MQTT_NULL_VALUE_ERROR"); 00177 return MQTT_NULL_VALUE_ERROR; 00178 } 00179 00180 /* 1. read the header byte. This has the packet type in it */ 00181 if(1 != c->networkStack.mqttread(&(c->networkStack), c->readbuf, 1, left_ms(timer))) { 00182 /* If a network disconnect has occurred it would have been caught by keepalive already. 00183 * If nothing is found at this point means there was nothing to read. Not 100% correct, 00184 * but the only way to be sure is to pass proper error codes from the network stack 00185 * which the mbedtls/openssl implementations do not return */ 00186 return MQTT_NOTHING_TO_READ; 00187 } 00188 00189 len = 1; 00190 /* 2. read the remaining length. This is variable in itself */ 00191 rc = decodePacket(c, &rem_len, (uint32_t)left_ms(timer)); 00192 if(SUCCESS != rc) { 00193 ERROR("readPacket() SUCCESS != rc"); 00194 return rc; 00195 } 00196 00197 /* if the buffer is too short then the message will be dropped silently */ 00198 if (rem_len >= c->readBufSize) { 00199 bytes_to_be_read = c->readBufSize; 00200 do { 00201 ret_val = c->networkStack.mqttread(&(c->networkStack), c->readbuf, bytes_to_be_read, left_ms(timer)); 00202 if (ret_val > 0) { 00203 total_bytes_read += ret_val; 00204 if((rem_len - total_bytes_read) >= c->readBufSize){ 00205 bytes_to_be_read = c->readBufSize; 00206 } 00207 else{ 00208 bytes_to_be_read = rem_len - total_bytes_read; 00209 } 00210 } 00211 } while (total_bytes_read < rem_len && ret_val > 0); 00212 return MQTTPACKET_BUFFER_TOO_SHORT; 00213 } 00214 00215 /* put the original remaining length back into the buffer */ 00216 len += MQTTPacket_encode(c->readbuf + 1, rem_len); 00217 00218 /* 3. read the rest of the buffer using a callback to supply the rest of the data */ 00219 if(rem_len > 0 && (c->networkStack.mqttread(&(c->networkStack), c->readbuf + len, (int)rem_len, left_ms(timer)) != (int)rem_len)) { 00220 ERROR("readPacket() FAILURE"); 00221 return FAILURE; 00222 } 00223 00224 header.byte = c->readbuf[0]; 00225 *packet_type = header.bits.type; 00226 00227 return SUCCESS; 00228 } 00229 00230 // assume topic filter and name is in correct format 00231 // # can only be at end 00232 // + and # can only be next to separator 00233 char isTopicMatched(char *topicFilter, MQTTString *topicName) { 00234 char *curf = NULL; 00235 char *curn = NULL; 00236 char *curn_end = NULL; 00237 00238 if(NULL == topicFilter || NULL == topicName) { 00239 return MQTT_NULL_VALUE_ERROR; 00240 } 00241 00242 curf = topicFilter; 00243 curn = topicName->lenstring.data; 00244 curn_end = curn + topicName->lenstring.len; 00245 00246 while(*curf && (curn < curn_end)) { 00247 if(*curn == '/' && *curf != '/') { 00248 break; 00249 } 00250 if(*curf != '+' && *curf != '#' && *curf != *curn) { 00251 break; 00252 } 00253 if(*curf == '+') { 00254 /* skip until we meet the next separator, or end of string */ 00255 char *nextpos = curn + 1; 00256 while(nextpos < curn_end && *nextpos != '/') 00257 nextpos = ++curn + 1; 00258 } else if(*curf == '#') { 00259 /* skip until end of string */ 00260 curn = curn_end - 1; 00261 } 00262 00263 curf++; 00264 curn++; 00265 }; 00266 00267 return (curn == curn_end) && (*curf == '\0'); 00268 } 00269 00270 MQTTReturnCode deliverMessage(Client *c, MQTTString *topicName, MQTTMessage *message) { 00271 uint32_t i; 00272 MessageData md; 00273 00274 if(NULL == c || NULL == topicName || NULL == message) { 00275 return MQTT_NULL_VALUE_ERROR; 00276 } 00277 00278 // we have to find the right message handler - indexed by topic 00279 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { 00280 if((c->messageHandlers[i].topicFilter != 0) 00281 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) || 00282 isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName))) { 00283 if(c->messageHandlers[i].fp != NULL) { 00284 NewMessageData(&md, topicName, message, c->messageHandlers[i].applicationHandler); 00285 c->messageHandlers[i].fp(&md); 00286 return SUCCESS; 00287 } 00288 } 00289 } 00290 00291 if(NULL != c->defaultMessageHandler) { 00292 NewMessageData(&md, topicName, message, NULL); 00293 c->defaultMessageHandler(&md); 00294 return SUCCESS; 00295 } 00296 00297 /* Message handler not found for topic */ 00298 return FAILURE; 00299 } 00300 00301 MQTTReturnCode handleDisconnect(Client *c) { 00302 MQTTReturnCode rc; 00303 00304 if(NULL == c) { 00305 return MQTT_NULL_VALUE_ERROR; 00306 } 00307 00308 rc = MQTTDisconnect(c); 00309 if(rc != SUCCESS){ 00310 // If the sendPacket prevents us from sending a disconnect packet then we have to clean the stack 00311 MQTTForceDisconnect(c); 00312 } 00313 00314 if(NULL != c->disconnectHandler) { 00315 c->disconnectHandler(); 00316 } 00317 00318 /* Reset to 0 since this was not a manual disconnect */ 00319 c->wasManuallyDisconnected = 0; 00320 return MQTT_NETWORK_DISCONNECTED_ERROR; 00321 } 00322 00323 MQTTReturnCode MQTTAttemptReconnect(Client *c) { 00324 MQTTReturnCode rc = MQTT_ATTEMPTING_RECONNECT; 00325 00326 if(NULL == c) { 00327 return MQTT_NULL_VALUE_ERROR; 00328 } 00329 00330 if(1 == c->isConnected) { 00331 return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; 00332 } 00333 00334 /* Ignoring return code. failures expected if network is disconnected */ 00335 rc = MQTTConnect(c, NULL); 00336 00337 /* If still disconnected handle disconnect */ 00338 if(0 == c->isConnected) { 00339 return MQTT_ATTEMPTING_RECONNECT; 00340 } 00341 00342 rc = MQTTResubscribe(c); 00343 if(SUCCESS != rc) { 00344 return rc; 00345 } 00346 00347 return MQTT_NETWORK_RECONNECTED; 00348 } 00349 00350 MQTTReturnCode handleReconnect(Client *c) { 00351 int8_t isPhysicalLayerConnected = 1; 00352 MQTTReturnCode rc = MQTT_NETWORK_RECONNECTED; 00353 00354 if(NULL == c) { 00355 return MQTT_NULL_VALUE_ERROR; 00356 } 00357 00358 if(!expired(&(c->reconnectDelayTimer))) { 00359 /* Timer has not expired. Not time to attempt reconnect yet. 00360 * Return attempting reconnect */ 00361 return MQTT_ATTEMPTING_RECONNECT; 00362 } 00363 00364 if(NULL != c->networkStack.isConnected) { 00365 isPhysicalLayerConnected = (int8_t)c->networkStack.isConnected(&(c->networkStack)); 00366 } 00367 00368 if(isPhysicalLayerConnected) { 00369 rc = MQTTAttemptReconnect(c); 00370 if(MQTT_NETWORK_RECONNECTED == rc) { 00371 return MQTT_NETWORK_RECONNECTED; 00372 } 00373 } 00374 00375 c->currentReconnectWaitInterval *= 2; 00376 00377 if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { 00378 return MQTT_RECONNECT_TIMED_OUT; 00379 } 00380 countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); 00381 return rc; 00382 } 00383 00384 MQTTReturnCode keepalive(Client *c) { 00385 MQTTReturnCode rc = SUCCESS; 00386 Timer timer; 00387 uint32_t serialized_len = 0; 00388 00389 if(NULL == c) { 00390 return MQTT_NULL_VALUE_ERROR; 00391 } 00392 00393 if(0 == c->keepAliveInterval) { 00394 return SUCCESS; 00395 } 00396 00397 if(!expired(&c->pingTimer)) { 00398 return SUCCESS; 00399 } 00400 00401 if(c->isPingOutstanding) { 00402 return handleDisconnect(c); 00403 } 00404 00405 /* there is no ping outstanding - send one */ 00406 InitTimer(&timer); 00407 countdown_ms(&timer, c->commandTimeoutMs); 00408 rc = MQTTSerialize_pingreq(c->buf, c->bufSize, &serialized_len); 00409 if(SUCCESS != rc) { 00410 return rc; 00411 } 00412 00413 /* send the ping packet */ 00414 rc = sendPacket(c, serialized_len, &timer); 00415 if(SUCCESS != rc) { 00416 //If sending a PING fails we can no longer determine if we are connected. In this case we decide we are disconnected and begin reconnection attempts 00417 return handleDisconnect(c); 00418 } 00419 00420 c->isPingOutstanding = 1; 00421 /* start a timer to wait for PINGRESP from server */ 00422 countdown(&c->pingTimer, c->keepAliveInterval / 2); 00423 00424 return SUCCESS; 00425 } 00426 00427 MQTTReturnCode handlePublish(Client *c, Timer *timer) { 00428 MQTTString topicName; 00429 MQTTMessage msg; 00430 MQTTReturnCode rc; 00431 uint32_t len = 0; 00432 00433 rc = MQTTDeserialize_publish((unsigned char *) &msg.dup, (QoS *) &msg.qos, (unsigned char *) &msg.retained, 00434 (uint16_t *)&msg.id, &topicName, 00435 (unsigned char **) &msg.payload, (uint32_t *) &msg.payloadlen, c->readbuf, 00436 c->readBufSize); 00437 if(SUCCESS != rc) { 00438 return rc; 00439 } 00440 00441 rc = deliverMessage(c, &topicName, &msg); 00442 if(SUCCESS != rc) { 00443 return rc; 00444 } 00445 00446 if(QOS0 == msg.qos) { 00447 /* No further processing required for QOS0 */ 00448 return SUCCESS; 00449 } 00450 00451 if(QOS1 == msg.qos) { 00452 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBACK, 0, msg.id, &len); 00453 } else { /* Message is not QOS0 or 1 means only option left is QOS2 */ 00454 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREC, 0, msg.id, &len); 00455 } 00456 00457 if(SUCCESS != rc) { 00458 return rc; 00459 } 00460 00461 rc = sendPacket(c, len, timer); 00462 if(SUCCESS != rc) { 00463 return rc; 00464 } 00465 00466 return SUCCESS; 00467 } 00468 00469 MQTTReturnCode handlePubrec(Client *c, Timer *timer) { 00470 uint16_t packet_id; 00471 unsigned char dup, type; 00472 MQTTReturnCode rc; 00473 uint32_t len; 00474 00475 rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); 00476 if(SUCCESS != rc) { 00477 return rc; 00478 } 00479 00480 rc = MQTTSerialize_ack(c->buf, c->bufSize, PUBREL, 0, packet_id, &len); 00481 if(SUCCESS != rc) { 00482 return rc; 00483 } 00484 00485 /* send the PUBREL packet */ 00486 rc = sendPacket(c, len, timer); 00487 if(SUCCESS != rc) { 00488 /* there was a problem */ 00489 return rc; 00490 } 00491 00492 return SUCCESS; 00493 } 00494 00495 MQTTReturnCode cycle(Client *c, Timer *timer, uint8_t *packet_type) { 00496 MQTTReturnCode rc; 00497 if(NULL == c || NULL == timer) { 00498 ERROR("cycle() MQTT_NULL_VALUE_ERROR"); 00499 return MQTT_NULL_VALUE_ERROR; 00500 } 00501 00502 /* read the socket, see what work is due */ 00503 rc = readPacket(c, timer, packet_type); 00504 if(MQTT_NOTHING_TO_READ == rc) { 00505 /* Nothing to read, not a cycle failure */ 00506 return SUCCESS; 00507 } 00508 if(SUCCESS != rc) { 00509 ERROR("cycle() SUCCESS != rc"); 00510 return rc; 00511 } 00512 00513 switch(*packet_type) { 00514 case CONNACK: 00515 case PUBACK: 00516 case SUBACK: 00517 case UNSUBACK: 00518 break; 00519 case PUBLISH: { 00520 rc = handlePublish(c, timer); 00521 break; 00522 } 00523 case PUBREC: { 00524 rc = handlePubrec(c, timer); 00525 break; 00526 } 00527 case PUBCOMP: 00528 break; 00529 case PINGRESP: { 00530 c->isPingOutstanding = 0; 00531 countdown(&c->pingTimer, c->keepAliveInterval); 00532 break; 00533 } 00534 default: { 00535 /* Either unknown packet type or Failure occurred 00536 * Should not happen */ 00537 ERROR("cycle() Either unknown packet type or Failure occurred"); 00538 return MQTT_BUFFER_RX_MESSAGE_INVALID; 00539 break; 00540 } 00541 } 00542 00543 return rc; 00544 } 00545 00546 MQTTReturnCode MQTTYield(Client *c, uint32_t timeout_ms) { 00547 MQTTReturnCode rc = SUCCESS; 00548 Timer timer; 00549 uint8_t packet_type; 00550 00551 if(NULL == c) { 00552 ERROR("MQTTYield() MQTT_NULL_VALUE_ERROR"); 00553 return MQTT_NULL_VALUE_ERROR; 00554 } 00555 00556 /* Check if network was manually disconnected */ 00557 if(0 == c->isConnected && 1 == c->wasManuallyDisconnected) { 00558 ERROR("MQTTYield() MQTT_NETWORK_MANUALLY_DISCONNECTED"); 00559 return MQTT_NETWORK_MANUALLY_DISCONNECTED; 00560 } 00561 00562 /* Check if network is disconnected and auto-reconnect is not enabled */ 00563 if(0 == c->isConnected && 0 == c->isAutoReconnectEnabled) { 00564 ERROR("MQTTYield() MQTT_NETWORK_DISCONNECTED_ERROR"); 00565 return MQTT_NETWORK_DISCONNECTED_ERROR; 00566 } 00567 00568 InitTimer(&timer); 00569 countdown_ms(&timer, timeout_ms); 00570 00571 while(!expired(&timer)) { 00572 if(0 == c->isConnected) { 00573 if(MAX_RECONNECT_WAIT_INTERVAL < c->currentReconnectWaitInterval) { 00574 rc = MQTT_RECONNECT_TIMED_OUT; 00575 ERROR("MQTTYield() MQTT_RECONNECT_TIMED_OUT"); 00576 break; 00577 } 00578 00579 rc = handleReconnect(c); 00580 /* Network reconnect attempted, check if yield timer expired before 00581 * doing anything else */ 00582 continue; 00583 } 00584 00585 rc = cycle(c, &timer, &packet_type); 00586 if(SUCCESS != rc) { 00587 ERROR("MQTTYield() SUCCESS != rc"); 00588 break; 00589 } 00590 00591 rc = keepalive(c); 00592 if(MQTT_NETWORK_DISCONNECTED_ERROR == rc && 1 == c->isAutoReconnectEnabled) { 00593 c->currentReconnectWaitInterval = MIN_RECONNECT_WAIT_INTERVAL; 00594 countdown_ms(&(c->reconnectDelayTimer), c->currentReconnectWaitInterval); 00595 c->counterNetworkDisconnected++; 00596 /* Depending on timer values, it is possible that yield timer has expired 00597 * Set to rc to attempting reconnect to inform client that autoreconnect 00598 * attempt has started */ 00599 INFO("MQTTYield() MQTT_ATTEMPTING_RECONNECT"); 00600 rc = MQTT_ATTEMPTING_RECONNECT; 00601 } else if(SUCCESS != rc) { 00602 ERROR("MQTTYield() SUCCESS != rc"); 00603 break; 00604 } 00605 } 00606 00607 return rc; 00608 } 00609 00610 /* only used in single-threaded mode where one command at a time is in process */ 00611 MQTTReturnCode waitfor(Client *c, uint8_t packet_type, Timer *timer) { 00612 MQTTReturnCode rc = FAILURE; 00613 uint8_t read_packet_type = 0, retry = 2; 00614 00615 00616 00617 if(NULL == c || NULL == timer) { 00618 ERROR("waitfor() MQTT_NULL_VALUE_ERROR"); 00619 return MQTT_NULL_VALUE_ERROR; 00620 } 00621 00622 do { 00623 if(expired(timer)) { 00624 /* we timed out */ 00625 ERROR("waitfor() timer expired"); 00626 break; 00627 00628 //countdown_ms(timer, 10); 00629 } 00630 00631 rc = cycle(c, timer, &read_packet_type); 00632 }while(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type); 00633 00634 if(MQTT_NETWORK_DISCONNECTED_ERROR != rc && read_packet_type != packet_type) { 00635 ERROR("waitfor() MQTT_NETWORK_DISCONNECTED_ERROR"); 00636 return FAILURE; 00637 } 00638 00639 /* Something failed or we didn't receive the expected packet, return error code */ 00640 return rc; 00641 } 00642 00643 MQTTReturnCode MQTTConnect(Client *c, MQTTPacket_connectData *options) { 00644 Timer connect_timer; 00645 MQTTReturnCode connack_rc = FAILURE; 00646 char sessionPresent = 0; 00647 uint32_t len = 0; 00648 MQTTReturnCode rc = FAILURE; 00649 00650 if(NULL == c) { 00651 return MQTT_NULL_VALUE_ERROR; 00652 } 00653 00654 DEBUG("...connect_timer"); 00655 InitTimer(&connect_timer); 00656 countdown_ms(&connect_timer, c->commandTimeoutMs); 00657 00658 if(c->isConnected) { 00659 /* Don't send connect packet again if we are already connected */ 00660 ERROR("...MQTT_NETWORK_ALREADY_CONNECTED_ERROR"); 00661 return MQTT_NETWORK_ALREADY_CONNECTED_ERROR; 00662 } 00663 00664 if(NULL != options) { 00665 /* override default options if new options were supplied */ 00666 copyMQTTConnectData(&(c->options), options); 00667 } 00668 00669 DEBUG("...TLS Connect"); 00670 c->networkInitHandler(&(c->networkStack)); 00671 rc = (MQTTReturnCode)c->networkStack.connect(&(c->networkStack), c->tlsConnectParams); 00672 if(0 != rc) { 00673 /* TLS Connect failed, return error */ 00674 ERROR("...TLS Connect failed, return error"); 00675 return FAILURE; 00676 } 00677 00678 c->keepAliveInterval = c->options.keepAliveInterval; 00679 rc = MQTTSerialize_connect(c->buf, c->bufSize, &(c->options), &len); 00680 if(SUCCESS != rc || 0 >= len) { 00681 ERROR("...MQTTSerialize_connect FAIL"); 00682 return FAILURE; 00683 } 00684 00685 /* send the connect packet */ 00686 rc = sendPacket(c, len, &connect_timer); 00687 if(SUCCESS != rc) { 00688 ERROR("...sendPacket FAIL"); 00689 return rc; 00690 } 00691 00692 /* this will be a blocking call, wait for the CONNACK */ 00693 rc = waitfor(c, CONNACK, &connect_timer); 00694 if(SUCCESS != rc) { 00695 ERROR("...waitfor FAIL"); 00696 return rc; 00697 } 00698 00699 /* Received CONNACK, check the return code */ 00700 rc = MQTTDeserialize_connack((unsigned char *)&sessionPresent, &connack_rc, c->readbuf, c->readBufSize); 00701 if(SUCCESS != rc) { 00702 ERROR("...MQTTDeserialize_connack FAIL"); 00703 return rc; 00704 } 00705 00706 if(MQTT_CONNACK_CONNECTION_ACCEPTED != connack_rc) { 00707 ERROR("...MQTT_CONNACK_CONNECTION_ACCEPTED FAIL"); 00708 return connack_rc; 00709 } 00710 00711 DEBUG("...isConnected"); 00712 c->isConnected = 1; 00713 c->wasManuallyDisconnected = 0; 00714 c->isPingOutstanding = 0; 00715 countdown(&c->pingTimer, c->keepAliveInterval); 00716 00717 return SUCCESS; 00718 } 00719 00720 /* Return MAX_MESSAGE_HANDLERS value if no free index is available */ 00721 uint32_t GetFreeMessageHandlerIndex(Client *c) { 00722 uint32_t itr; 00723 for(itr = 0; itr < MAX_MESSAGE_HANDLERS; itr++) { 00724 if(c->messageHandlers[itr].topicFilter == NULL) { 00725 break; 00726 } 00727 } 00728 00729 return itr; 00730 } 00731 00732 MQTTReturnCode MQTTSubscribe(Client *c, const char *topicFilter, QoS qos, 00733 messageHandler messageHandler, pApplicationHandler_t applicationHandler) { 00734 MQTTReturnCode rc = FAILURE; 00735 Timer timer; 00736 uint32_t len = 0; 00737 uint32_t indexOfFreeMessageHandler; 00738 uint32_t count = 0; 00739 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00740 uint16_t packetId; 00741 MQTTString topic = MQTTString_initializer; 00742 00743 if(NULL == c || NULL == topicFilter 00744 || NULL == messageHandler || NULL == applicationHandler) { 00745 ERROR("...MQTT_NULL_VALUE_ERROR FAIL"); 00746 return MQTT_NULL_VALUE_ERROR; 00747 } 00748 00749 if(!c->isConnected) { 00750 ERROR("...MQTT_NETWORK_DISCONNECTED_ERROR FAIL"); 00751 return MQTT_NETWORK_DISCONNECTED_ERROR; 00752 } 00753 00754 topic.cstring = (char *)topicFilter; 00755 00756 InitTimer(&timer); 00757 countdown_ms(&timer, c->commandTimeoutMs); 00758 00759 rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &qos, &len); 00760 if(SUCCESS != rc) { 00761 ERROR("...MQTTSerialize_subscribe FAIL"); 00762 return rc; 00763 } 00764 00765 indexOfFreeMessageHandler = GetFreeMessageHandlerIndex(c); 00766 if(MAX_MESSAGE_HANDLERS <= indexOfFreeMessageHandler) { 00767 ERROR("...MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR FAIL"); 00768 return MQTT_MAX_SUBSCRIPTIONS_REACHED_ERROR; 00769 } 00770 00771 /* send the subscribe packet */ 00772 rc = sendPacket(c, len, &timer); 00773 if(SUCCESS != rc) { 00774 ERROR("...send the subscribe packet FAIL"); 00775 return rc; 00776 } 00777 00778 /* wait for suback */ 00779 rc = waitfor(c, SUBACK, &timer); 00780 if(SUCCESS != rc) { 00781 ERROR("...wait for suback FAIL"); 00782 return rc; 00783 } 00784 00785 /* Granted QoS can be 0, 1 or 2 */ 00786 rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); 00787 if(SUCCESS != rc) { 00788 ERROR("...Granted QoS can be 0, 1 or 2"); 00789 return rc; 00790 } 00791 00792 c->messageHandlers[indexOfFreeMessageHandler].topicFilter = 00793 topicFilter; 00794 c->messageHandlers[indexOfFreeMessageHandler].fp = messageHandler; 00795 c->messageHandlers[indexOfFreeMessageHandler].applicationHandler = 00796 applicationHandler; 00797 c->messageHandlers[indexOfFreeMessageHandler].qos = qos; 00798 00799 DEBUG("...MQTTSubscribe SUCCESS"); 00800 return SUCCESS; 00801 } 00802 00803 MQTTReturnCode MQTTResubscribe(Client *c) { 00804 MQTTReturnCode rc = FAILURE; 00805 Timer timer; 00806 uint32_t len = 0; 00807 uint32_t count = 0; 00808 QoS grantedQoS[3] = {QOS0, QOS0, QOS0}; 00809 uint16_t packetId; 00810 uint32_t existingSubCount = 0; 00811 uint32_t itr = 0; 00812 00813 if(NULL == c) { 00814 return MQTT_NULL_VALUE_ERROR; 00815 } 00816 00817 if(!c->isConnected) { 00818 return MQTT_NETWORK_DISCONNECTED_ERROR; 00819 } 00820 00821 existingSubCount = GetFreeMessageHandlerIndex(c); 00822 00823 for(itr = 0; itr < existingSubCount; itr++) { 00824 MQTTString topic = MQTTString_initializer; 00825 topic.cstring = (char *)c->messageHandlers[itr].topicFilter; 00826 00827 InitTimer(&timer); 00828 countdown_ms(&timer, c->commandTimeoutMs); 00829 00830 rc = MQTTSerialize_subscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, 00831 &topic, &(c->messageHandlers[itr].qos), &len); 00832 if(SUCCESS != rc) { 00833 return rc; 00834 } 00835 00836 /* send the subscribe packet */ 00837 rc = sendPacket(c, len, &timer); 00838 if(SUCCESS != rc) { 00839 return rc; 00840 } 00841 00842 /* wait for suback */ 00843 rc = waitfor(c, SUBACK, &timer); 00844 if(SUCCESS != rc) { 00845 return rc; 00846 } 00847 00848 /* Granted QoS can be 0, 1 or 2 */ 00849 rc = MQTTDeserialize_suback(&packetId, 1, &count, grantedQoS, c->readbuf, c->readBufSize); 00850 if(SUCCESS != rc) { 00851 return rc; 00852 } 00853 } 00854 00855 return SUCCESS; 00856 } 00857 00858 MQTTReturnCode MQTTUnsubscribe(Client *c, const char *topicFilter) { 00859 MQTTReturnCode rc = FAILURE; 00860 Timer timer; 00861 MQTTString topic = MQTTString_initializer; 00862 uint32_t len = 0; 00863 uint32_t i = 0; 00864 uint16_t packet_id; 00865 00866 if(NULL == c || NULL == topicFilter) { 00867 return MQTT_NULL_VALUE_ERROR; 00868 } 00869 00870 topic.cstring = (char *)topicFilter; 00871 00872 if(!c->isConnected) { 00873 return MQTT_NETWORK_DISCONNECTED_ERROR; 00874 } 00875 00876 InitTimer(&timer); 00877 countdown_ms(&timer, c->commandTimeoutMs); 00878 00879 rc = MQTTSerialize_unsubscribe(c->buf, c->bufSize, 0, getNextPacketId(c), 1, &topic, &len); 00880 if(SUCCESS != rc) { 00881 return rc; 00882 } 00883 00884 /* send the unsubscribe packet */ 00885 rc = sendPacket(c, len, &timer); 00886 if(SUCCESS != rc) { 00887 return rc; 00888 } 00889 00890 rc = waitfor(c, UNSUBACK, &timer); 00891 if(SUCCESS != rc) { 00892 return rc; 00893 } 00894 00895 rc = MQTTDeserialize_unsuback(&packet_id, c->readbuf, c->readBufSize); 00896 if(SUCCESS != rc) { 00897 return rc; 00898 } 00899 00900 /* Remove from message handler array */ 00901 for(i = 0; i < MAX_MESSAGE_HANDLERS; ++i) { 00902 if(c->messageHandlers[i].topicFilter != NULL && 00903 (strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)) { 00904 c->messageHandlers[i].topicFilter = NULL; 00905 /* We don't want to break here, if the same topic is registered 00906 * with 2 callbacks. Unlikely scenario */ 00907 } 00908 } 00909 00910 return SUCCESS; 00911 } 00912 00913 MQTTReturnCode MQTTPublish(Client *c, const char *topicName, MQTTMessage *message) { 00914 Timer timer; 00915 MQTTString topic = MQTTString_initializer; 00916 uint32_t len = 0; 00917 uint8_t waitForAck = 0; 00918 uint8_t packetType = PUBACK; 00919 uint16_t packet_id; 00920 unsigned char dup, type; 00921 MQTTReturnCode rc = FAILURE; 00922 00923 if(NULL == c || NULL == topicName || NULL == message) { 00924 return MQTT_NULL_VALUE_ERROR; 00925 } 00926 00927 topic.cstring = (char *)topicName; 00928 00929 if(!c->isConnected) { 00930 return MQTT_NETWORK_DISCONNECTED_ERROR; 00931 } 00932 00933 InitTimer(&timer); 00934 countdown_ms(&timer, c->commandTimeoutMs); 00935 00936 if(QOS1 == message->qos || QOS2 == message->qos) { 00937 message->id = getNextPacketId(c); 00938 waitForAck = 1; 00939 if(QOS2 == message->qos) { 00940 packetType = PUBCOMP; 00941 } 00942 } 00943 00944 rc = MQTTSerialize_publish(c->buf, c->bufSize, 0, message->qos, message->retained, message->id, 00945 topic, (unsigned char*)message->payload, message->payloadlen, &len); 00946 if(SUCCESS != rc) { 00947 return rc; 00948 } 00949 00950 /* send the publish packet */ 00951 rc = sendPacket(c, len, &timer); 00952 if(SUCCESS != rc) { 00953 return rc; 00954 } 00955 00956 /* Wait for ack if QoS1 or QoS2 */ 00957 if(1 == waitForAck) { 00958 rc = waitfor(c, packetType, &timer); 00959 if(SUCCESS != rc) { 00960 return rc; 00961 } 00962 00963 rc = MQTTDeserialize_ack(&type, &dup, &packet_id, c->readbuf, c->readBufSize); 00964 if(SUCCESS != rc) { 00965 return rc; 00966 } 00967 } 00968 00969 return SUCCESS; 00970 } 00971 /** 00972 * This is for the case when the sendPacket Fails. 00973 */ 00974 static void MQTTForceDisconnect(Client *c){ 00975 c->isConnected = 0; 00976 c->networkStack.disconnect(&(c->networkStack)); 00977 c->networkStack.destroy(&(c->networkStack)); 00978 } 00979 00980 MQTTReturnCode MQTTDisconnect(Client *c) { 00981 MQTTReturnCode rc = FAILURE; 00982 /* We might wait for incomplete incoming publishes to complete */ 00983 Timer timer; 00984 uint32_t serialized_len = 0; 00985 00986 if(NULL == c) { 00987 return MQTT_NULL_VALUE_ERROR; 00988 } 00989 00990 if(0 == c->isConnected) { 00991 /* Network is already disconnected. Do nothing */ 00992 return MQTT_NETWORK_DISCONNECTED_ERROR; 00993 } 00994 00995 rc = MQTTSerialize_disconnect(c->buf, c->bufSize, &serialized_len); 00996 if(SUCCESS != rc) { 00997 return rc; 00998 } 00999 01000 InitTimer(&timer); 01001 countdown_ms(&timer, c->commandTimeoutMs); 01002 01003 /* send the disconnect packet */ 01004 if(serialized_len > 0) { 01005 rc = sendPacket(c, serialized_len, &timer); 01006 if(SUCCESS != rc) { 01007 return rc; 01008 } 01009 } 01010 01011 /* Clean network stack */ 01012 c->networkStack.disconnect(&(c->networkStack)); 01013 rc = (MQTTReturnCode)c->networkStack.destroy(&(c->networkStack)); 01014 if(0 != rc) { 01015 /* TLS Destroy failed, return error */ 01016 return FAILURE; 01017 } 01018 01019 c->isConnected = 0; 01020 01021 /* Always set to 1 whenever disconnect is called. Keepalive resets to 0 */ 01022 c->wasManuallyDisconnected = 1; 01023 01024 return SUCCESS; 01025 } 01026 01027 uint8_t MQTTIsConnected(Client *c) { 01028 if(NULL == c) { 01029 return 0; 01030 } 01031 01032 return c->isConnected; 01033 } 01034 01035 uint8_t MQTTIsAutoReconnectEnabled(Client *c) { 01036 if(NULL == c) { 01037 return 0; 01038 } 01039 01040 return c->isAutoReconnectEnabled; 01041 } 01042 01043 MQTTReturnCode setDisconnectHandler(Client *c, disconnectHandler_t disconnectHandler) { 01044 if(NULL == c || NULL == disconnectHandler) { 01045 return MQTT_NULL_VALUE_ERROR; 01046 } 01047 01048 c->disconnectHandler = disconnectHandler; 01049 return SUCCESS; 01050 } 01051 01052 MQTTReturnCode setAutoReconnectEnabled(Client *c, uint8_t value) { 01053 if(NULL == c) { 01054 return FAILURE; 01055 } 01056 c->isAutoReconnectEnabled = value; 01057 return SUCCESS; 01058 } 01059 01060 uint32_t MQTTGetNetworkDisconnectedCount(Client *c) { 01061 return c->counterNetworkDisconnected; 01062 } 01063 01064 void MQTTResetNetworkDisconnectedCount(Client *c) { 01065 c->counterNetworkDisconnected = 0; 01066 } 01067 01068
Generated on Tue Jul 12 2022 14:16:20 by 1.7.2