Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.h@25:326f00faa092, 2017-03-27 (annotated)
- Committer:
- vpcola
- Date:
- Mon Mar 27 03:53:18 2017 +0000
- Revision:
- 25:326f00faa092
- Parent:
- 23:06fac173529e
- Child:
- 26:4b21de8043a5
Added SSL/TLS code
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| vpcola | 23:06fac173529e | 1 | #ifndef _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 2 | #define _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 3 | |
| vpcola | 23:06fac173529e | 4 | #include "mbed.h" |
| vpcola | 23:06fac173529e | 5 | #include "rtos.h" |
| vpcola | 23:06fac173529e | 6 | #include "MQTTPacket.h" |
| vpcola | 23:06fac173529e | 7 | #include "NetworkInterface.h" |
| vpcola | 23:06fac173529e | 8 | #include "FP.h" |
| vpcola | 23:06fac173529e | 9 | |
| vpcola | 25:326f00faa092 | 10 | |
| vpcola | 23:06fac173529e | 11 | #include <cstdio> |
| vpcola | 23:06fac173529e | 12 | #include <string> |
| vpcola | 23:06fac173529e | 13 | #include <map> |
| vpcola | 23:06fac173529e | 14 | |
| vpcola | 23:06fac173529e | 15 | #define COMMAND_TIMEOUT 5000 |
| vpcola | 23:06fac173529e | 16 | #define DEFAULT_SOCKET_TIMEOUT 1000 |
| vpcola | 23:06fac173529e | 17 | #define MAX_MQTT_PACKET_SIZE 200 |
| vpcola | 23:06fac173529e | 18 | #define MAX_MQTT_PAYLOAD_SIZE 100 |
| vpcola | 23:06fac173529e | 19 | |
| vpcola | 23:06fac173529e | 20 | typedef enum { QOS0, QOS1, QOS2 } QoS; /* quality-of-service levels; the enumerators take the values 0, 1 and 2 */ |
| vpcola | 23:06fac173529e | 21 | |
| vpcola | 23:06fac173529e | 22 | // Status codes used by the client: SUCCESS is 0 and every failure code must be negative |
| vpcola | 23:06fac173529e | 23 | typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; |
| vpcola | 23:06fac173529e | 24 | |
| vpcola | 23:06fac173529e | 25 | |
| vpcola | 23:06fac173529e | 26 | typedef struct /* message descriptor that refers to its payload through a pointer */ |
| vpcola | 23:06fac173529e | 27 | { |
| vpcola | 23:06fac173529e | 28 | QoS qos; /* quality of service of the message */ |
| vpcola | 23:06fac173529e | 29 | bool retained; /* presumably the MQTT RETAIN flag -- TODO confirm against the .cpp */ |
| vpcola | 23:06fac173529e | 30 | bool dup; /* presumably the MQTT DUP (redelivery) flag -- TODO confirm against the .cpp */ |
| vpcola | 23:06fac173529e | 31 | unsigned short id; /* packet identifier */ |
| vpcola | 23:06fac173529e | 32 | void *payload; /* payload data; ownership is not expressed by this struct */ |
| vpcola | 23:06fac173529e | 33 | size_t payloadlen; /* payload length -- presumably in bytes, TODO confirm */ |
| vpcola | 23:06fac173529e | 34 | }Message, *pMessage; /* pMessage is a pointer-to-Message alias */ |
| vpcola | 23:06fac173529e | 35 | |
| vpcola | 23:06fac173529e | 36 | // TODO: |
| vpcola | 23:06fac173529e | 37 | // Merge this struct with Message above so that sending and receiving |
| vpcola | 23:06fac173529e | 38 | // share a single data structure. PubMessage deliberately holds its |
| vpcola | 23:06fac173529e | 39 | // topic and payload inline (no pointers), unlike Message. |
| vpcola | 23:06fac173529e | 40 | typedef struct /* self-contained message passed to publish() */ |
| vpcola | 23:06fac173529e | 41 | { |
| vpcola | 23:06fac173529e | 42 | char topic[100]; /* topic name, stored inline; presumably NUL-terminated -- TODO confirm */ |
| vpcola | 23:06fac173529e | 43 | QoS qos; /* quality of service of the message */ |
| vpcola | 23:06fac173529e | 44 | unsigned short id; /* packet identifier */ |
| vpcola | 23:06fac173529e | 45 | size_t payloadlen; /* number of payload elements in use -- presumably bytes, TODO confirm */ |
| vpcola | 23:06fac173529e | 46 | char payload[MAX_MQTT_PAYLOAD_SIZE]; /* payload stored inline; capacity is MAX_MQTT_PAYLOAD_SIZE (100) */ |
| vpcola | 23:06fac173529e | 47 | }PubMessage, *pPubMessage; /* pPubMessage is a pointer-to-PubMessage alias */ |
| vpcola | 23:06fac173529e | 48 | |
| vpcola | 23:06fac173529e | 49 | struct MessageData /* argument type of the topic handler callbacks: a message together with its topic name */ |
| vpcola | 23:06fac173529e | 50 | { |
| vpcola | 23:06fac173529e | 51 | MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) |
| vpcola | 23:06fac173529e | 52 | { } /* binds the two references below; nothing is copied */ |
| vpcola | 23:06fac173529e | 53 | Message &message; /* reference only: the referenced Message must outlive this object */ |
| vpcola | 23:06fac173529e | 54 | MQTTString &topicName; /* reference only: the referenced MQTTString must outlive this object */ |
| vpcola | 23:06fac173529e | 55 | }; |
| vpcola | 23:06fac173529e | 56 | |
| vpcola | 23:06fac173529e | 57 | class PacketId /* generator of packet identifiers in the range 1..MAX_PACKET_ID */ |
| vpcola | 23:06fac173529e | 58 | { |
| vpcola | 23:06fac173529e | 59 | public: |
| vpcola | 23:06fac173529e | 60 | PacketId() |
| vpcola | 23:06fac173529e | 61 | { |
| vpcola | 23:06fac173529e | 62 | next = 0; /* 0 is only the initial state; the first getNext() returns 1 */ |
| vpcola | 23:06fac173529e | 63 | } |
| vpcola | 23:06fac173529e | 64 | // Returns the next id: 1 on the first call, counting up to MAX_PACKET_ID, then wrapping back to 1 (0 is never returned). |
| vpcola | 23:06fac173529e | 65 | int getNext() |
| vpcola | 23:06fac173529e | 66 | { |
| vpcola | 23:06fac173529e | 67 | return next = (next == MAX_PACKET_ID) ? 1 : next + 1; /* next + 1 rather than ++next: next must be written only once in this expression */ |
| vpcola | 23:06fac173529e | 68 | } |
| vpcola | 23:06fac173529e | 69 | |
| vpcola | 23:06fac173529e | 70 | private: |
| vpcola | 23:06fac173529e | 71 | static const int MAX_PACKET_ID = 65535; /* largest value of a 16-bit packet identifier */ |
| vpcola | 23:06fac173529e | 72 | int next; /* last identifier handed out (0 before the first call) */ |
| vpcola | 23:06fac173529e | 73 | }; |
| vpcola | 23:06fac173529e | 74 | |
| vpcola | 23:06fac173529e | 75 | |
| vpcola | 23:06fac173529e | 76 | |
| vpcola | 23:06fac173529e | 77 | class MQTTThreadedClient /* MQTT client built on a TCPSocket with TLS support; incoming messages are dispatched to per-topic handlers */ |
| vpcola | 23:06fac173529e | 78 | { |
| vpcola | 23:06fac173529e | 79 | public: |
| vpcola | 25:326f00faa092 | 80 | MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) /* pem: presumably the CA certificate(s) in PEM form; only the pointer is stored, so the string must outlive the client */ |
| vpcola | 23:06fac173529e | 81 | : network(aNetwork), |
| vpcola | 25:326f00faa092 | 82 | ssl_ca_pem(pem), |
| vpcola | 23:06fac173529e | 83 | queue(32 * EVENTS_EVENT_SIZE), /* room for 32 events in the queue */ |
| vpcola | 25:326f00faa092 | 84 | isConnected(false) |
| vpcola | 25:326f00faa092 | 85 | { |
| vpcola | 25:326f00faa092 | 86 | DRBG_PERS = "mbed TLS MQTT client"; |
| vpcola | 23:06fac173529e | 87 | tcpSocket = new TCPSocket(); |
| vpcola | 25:326f00faa092 | 88 | setupTLS(); |
| vpcola | 25:326f00faa092 | 89 | } |
| vpcola | 25:326f00faa092 | 90 | // Disconnects an active session first and only then releases the TLS state. |
| vpcola | 25:326f00faa092 | 91 | ~MQTTThreadedClient() |
| vpcola | 25:326f00faa092 | 92 | { |
| vpcola | 25:326f00faa092 | 93 | // TODO: signal the thread to shutdown |
| vpcola | 25:326f00faa092 | 94 | if (isConnected) |
| vpcola | 25:326f00faa092 | 95 | disconnect(); |
| vpcola | 25:326f00faa092 | 96 | // Free the TLS state last: disconnect() may still need it to talk to the broker. |
| vpcola | 25:326f00faa092 | 97 | freeTLS(); |
| vpcola | 25:326f00faa092 | 98 | // NOTE(review): tcpSocket (allocated with new in the constructor) is not deleted here -- confirm who releases it. |
| vpcola | 23:06fac173529e | 99 | } |
| vpcola | 23:06fac173529e | 100 | |
| vpcola | 25:326f00faa092 | 101 | // Connection management; connect() reports its outcome as an int status. |
| vpcola | 23:06fac173529e | 102 | int connect(const char * host, uint16_t port, MQTTPacket_connectData & options); |
| vpcola | 25:326f00faa092 | 103 | void disconnect(); |
| vpcola | 25:326f00faa092 | 104 | |
| vpcola | 23:06fac173529e | 105 | int publish(PubMessage& message); |
| vpcola | 25:326f00faa092 | 106 | // Register a handler for a topic. The member-function overload below only records it in topicCBMap_t; it sends nothing to the broker. |
| vpcola | 25:326f00faa092 | 107 | void addTopicHandler(const char * topic, void (*function)(MessageData &)); |
| vpcola | 25:326f00faa092 | 108 | template<typename T> |
| vpcola | 25:326f00faa092 | 109 | void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) |
| vpcola | 25:326f00faa092 | 110 | { |
| vpcola | 25:326f00faa092 | 111 | FP<void,MessageData &> fp; |
| vpcola | 25:326f00faa092 | 112 | fp.attach(object, member); |
| vpcola | 25:326f00faa092 | 113 | |
| vpcola | 25:326f00faa092 | 114 | topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); /* key is the topic parameter; insert() keeps an already registered handler */ |
| vpcola | 25:326f00faa092 | 115 | } |
| vpcola | 25:326f00faa092 | 116 | // Subscribe to a topic and register its handler once the broker grants it. Returns SUCCESS, a negative returnCode, or 0x80 when the broker rejects the subscription. |
| vpcola | 23:06fac173529e | 117 | int subscribe(const char * topic, QoS qos, void (*function)(MessageData &)); |
| vpcola | 23:06fac173529e | 118 | template<typename T> |
| vpcola | 23:06fac173529e | 119 | int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) { |
| vpcola | 23:06fac173529e | 120 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 121 | int len = 0; |
| vpcola | 23:06fac173529e | 122 | int requestedQoS = qos; /* the serializer takes an int*; copy the value instead of casting &qos, because an enum need not be int-sized */ |
| vpcola | 23:06fac173529e | 123 | MQTTString topic = {(char*)topicstr, {0, 0}}; |
| vpcola | 25:326f00faa092 | 124 | printf("Subscribing to topic [%s]\r\n", topicstr); |
| vpcola | 25:326f00faa092 | 125 | |
| vpcola | 23:06fac173529e | 126 | if (!isConnected) { |
| vpcola | 23:06fac173529e | 127 | printf("Session not connected!!\r\n"); |
| vpcola | 23:06fac173529e | 128 | return rc; |
| vpcola | 23:06fac173529e | 129 | } |
| vpcola | 23:06fac173529e | 130 | |
| vpcola | 23:06fac173529e | 131 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, &requestedQoS); |
| vpcola | 23:06fac173529e | 132 | if (len <= 0) { |
| vpcola | 23:06fac173529e | 133 | printf("Error serializing subscribe packet ...\r\n"); |
| vpcola | 23:06fac173529e | 134 | return rc; |
| vpcola | 23:06fac173529e | 135 | } |
| vpcola | 23:06fac173529e | 136 | |
| vpcola | 23:06fac173529e | 137 | if ((rc = sendPacket(len)) != SUCCESS) { |
| vpcola | 23:06fac173529e | 138 | printf("Error sending subscribe packet [%d]\r\n", rc); |
| vpcola | 23:06fac173529e | 139 | return rc; |
| vpcola | 23:06fac173529e | 140 | } |
| vpcola | 25:326f00faa092 | 141 | printf("Waiting for subscription ack ...\r\n"); |
| vpcola | 23:06fac173529e | 142 | // Wait for SUBACK, dropping packets read along the way ... |
| vpcola | 23:06fac173529e | 143 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
| vpcola | 23:06fac173529e | 144 | int count = 0, grantedQoS = -1; |
| vpcola | 23:06fac173529e | 145 | unsigned short mypacketid; |
| vpcola | 23:06fac173529e | 146 | rc = (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 147 | ? grantedQoS : FAILURE; // 0, 1, 2 or 0x80 from the broker; FAILURE when the SUBACK cannot be decoded |
| vpcola | 23:06fac173529e | 148 | // Register the handler only for a granted QoS (0, 1 or 2), never for 0x80 or a decode failure .. |
| vpcola | 23:06fac173529e | 149 | if (rc >= QOS0 && rc <= QOS2) { |
| vpcola | 23:06fac173529e | 150 | // Add message handlers to the map |
| vpcola | 23:06fac173529e | 151 | FP<void,MessageData &> fp; |
| vpcola | 23:06fac173529e | 152 | fp.attach(object, member); |
| vpcola | 23:06fac173529e | 153 | |
| vpcola | 23:06fac173529e | 154 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
| vpcola | 23:06fac173529e | 155 | |
| vpcola | 23:06fac173529e | 156 | // Reset connection timers here ... |
| vpcola | 23:06fac173529e | 157 | resetConnectionTimer(); |
| vpcola | 25:326f00faa092 | 158 | printf("Successfully subscribed to %s ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 159 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 160 | } |
| vpcola | 23:06fac173529e | 161 | } else |
| vpcola | 25:326f00faa092 | 162 | { |
| vpcola | 25:326f00faa092 | 163 | printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 164 | rc = FAILURE; |
| vpcola | 25:326f00faa092 | 165 | } |
| vpcola | 23:06fac173529e | 166 | |
| vpcola | 23:06fac173529e | 167 | return rc; |
| vpcola | 23:06fac173529e | 168 | } |
| vpcola | 23:06fac173529e | 169 | |
| vpcola | 23:06fac173529e | 170 | // the listener thread ... |
| vpcola | 23:06fac173529e | 171 | void startListener(); |
| vpcola | 23:06fac173529e | 172 | |
| vpcola | 23:06fac173529e | 173 | protected: |
| vpcola | 23:06fac173529e | 174 | int connect(MQTTPacket_connectData& options); |
| vpcola | 23:06fac173529e | 175 | int handlePublishMsg(); |
| vpcola | 23:06fac173529e | 176 | |
| vpcola | 23:06fac173529e | 177 | |
| vpcola | 23:06fac173529e | 178 | private: |
| vpcola | 23:06fac173529e | 179 | NetworkInterface * network; /* supplied by the caller; not allocated by this class */ |
| vpcola | 25:326f00faa092 | 180 | const char * ssl_ca_pem; /* pointer passed to the constructor (NULL by default); not copied */ |
| vpcola | 23:06fac173529e | 181 | TCPSocket * tcpSocket; /* allocated with new in the constructor */ |
| vpcola | 23:06fac173529e | 182 | PacketId packetid; /* source of packet identifiers for outgoing packets */ |
| vpcola | 25:326f00faa092 | 183 | const char *DRBG_PERS; /* set to "mbed TLS MQTT client" in the constructor */ |
| vpcola | 25:326f00faa092 | 184 | nsapi_error_t _error; |
| vpcola | 25:326f00faa092 | 185 | // Connection options |
| vpcola | 25:326f00faa092 | 186 | std::string host; |
| vpcola | 25:326f00faa092 | 187 | uint16_t port; |
| vpcola | 25:326f00faa092 | 188 | MQTTPacket_connectData connect_options; |
| vpcola | 25:326f00faa092 | 189 | |
| vpcola | 25:326f00faa092 | 190 | |
| vpcola | 23:06fac173529e | 191 | // Event queue |
| vpcola | 23:06fac173529e | 192 | EventQueue queue; |
| vpcola | 23:06fac173529e | 193 | bool isConnected; /* false until a session is established; checked by subscribe() and the destructor */ |
| vpcola | 23:06fac173529e | 194 | |
| vpcola | 23:06fac173529e | 195 | // TODO: Because I'm using a map, I can only have one handler |
| vpcola | 23:06fac173529e | 196 | // for each topic (one that's mapped to the topic string). |
| vpcola | 23:06fac173529e | 197 | // Attaching another handler on the same topic is not possible. |
| vpcola | 23:06fac173529e | 198 | // In the future, use a vector instead of maps to allow multiple |
| vpcola | 23:06fac173529e | 199 | // handlers for the same topic. |
| vpcola | 23:06fac173529e | 200 | std::map<std::string, FP<void, MessageData &> > topicCBMap; /* handlers registered by subscribe() */ |
| vpcola | 25:326f00faa092 | 201 | std::map<std::string, FP<void, MessageData &> > topicCBMap_t; /* handlers registered by addTopicHandler() */ |
| vpcola | 25:326f00faa092 | 202 | |
| vpcola | 23:06fac173529e | 203 | unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; /* outgoing packets are serialized into this buffer */ |
| vpcola | 23:06fac173529e | 204 | unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; /* incoming packets are deserialized from this buffer */ |
| vpcola | 23:06fac173529e | 205 | |
| vpcola | 23:06fac173529e | 206 | unsigned int keepAliveInterval; |
| vpcola | 23:06fac173529e | 207 | Timer comTimer; |
| vpcola | 23:06fac173529e | 208 | |
| vpcola | 25:326f00faa092 | 209 | // SSL/TLS functions |
| vpcola | 25:326f00faa092 | 210 | void setupTLS(); |
| vpcola | 25:326f00faa092 | 211 | int initTLS(); |
| vpcola | 25:326f00faa092 | 212 | void freeTLS(); |
| vpcola | 25:326f00faa092 | 213 | int doTLSHandshake(); |
| vpcola | 25:326f00faa092 | 214 | |
| vpcola | 25:326f00faa092 | 215 | int processSubscriptions(); |
| vpcola | 23:06fac173529e | 216 | int readPacket(); |
| vpcola | 23:06fac173529e | 217 | int sendPacket(size_t length); |
| vpcola | 23:06fac173529e | 218 | int readPacketLength(int* value); |
| vpcola | 23:06fac173529e | 219 | int readUntil(int packetType, int timeout); |
| vpcola | 23:06fac173529e | 220 | int readBytesToBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 221 | int sendBytesFromBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 222 | bool isTopicMatched(char* topic, MQTTString& topicName); |
| vpcola | 23:06fac173529e | 223 | int sendPublish(PubMessage& message); |
| vpcola | 23:06fac173529e | 224 | void resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 225 | void sendPingRequest(); |
| vpcola | 23:06fac173529e | 226 | bool hasConnectionTimedOut(); |
| vpcola | 23:06fac173529e | 227 | |
| vpcola | 23:06fac173529e | 228 | }; |
| vpcola | 23:06fac173529e | 229 | |
| vpcola | 23:06fac173529e | 230 | #endif |
