A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.cpp
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/MQTTThreadedClient.cpp Sun Mar 26 04:35:46 2017 +0000
@@ -0,0 +1,564 @@
+#include "mbed.h"
+#include "rtos.h"
+#include "MQTTThreadedClient.h"
+
+
+static MemoryPool<PubMessage, 16> mpool;
+static Queue<PubMessage, 16> mqueue;
+
+int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ // non-blocking socket ...
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->recv( (void *) buffer, size);
+
+ // return 0 bytes if timeout ...
+ if (NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc; // return the number of bytes received or error
+}
+
+int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
+{
+ int rc;
+
+ if (tcpSocket == NULL)
+ return -1;
+
+ // set the write timeout
+ tcpSocket->set_timeout(timeout);
+ rc = tcpSocket->send(buffer, size);
+
+ if ( NSAPI_ERROR_WOULD_BLOCK == rc)
+ return TIMEOUT;
+ else
+ return rc;
+}
+
+int MQTTThreadedClient::readPacketLength(int* value)
+{
+ int rc = MQTTPACKET_READ_ERROR;
+ unsigned char c;
+ int multiplier = 1;
+ int len = 0;
+ const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
+
+ *value = 0;
+ do
+ {
+ if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
+ {
+ rc = MQTTPACKET_READ_ERROR; /* bad data */
+ goto exit;
+ }
+
+ rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
+ if (rc != 1)
+ {
+ rc = MQTTPACKET_READ_ERROR;
+ goto exit;
+ }
+
+ *value += (c & 127) * multiplier;
+ multiplier *= 128;
+ } while ((c & 128) != 0);
+
+ rc = MQTTPACKET_READ_COMPLETE;
+
+exit:
+ if (rc == MQTTPACKET_READ_ERROR )
+ len = -1;
+
+ return len;
+}
+
+int MQTTThreadedClient::sendPacket(size_t length)
+{
+ int rc = FAILURE;
+ int sent = 0;
+
+ while (sent < length)
+ {
+ rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
+ if (rc < 0) // there was an error writing the data
+ break;
+ sent += rc;
+ }
+
+ if (sent == length)
+ rc = SUCCESS;
+ else
+ rc = FAILURE;
+
+ return rc;
+}
+/**
+ * Reads the entire packet to readbuf and returns
+ * the type of packet when successful, otherwise
+ * a negative error code is returned.
+ **/
+int MQTTThreadedClient::readPacket()
+{
+ int rc = FAILURE;
+ MQTTHeader header = {0};
+ int len = 0;
+ int rem_len = 0;
+
+ /* 1. read the header byte. This has the packet type in it */
+ if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
+ goto exit;
+
+ len = 1;
+ /* 2. read the remaining length. This is variable in itself */
+ if ( readPacketLength(&rem_len) < 0 )
+ goto exit;
+
+ len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
+
+ if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
+ {
+ rc = BUFFER_OVERFLOW;
+ goto exit;
+ }
+
+ /* 3. read the rest of the buffer using a callback to supply the rest of the data */
+ if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
+ goto exit;
+
+ // Convert the header to type
+ // and update rc
+ header.byte = readbuf[0];
+ rc = header.bits.type;
+
+exit:
+
+ return rc;
+}
+
+/**
+ * Read until a specified packet type is received, or untill the specified
+ * timeout dropping packets along the way.
+ **/
+int MQTTThreadedClient::readUntil(int packetType, int timeout)
+{
+ int pType = FAILURE;
+ Timer timer;
+
+ timer.start();
+ do {
+ pType = readPacket();
+ if (pType < 0)
+ break;
+
+ if (timer.read_ms() > timeout)
+ {
+ pType = FAILURE;
+ break;
+ }
+ }while(pType != packetType);
+
+ return pType;
+}
+
+
+int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
+{
+ int rc = FAILURE;
+ int len = 0;
+
+ if (isConnected)
+ {
+ printf("Session already connected! \r\n");
+ return rc;
+ }
+
+ // Copy the keepAliveInterval value to local
+ // MQTT specifies in seconds, we have to multiply that
+ // amount for our 32 bit timers which accepts ms.
+ keepAliveInterval = (options.keepAliveInterval * 1000);
+
+ printf("Connecting with: \r\n");
+ printf("\tUsername: [%s]\r\n", options.username.cstring);
+ printf("\tPassword: [%s]\r\n", options.password.cstring);
+
+ if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
+ {
+ printf("Error serializing connect packet ...\r\n");
+ return rc;
+ }
+ if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
+ {
+ printf("Error sending the connect request packet ...\r\n");
+ return rc;
+ }
+
+ // Wait for the CONNACK
+ if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
+ {
+ unsigned char connack_rc = 255;
+ bool sessionPresent = false;
+ printf("Connection acknowledgement received ... deserializing respones ...\r\n");
+ if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = connack_rc;
+ else
+ rc = FAILURE;
+ }
+ else
+ rc = FAILURE;
+
+ if (rc == SUCCESS)
+ {
+ printf("Connected!!! ... starting connection timers ...\r\n");
+ isConnected = true;
+ resetConnectionTimer();
+ }else
+ {
+ // TODO: Call socket->disconnect()?
+ }
+
+ printf("Returning with rc = %d\r\n", rc);
+
+ return rc;
+}
+
+int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options)
+{
+ int ret;
+
+ tcpSocket->open(network);
+ if (( ret = tcpSocket->connect(host, port)) < 0 )
+ {
+
+ printf("Error connecting to %s:%d with %d\r\n", host, port, ret);
+ return ret;
+ }
+
+ return connect(options);
+}
+
+int MQTTThreadedClient::publish(PubMessage& msg)
+{
+#if 0
+ int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
+ // TODO: handle id values when the function is called later
+ if (id == 0)
+ return FAILURE;
+ else
+ return SUCCESS;
+#endif
+ PubMessage *message = mpool.alloc();
+ // Simple copy
+ *message = msg;
+
+ // Push the data to the thread
+ printf("Pushing data to consumer thread ...\r\n");
+ mqueue.put(message);
+
+ return SUCCESS;
+}
+
+int MQTTThreadedClient::sendPublish(PubMessage& message)
+{
+ MQTTString topicString = MQTTString_initializer;
+
+ if (!isConnected)
+ {
+ printf("Not connected!!! ...\r\n");
+ return FAILURE;
+ }
+
+ topicString.cstring = (char*) &message.topic[0];
+ int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
+ topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
+ if (len <= 0)
+ {
+ printf("Failed serializing message ...\r\n");
+ return FAILURE;
+ }
+
+ if (sendPacket(len) == SUCCESS)
+ {
+ printf("Successfully sent publish packet to server ...\r\n");
+ return SUCCESS;
+ }
+
+ printf("Failed to send publish packet to server ...\r\n");
+ return FAILURE;
+}
+
+int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
+{
+ int rc = FAILURE;
+ int len = 0;
+
+ MQTTString topic = {(char*)topicstr, {0, 0}};
+ printf("Subscribing to topic [%s]\r\n", topicstr);
+
+ if (!isConnected)
+ {
+ printf("Session already connected!!\r\n");
+ return rc;
+ }
+
+ len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
+ if (len <= 0)
+ {
+ printf("Error serializing subscribe packet ...\r\n");
+ return rc;
+ }
+
+ if ((rc = sendPacket(len)) != SUCCESS)
+ {
+ printf("Error sending subscribe packet [%d]\r\n", rc);
+ return rc;
+ }
+
+ printf("Waiting for subscription ack ...\r\n");
+ // Wait for SUBACK, dropping packets read along the way ...
+ if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback
+ {
+ int count = 0, grantedQoS = -1;
+ unsigned short mypacketid;
+ if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
+ rc = grantedQoS; // 0, 1, 2 or 0x80
+ // For as long as we do not get 0x80 ..
+ if (rc != 0x80)
+ {
+ // Add message handlers to the map
+ FP<void,MessageData &> fp;
+ fp.attach(function);
+
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+
+ // Reset connection timers here ...
+ resetConnectionTimer();
+
+ printf("Successfully subscribed to %s ...\r\n", topicstr);
+ rc = SUCCESS;
+ }else
+ {
+ printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
+ }
+ }
+ else
+ {
+ printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
+ rc = FAILURE;
+ }
+
+ return rc;
+
+}
+
+
+bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
+{
+ char* curf = topicFilter;
+ char* curn = topicName.lenstring.data;
+ char* curn_end = curn + topicName.lenstring.len;
+
+ while (*curf && curn < curn_end)
+ {
+ if (*curn == '/' && *curf != '/')
+ break;
+ if (*curf != '+' && *curf != '#' && *curf != *curn)
+ break;
+ if (*curf == '+')
+ { // skip until we meet the next separator, or end of string
+ char* nextpos = curn + 1;
+ while (nextpos < curn_end && *nextpos != '/')
+ nextpos = ++curn + 1;
+ }
+ else if (*curf == '#')
+ curn = curn_end - 1; // skip until end of string
+ curf++;
+ curn++;
+ };
+
+ return (curn == curn_end) && (*curf == '\0');
+}
+
+int MQTTThreadedClient::handlePublishMsg()
+{
+ MQTTString topicName = MQTTString_initializer;
+ Message msg;
+ int intQoS;
+ printf("Deserializing publish message ...\r\n");
+ if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
+ &intQoS,
+ (unsigned char*)&msg.retained,
+ (unsigned short*)&msg.id,
+ &topicName,
+ (unsigned char**)&msg.payload,
+ (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
+ {
+ printf("Error deserializing published message ...\r\n");
+ return -1;
+ }
+
+ std::string topic;
+ if (topicName.lenstring.len > 0)
+ {
+ topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
+ }else
+ topic = (const char *) topicName.cstring;
+
+ printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
+
+ msg.qos = (QoS) intQoS;
+
+
+ // Call the handlers for each topic
+ if (topicCBMap.find(topic) != topicCBMap.end())
+ {
+ // Call the callback function
+ if (topicCBMap[topic].attached())
+ {
+ printf("Invoking function handler for topic ...\r\n");
+ MessageData md(topicName, msg);
+ topicCBMap[topic](md);
+
+ return 1;
+ }
+ }
+
+ // TODO: depending on the QoS
+ // we send data to the server = PUBACK or PUBREC
+ switch(intQoS)
+ {
+ case QOS0:
+ // We send back nothing ...
+ break;
+ case QOS1:
+ // TODO: implement
+ break;
+ case QOS2:
+ // TODO: implement
+ break;
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+void MQTTThreadedClient::resetConnectionTimer()
+{
+ if (keepAliveInterval > 0)
+ {
+ comTimer.reset();
+ comTimer.start();
+ }
+}
+
+bool MQTTThreadedClient::hasConnectionTimedOut()
+{
+ if (keepAliveInterval > 0 ) {
+ // Check connection timer
+ if (comTimer.read_ms() > keepAliveInterval)
+ return true;
+ else
+ return false;
+ }
+
+ return false;
+}
+
+void MQTTThreadedClient::sendPingRequest()
+{
+ int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
+ if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
+ {
+ printf("Ping request sent successfully ...\r\n");
+ }
+}
+
+void MQTTThreadedClient::startListener()
+{
+ int pType;
+ // Continuesly listens for packets and dispatch
+ // message handlers ...
+ while(true)
+ {
+ pType = readPacket();
+ switch(pType)
+ {
+ case TIMEOUT:
+ // No data available from the network ...
+ break;
+ case FAILURE:
+ case BUFFER_OVERFLOW:
+ {
+ // TODO: Network error, do we disconnect and reconnect?
+ printf("Failure or buffer overflow problem ... \r\n");
+ MBED_ASSERT(false);
+ }
+ break;
+ /**
+ * The rest of the return codes below (all positive) is about MQTT
+ * response codes
+ **/
+ case CONNACK:
+ case PUBACK:
+ case SUBACK:
+ break;
+ case PUBLISH:
+ {
+ printf("Publish received!....\r\n");
+ // We receive data from the MQTT server ..
+ if (handlePublishMsg() < 0)
+ {
+ printf("Error handling PUBLISH message ... \r\n");
+ break;
+ }
+ }
+ break;
+ case PINGRESP:
+ {
+ printf("Got ping response ...\r\n");
+ resetConnectionTimer();
+ }
+ break;
+ default:
+ printf("Unknown/Not handled message from server pType[%d]\r\n", pType);
+ }
+
+ // Check if its time to send a keepAlive packet
+ if (hasConnectionTimedOut())
+ {
+ // Queue the ping request so that other
+ // pending operations queued above will go first
+ queue.call(this, &MQTTThreadedClient::sendPingRequest);
+ }
+
+ // Check if we have messages on the message queue
+ osEvent evt = mqueue.get(10);
+ if (evt.status == osEventMessage) {
+
+ printf("Got message to publish! ... \r\n");
+
+ // Unpack the message
+ PubMessage * message = (PubMessage *)evt.value.p;
+
+ // Send the packet, do not queue the call
+ // like the ping above ..
+ if ( sendPublish(*message) == SUCCESS)
+ // Reset timers if we have been able to send successfully
+ resetConnectionTimer();
+
+ // Free the message from mempool after using
+ mpool.free(message);
+ }
+
+ // Dispatch any queued events ...
+ queue.dispatch(100);
+ }
+
+}
+
\ No newline at end of file
