A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 26:4b21de8043a5
- Parent:
- 25:326f00faa092
- Child:
- 27:c90092f35d79
diff -r 326f00faa092 -r 4b21de8043a5 MQTTThreadedClient.h --- a/MQTTThreadedClient.h Mon Mar 27 03:53:18 2017 +0000 +++ b/MQTTThreadedClient.h Mon Mar 27 13:45:26 2017 +0000 @@ -80,8 +80,11 @@ MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) : network(aNetwork), ssl_ca_pem(pem), + port((pem != NULL) ? 8883 : 1883), queue(32 * EVENTS_EVENT_SIZE), - isConnected(false) + isConnected(false), + hasSavedSession(false), + useTLS(pem != NULL) { DRBG_PERS = "mbed TLS MQTT client"; tcpSocket = new TCPSocket(); @@ -98,10 +101,7 @@ } - - int connect(const char * host, uint16_t port, MQTTPacket_connectData & options); - void disconnect(); - + void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); int publish(PubMessage& message); void addTopicHandler(const char * topic, void (*function)(MessageData &)); @@ -111,68 +111,20 @@ FP<void,MessageData &> fp; fp.attach(object, member); - topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); + topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); } - int subscribe(const char * topic, QoS qos, void (*function)(MessageData &)); - template<typename T> - int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) { - int rc = FAILURE; - int len = 0; - - MQTTString topic = {(char*)topicstr, {0, 0}}; - printf("Subscribing to topic [%s]\r\n", topicstr); - - if (!isConnected) { - printf("Session already connected!!\r\n"); - return rc; - } - - len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); - if (len <= 0) { - printf("Error serializing subscribe packet ...\r\n"); - return rc; - } - - if ((rc = sendPacket(len)) != SUCCESS) { - printf("Error sending subscribe packet [%d]\r\n", rc); - return rc; - } - printf("Waiting for subscription ack ...\r\n"); - // Wait for SUBACK, dropping packets read along the way ... - if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback - int count = 0, grantedQoS = -1; - unsigned short mypacketid; - if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) - rc = grantedQoS; // 0, 1, 2 or 0x80 - // For as long as we do not get 0x80 .. - if (rc != 0x80) { - // Add message handlers to the map - FP<void,MessageData &> fp; - fp.attach(object, member); - - topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); - - // Reset connection timers here ... - resetConnectionTimer(); - printf("Successfully subscribed to %s ...\r\n", topicstr); - rc = SUCCESS; - } - } else - { - printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); - rc = FAILURE; - } - - return rc; - } - - // the listener thread ... + // Start the listener thread and start polling + // MQTT server. void startListener(); + // Stop the listerner thread and closes connection + void stopListener(); protected: - int connect(MQTTPacket_connectData& options); + int handlePublishMsg(); + void disconnect(); + int connect(); private: @@ -186,11 +138,10 @@ std::string host; uint16_t port; MQTTPacket_connectData connect_options; - - // Event queue EventQueue queue; bool isConnected; + bool hasSavedSession; // TODO: Because I'm using a map, I can only have one handler // for each topic (one that's mapped to the topic string). @@ -198,7 +149,6 @@ // In the future, use a vector instead of maps to allow multiple // handlers for the same topic. std::map<std::string, FP<void, MessageData &> > topicCBMap; - std::map<std::string, FP<void, MessageData &> > topicCBMap_t; unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; @@ -207,6 +157,7 @@ Timer comTimer; // SSL/TLS functions + bool useTLS; void setupTLS(); int initTLS(); void freeTLS(); @@ -224,7 +175,7 @@ void resetConnectionTimer(); void sendPingRequest(); bool hasConnectionTimedOut(); - + int login(); }; #endif