A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
Diff: MQTTThreadedClient.h
- Revision:
- 26:4b21de8043a5
- Parent:
- 25:326f00faa092
- Child:
- 27:c90092f35d79
--- a/MQTTThreadedClient.h Mon Mar 27 03:53:18 2017 +0000
+++ b/MQTTThreadedClient.h Mon Mar 27 13:45:26 2017 +0000
@@ -80,8 +80,11 @@
MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL)
: network(aNetwork),
ssl_ca_pem(pem),
+ port((pem != NULL) ? 8883 : 1883),
queue(32 * EVENTS_EVENT_SIZE),
- isConnected(false)
+ isConnected(false),
+ hasSavedSession(false),
+ useTLS(pem != NULL)
{
DRBG_PERS = "mbed TLS MQTT client";
tcpSocket = new TCPSocket();
@@ -98,10 +101,7 @@
}
-
- int connect(const char * host, uint16_t port, MQTTPacket_connectData & options);
- void disconnect();
-
+ void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options);
int publish(PubMessage& message);
void addTopicHandler(const char * topic, void (*function)(MessageData &));
@@ -111,68 +111,20 @@
FP<void,MessageData &> fp;
fp.attach(object, member);
- topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
+ topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp));
}
- int subscribe(const char * topic, QoS qos, void (*function)(MessageData &));
- template<typename T>
- int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) {
- int rc = FAILURE;
- int len = 0;
-
- MQTTString topic = {(char*)topicstr, {0, 0}};
- printf("Subscribing to topic [%s]\r\n", topicstr);
-
- if (!isConnected) {
- printf("Session already connected!!\r\n");
- return rc;
- }
-
- len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
- if (len <= 0) {
- printf("Error serializing subscribe packet ...\r\n");
- return rc;
- }
-
- if ((rc = sendPacket(len)) != SUCCESS) {
- printf("Error sending subscribe packet [%d]\r\n", rc);
- return rc;
- }
- printf("Waiting for subscription ack ...\r\n");
- // Wait for SUBACK, dropping packets read along the way ...
- if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
- int count = 0, grantedQoS = -1;
- unsigned short mypacketid;
- if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
- rc = grantedQoS; // 0, 1, 2 or 0x80
- // For as long as we do not get 0x80 ..
- if (rc != 0x80) {
- // Add message handlers to the map
- FP<void,MessageData &> fp;
- fp.attach(object, member);
-
- topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
-
- // Reset connection timers here ...
- resetConnectionTimer();
- printf("Successfully subscribed to %s ...\r\n", topicstr);
- rc = SUCCESS;
- }
- } else
- {
- printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
- rc = FAILURE;
- }
-
- return rc;
- }
-
- // the listener thread ...
+ // Start the listener thread and start polling
+ // MQTT server.
void startListener();
+ // Stop the listerner thread and closes connection
+ void stopListener();
protected:
- int connect(MQTTPacket_connectData& options);
+
int handlePublishMsg();
+ void disconnect();
+ int connect();
private:
@@ -186,11 +138,10 @@
std::string host;
uint16_t port;
MQTTPacket_connectData connect_options;
-
-
// Event queue
EventQueue queue;
bool isConnected;
+ bool hasSavedSession;
// TODO: Because I'm using a map, I can only have one handler
// for each topic (one that's mapped to the topic string).
@@ -198,7 +149,6 @@
// In the future, use a vector instead of maps to allow multiple
// handlers for the same topic.
std::map<std::string, FP<void, MessageData &> > topicCBMap;
- std::map<std::string, FP<void, MessageData &> > topicCBMap_t;
unsigned char sendbuf[MAX_MQTT_PACKET_SIZE];
unsigned char readbuf[MAX_MQTT_PACKET_SIZE];
@@ -207,6 +157,7 @@
Timer comTimer;
// SSL/TLS functions
+ bool useTLS;
void setupTLS();
int initTLS();
void freeTLS();
@@ -224,7 +175,7 @@
void resetConnectionTimer();
void sendPingRequest();
bool hasConnectionTimedOut();
-
+ int login();
};
#endif
