A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTThreadedClient.cpp Sun Mar 26 04:35:46 2017 +0000 @@ -0,0 +1,564 @@ +#include "mbed.h" +#include "rtos.h" +#include "MQTTThreadedClient.h" + + +static MemoryPool<PubMessage, 16> mpool; +static Queue<PubMessage, 16> mqueue; + +int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) +{ + int rc; + + if (tcpSocket == NULL) + return -1; + + // non-blocking socket ... + tcpSocket->set_timeout(timeout); + rc = tcpSocket->recv( (void *) buffer, size); + + // return 0 bytes if timeout ... + if (NSAPI_ERROR_WOULD_BLOCK == rc) + return TIMEOUT; + else + return rc; // return the number of bytes received or error +} + +int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) +{ + int rc; + + if (tcpSocket == NULL) + return -1; + + // set the write timeout + tcpSocket->set_timeout(timeout); + rc = tcpSocket->send(buffer, size); + + if ( NSAPI_ERROR_WOULD_BLOCK == rc) + return TIMEOUT; + else + return rc; +} + +int MQTTThreadedClient::readPacketLength(int* value) +{ + int rc = MQTTPACKET_READ_ERROR; + unsigned char c; + int multiplier = 1; + int len = 0; + const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; + + *value = 0; + do + { + if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) + { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + goto exit; + } + + rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); + if (rc != 1) + { + rc = MQTTPACKET_READ_ERROR; + goto exit; + } + + *value += (c & 127) * multiplier; + multiplier *= 128; + } while ((c & 128) != 0); + + rc = MQTTPACKET_READ_COMPLETE; + +exit: + if (rc == MQTTPACKET_READ_ERROR ) + len = -1; + + return len; +} + +int MQTTThreadedClient::sendPacket(size_t length) +{ + int rc = FAILURE; + int sent = 0; + + while (sent < length) + { + rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); + if (rc < 0) // there was an error writing the data + break; + sent += rc; + } + + if (sent == length) + rc = SUCCESS; + else + rc = FAILURE; + + return rc; +} +/** + * Reads the entire packet to readbuf and returns + * the type of packet when successful, otherwise + * a negative error code is returned. + **/ +int MQTTThreadedClient::readPacket() +{ + int rc = FAILURE; + MQTTHeader header = {0}; + int len = 0; + int rem_len = 0; + + /* 1. read the header byte. This has the packet type in it */ + if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) + goto exit; + + len = 1; + /* 2. read the remaining length. This is variable in itself */ + if ( readPacketLength(&rem_len) < 0 ) + goto exit; + + len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ + + if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) + { + rc = BUFFER_OVERFLOW; + goto exit; + } + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ + if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) + goto exit; + + // Convert the header to type + // and update rc + header.byte = readbuf[0]; + rc = header.bits.type; + +exit: + + return rc; +} + +/** + * Read until a specified packet type is received, or untill the specified + * timeout dropping packets along the way. + **/ +int MQTTThreadedClient::readUntil(int packetType, int timeout) +{ + int pType = FAILURE; + Timer timer; + + timer.start(); + do { + pType = readPacket(); + if (pType < 0) + break; + + if (timer.read_ms() > timeout) + { + pType = FAILURE; + break; + } + }while(pType != packetType); + + return pType; +} + + +int MQTTThreadedClient::connect(MQTTPacket_connectData& options) +{ + int rc = FAILURE; + int len = 0; + + if (isConnected) + { + printf("Session already connected! \r\n"); + return rc; + } + + // Copy the keepAliveInterval value to local + // MQTT specifies in seconds, we have to multiply that + // amount for our 32 bit timers which accepts ms. + keepAliveInterval = (options.keepAliveInterval * 1000); + + printf("Connecting with: \r\n"); + printf("\tUsername: [%s]\r\n", options.username.cstring); + printf("\tPassword: [%s]\r\n", options.password.cstring); + + if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) + { + printf("Error serializing connect packet ...\r\n"); + return rc; + } + if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet + { + printf("Error sending the connect request packet ...\r\n"); + return rc; + } + + // Wait for the CONNACK + if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) + { + unsigned char connack_rc = 255; + bool sessionPresent = false; + printf("Connection acknowledgement received ... deserializing respones ...\r\n"); + if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) + rc = connack_rc; + else + rc = FAILURE; + } + else + rc = FAILURE; + + if (rc == SUCCESS) + { + printf("Connected!!! ... starting connection timers ...\r\n"); + isConnected = true; + resetConnectionTimer(); + }else + { + // TODO: Call socket->disconnect()? + } + + printf("Returning with rc = %d\r\n", rc); + + return rc; +} + +int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options) +{ + int ret; + + tcpSocket->open(network); + if (( ret = tcpSocket->connect(host, port)) < 0 ) + { + + printf("Error connecting to %s:%d with %d\r\n", host, port, ret); + return ret; + } + + return connect(options); +} + +int MQTTThreadedClient::publish(PubMessage& msg) +{ +#if 0 + int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); + // TODO: handle id values when the function is called later + if (id == 0) + return FAILURE; + else + return SUCCESS; +#endif + PubMessage *message = mpool.alloc(); + // Simple copy + *message = msg; + + // Push the data to the thread + printf("Pushing data to consumer thread ...\r\n"); + mqueue.put(message); + + return SUCCESS; +} + +int MQTTThreadedClient::sendPublish(PubMessage& message) +{ + MQTTString topicString = MQTTString_initializer; + + if (!isConnected) + { + printf("Not connected!!! ...\r\n"); + return FAILURE; + } + + topicString.cstring = (char*) &message.topic[0]; + int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, + topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); + if (len <= 0) + { + printf("Failed serializing message ...\r\n"); + return FAILURE; + } + + if (sendPacket(len) == SUCCESS) + { + printf("Successfully sent publish packet to server ...\r\n"); + return SUCCESS; + } + + printf("Failed to send publish packet to server ...\r\n"); + return FAILURE; +} + +int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &)) +{ + int rc = FAILURE; + int len = 0; + + MQTTString topic = {(char*)topicstr, {0, 0}}; + printf("Subscribing to topic [%s]\r\n", topicstr); + + if (!isConnected) + { + printf("Session already connected!!\r\n"); + return rc; + } + + len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); + if (len <= 0) + { + printf("Error serializing subscribe packet ...\r\n"); + return rc; + } + + if ((rc = sendPacket(len)) != SUCCESS) + { + printf("Error sending subscribe packet [%d]\r\n", rc); + return rc; + } + + printf("Waiting for subscription ack ...\r\n"); + // Wait for SUBACK, dropping packets read along the way ... + if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback + { + int count = 0, grantedQoS = -1; + unsigned short mypacketid; + if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) + rc = grantedQoS; // 0, 1, 2 or 0x80 + // For as long as we do not get 0x80 .. + if (rc != 0x80) + { + // Add message handlers to the map + FP<void,MessageData &> fp; + fp.attach(function); + + topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); + + // Reset connection timers here ... + resetConnectionTimer(); + + printf("Successfully subscribed to %s ...\r\n", topicstr); + rc = SUCCESS; + }else + { + printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr); + } + } + else + { + printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); + rc = FAILURE; + } + + return rc; + +} + + +bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) +{ + char* curf = topicFilter; + char* curn = topicName.lenstring.data; + char* curn_end = curn + topicName.lenstring.len; + + while (*curf && curn < curn_end) + { + if (*curn == '/' && *curf != '/') + break; + if (*curf != '+' && *curf != '#' && *curf != *curn) + break; + if (*curf == '+') + { // skip until we meet the next separator, or end of string + char* nextpos = curn + 1; + while (nextpos < curn_end && *nextpos != '/') + nextpos = ++curn + 1; + } + else if (*curf == '#') + curn = curn_end - 1; // skip until end of string + curf++; + curn++; + }; + + return (curn == curn_end) && (*curf == '\0'); +} + +int MQTTThreadedClient::handlePublishMsg() +{ + MQTTString topicName = MQTTString_initializer; + Message msg; + int intQoS; + printf("Deserializing publish message ...\r\n"); + if (MQTTDeserialize_publish((unsigned char*)&msg.dup, + &intQoS, + (unsigned char*)&msg.retained, + (unsigned short*)&msg.id, + &topicName, + (unsigned char**)&msg.payload, + (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) + { + printf("Error deserializing published message ...\r\n"); + return -1; + } + + std::string topic; + if (topicName.lenstring.len > 0) + { + topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); + }else + topic = (const char *) topicName.cstring; + + printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); + + msg.qos = (QoS) intQoS; + + + // Call the handlers for each topic + if (topicCBMap.find(topic) != topicCBMap.end()) + { + // Call the callback function + if (topicCBMap[topic].attached()) + { + printf("Invoking function handler for topic ...\r\n"); + MessageData md(topicName, msg); + topicCBMap[topic](md); + + return 1; + } + } + + // TODO: depending on the QoS + // we send data to the server = PUBACK or PUBREC + switch(intQoS) + { + case QOS0: + // We send back nothing ... + break; + case QOS1: + // TODO: implement + break; + case QOS2: + // TODO: implement + break; + default: + break; + } + + return 0; +} + +void MQTTThreadedClient::resetConnectionTimer() +{ + if (keepAliveInterval > 0) + { + comTimer.reset(); + comTimer.start(); + } +} + +bool MQTTThreadedClient::hasConnectionTimedOut() +{ + if (keepAliveInterval > 0 ) { + // Check connection timer + if (comTimer.read_ms() > keepAliveInterval) + return true; + else + return false; + } + + return false; +} + +void MQTTThreadedClient::sendPingRequest() +{ + int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); + if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet + { + printf("Ping request sent successfully ...\r\n"); + } +} + +void MQTTThreadedClient::startListener() +{ + int pType; + // Continuesly listens for packets and dispatch + // message handlers ... + while(true) + { + pType = readPacket(); + switch(pType) + { + case TIMEOUT: + // No data available from the network ... + break; + case FAILURE: + case BUFFER_OVERFLOW: + { + // TODO: Network error, do we disconnect and reconnect? + printf("Failure or buffer overflow problem ... \r\n"); + MBED_ASSERT(false); + } + break; + /** + * The rest of the return codes below (all positive) is about MQTT + * response codes + **/ + case CONNACK: + case PUBACK: + case SUBACK: + break; + case PUBLISH: + { + printf("Publish received!....\r\n"); + // We receive data from the MQTT server .. + if (handlePublishMsg() < 0) + { + printf("Error handling PUBLISH message ... \r\n"); + break; + } + } + break; + case PINGRESP: + { + printf("Got ping response ...\r\n"); + resetConnectionTimer(); + } + break; + default: + printf("Unknown/Not handled message from server pType[%d]\r\n", pType); + } + + // Check if its time to send a keepAlive packet + if (hasConnectionTimedOut()) + { + // Queue the ping request so that other + // pending operations queued above will go first + queue.call(this, &MQTTThreadedClient::sendPingRequest); + } + + // Check if we have messages on the message queue + osEvent evt = mqueue.get(10); + if (evt.status == osEventMessage) { + + printf("Got message to publish! ... \r\n"); + + // Unpack the message + PubMessage * message = (PubMessage *)evt.value.p; + + // Send the packet, do not queue the call + // like the ping above .. + if ( sendPublish(*message) == SUCCESS) + // Reset timers if we have been able to send successfully + resetConnectionTimer(); + + // Free the message from mempool after using + mpool.free(message); + } + + // Dispatch any queued events ... + queue.dispatch(100); + } + +} + \ No newline at end of file