Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.h@34:dc31ab8c2241, 2017-04-04 (annotated)
- Committer:
- vpcola
- Date:
- Tue Apr 04 04:58:49 2017 +0000
- Revision:
- 34:dc31ab8c2241
- Parent:
- 32:16ef25cbb05c
Minor updates
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| vpcola | 23:06fac173529e | 1 | #ifndef _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 2 | #define _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 3 | |
| vpcola | 23:06fac173529e | 4 | #include "mbed.h" |
| vpcola | 23:06fac173529e | 5 | #include "rtos.h" |
| vpcola | 23:06fac173529e | 6 | #include "MQTTPacket.h" |
| vpcola | 23:06fac173529e | 7 | #include "NetworkInterface.h" |
| vpcola | 23:06fac173529e | 8 | #include "FP.h" |
| vpcola | 23:06fac173529e | 9 | |
| vpcola | 32:16ef25cbb05c | 10 | //#define MQTT_DEBUG 1 |
| vpcola | 30:b2aed80037db | 11 | |
| vpcola | 30:b2aed80037db | 12 | #ifdef MQTT_DEBUG |
| vpcola | 30:b2aed80037db | 13 | #define DBG(fmt, args...) printf(fmt, ## args) |
| vpcola | 30:b2aed80037db | 14 | #else |
| vpcola | 30:b2aed80037db | 15 | #define DBG(fmt, args...) /* Don't do anything in release builds */ |
| vpcola | 30:b2aed80037db | 16 | #endif |
| vpcola | 25:326f00faa092 | 17 | |
| vpcola | 23:06fac173529e | 18 | #include <cstdio> |
| vpcola | 23:06fac173529e | 19 | #include <string> |
| vpcola | 23:06fac173529e | 20 | #include <map> |
| vpcola | 23:06fac173529e | 21 | |
| vpcola | 23:06fac173529e | 22 | #define COMMAND_TIMEOUT 5000 |
| vpcola | 23:06fac173529e | 23 | #define DEFAULT_SOCKET_TIMEOUT 1000 |
| vpcola | 23:06fac173529e | 24 | #define MAX_MQTT_PACKET_SIZE 200 |
| vpcola | 23:06fac173529e | 25 | #define MAX_MQTT_PAYLOAD_SIZE 100 |
| vpcola | 23:06fac173529e | 26 | |
| vpcola | 30:b2aed80037db | 27 | namespace MQTT |
| vpcola | 30:b2aed80037db | 28 | { |
| vpcola | 30:b2aed80037db | 29 | |
| vpcola | 23:06fac173529e | 30 | typedef enum { QOS0, QOS1, QOS2 } QoS; |
| vpcola | 23:06fac173529e | 31 | |
| vpcola | 23:06fac173529e | 32 | // all failure return codes must be negative |
| vpcola | 23:06fac173529e | 33 | typedef enum { BUFFER_OVERFLOW = -3, TIMEOUT = -2, FAILURE = -1, SUCCESS = 0 } returnCode; |
| vpcola | 23:06fac173529e | 34 | |
| vpcola | 23:06fac173529e | 35 | |
| vpcola | 23:06fac173529e | 36 | typedef struct |
| vpcola | 23:06fac173529e | 37 | { |
| vpcola | 23:06fac173529e | 38 | QoS qos; |
| vpcola | 23:06fac173529e | 39 | bool retained; |
| vpcola | 23:06fac173529e | 40 | bool dup; |
| vpcola | 23:06fac173529e | 41 | unsigned short id; |
| vpcola | 23:06fac173529e | 42 | void *payload; |
| vpcola | 23:06fac173529e | 43 | size_t payloadlen; |
| vpcola | 23:06fac173529e | 44 | }Message, *pMessage; |
| vpcola | 23:06fac173529e | 45 | |
| vpcola | 23:06fac173529e | 46 | // TODO: |
| vpcola | 23:06fac173529e | 47 | // Merge this struct with the one above, in order to use the same |
| vpcola | 23:06fac173529e | 48 | // data structure for sending and receiving. I need to simplify |
| vpcola | 23:06fac173529e | 49 | // the PubMessage to not contain pointers like the one above. |
| vpcola | 23:06fac173529e | 50 | typedef struct |
| vpcola | 23:06fac173529e | 51 | { |
| vpcola | 23:06fac173529e | 52 | char topic[100]; |
| vpcola | 23:06fac173529e | 53 | QoS qos; |
| vpcola | 23:06fac173529e | 54 | unsigned short id; |
| vpcola | 23:06fac173529e | 55 | size_t payloadlen; |
| vpcola | 23:06fac173529e | 56 | char payload[MAX_MQTT_PAYLOAD_SIZE]; |
| vpcola | 23:06fac173529e | 57 | }PubMessage, *pPubMessage; |
| vpcola | 23:06fac173529e | 58 | |
| vpcola | 23:06fac173529e | 59 | struct MessageData |
| vpcola | 23:06fac173529e | 60 | { |
| vpcola | 23:06fac173529e | 61 | MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) |
| vpcola | 23:06fac173529e | 62 | { } |
| vpcola | 23:06fac173529e | 63 | Message &message; |
| vpcola | 23:06fac173529e | 64 | MQTTString &topicName; |
| vpcola | 23:06fac173529e | 65 | }; |
| vpcola | 23:06fac173529e | 66 | |
| vpcola | 23:06fac173529e | 67 | class PacketId |
| vpcola | 23:06fac173529e | 68 | { |
| vpcola | 23:06fac173529e | 69 | public: |
| vpcola | 23:06fac173529e | 70 | PacketId() |
| vpcola | 23:06fac173529e | 71 | { |
| vpcola | 23:06fac173529e | 72 | next = 0; |
| vpcola | 23:06fac173529e | 73 | } |
| vpcola | 23:06fac173529e | 74 | |
| vpcola | 23:06fac173529e | 75 | int getNext() |
| vpcola | 23:06fac173529e | 76 | { |
| vpcola | 23:06fac173529e | 77 | return next = (next == MAX_PACKET_ID) ? 1 : ++next; |
| vpcola | 23:06fac173529e | 78 | } |
| vpcola | 23:06fac173529e | 79 | |
| vpcola | 23:06fac173529e | 80 | private: |
| vpcola | 23:06fac173529e | 81 | static const int MAX_PACKET_ID = 65535; |
| vpcola | 23:06fac173529e | 82 | int next; |
| vpcola | 23:06fac173529e | 83 | }; |
| vpcola | 23:06fac173529e | 84 | |
| vpcola | 23:06fac173529e | 85 | |
| vpcola | 23:06fac173529e | 86 | |
| vpcola | 23:06fac173529e | 87 | class MQTTThreadedClient |
| vpcola | 23:06fac173529e | 88 | { |
| vpcola | 23:06fac173529e | 89 | public: |
| vpcola | 25:326f00faa092 | 90 | MQTTThreadedClient(NetworkInterface * aNetwork, const char * pem = NULL) |
| vpcola | 23:06fac173529e | 91 | : network(aNetwork), |
| vpcola | 25:326f00faa092 | 92 | ssl_ca_pem(pem), |
| vpcola | 26:4b21de8043a5 | 93 | port((pem != NULL) ? 8883 : 1883), |
| vpcola | 23:06fac173529e | 94 | queue(32 * EVENTS_EVENT_SIZE), |
| vpcola | 26:4b21de8043a5 | 95 | isConnected(false), |
| vpcola | 26:4b21de8043a5 | 96 | hasSavedSession(false), |
| vpcola | 26:4b21de8043a5 | 97 | useTLS(pem != NULL) |
| vpcola | 25:326f00faa092 | 98 | { |
| vpcola | 25:326f00faa092 | 99 | DRBG_PERS = "mbed TLS MQTT client"; |
| vpcola | 23:06fac173529e | 100 | tcpSocket = new TCPSocket(); |
| vpcola | 25:326f00faa092 | 101 | setupTLS(); |
| vpcola | 25:326f00faa092 | 102 | } |
| vpcola | 25:326f00faa092 | 103 | |
| vpcola | 25:326f00faa092 | 104 | ~MQTTThreadedClient() |
| vpcola | 25:326f00faa092 | 105 | { |
| vpcola | 25:326f00faa092 | 106 | // TODO: signal the thread to shutdown |
| vpcola | 25:326f00faa092 | 107 | freeTLS(); |
| vpcola | 25:326f00faa092 | 108 | |
| vpcola | 25:326f00faa092 | 109 | if (isConnected) |
| vpcola | 25:326f00faa092 | 110 | disconnect(); |
| vpcola | 25:326f00faa092 | 111 | |
| vpcola | 23:06fac173529e | 112 | } |
| vpcola | 27:c90092f35d79 | 113 | /** |
| vpcola | 27:c90092f35d79 | 114 | * Sets the connection parameters. Must be called before running the startListener as a thread. |
| vpcola | 27:c90092f35d79 | 115 | * |
| vpcola | 27:c90092f35d79 | 116 | * @param host - pointer to the host where the MQTT server is running |
| vpcola | 27:c90092f35d79 | 117 | * @param port - the port number to connect, 1883 for non secure connections, 8883 for |
| vpcola | 27:c90092f35d79 | 118 | * secure connections |
| vpcola | 27:c90092f35d79 | 119 | * @param options - the connect data used for logging into the MQTT server. |
| vpcola | 27:c90092f35d79 | 120 | */ |
| vpcola | 26:4b21de8043a5 | 121 | void setConnectionParameters(const char * host, uint16_t port, MQTTPacket_connectData & options); |
| vpcola | 23:06fac173529e | 122 | int publish(PubMessage& message); |
| vpcola | 25:326f00faa092 | 123 | |
| vpcola | 25:326f00faa092 | 124 | void addTopicHandler(const char * topic, void (*function)(MessageData &)); |
| vpcola | 25:326f00faa092 | 125 | template<typename T> |
| vpcola | 25:326f00faa092 | 126 | void addTopicHandler(const char * topic, T *object, void (T::*member)(MessageData &)) |
| vpcola | 25:326f00faa092 | 127 | { |
| vpcola | 25:326f00faa092 | 128 | FP<void,MessageData &> fp; |
| vpcola | 25:326f00faa092 | 129 | fp.attach(object, member); |
| vpcola | 25:326f00faa092 | 130 | |
| vpcola | 26:4b21de8043a5 | 131 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topic),fp)); |
| vpcola | 25:326f00faa092 | 132 | } |
| vpcola | 25:326f00faa092 | 133 | |
| vpcola | 27:c90092f35d79 | 134 | // TODO: Add unsubscribe functionality. |
| vpcola | 27:c90092f35d79 | 135 | |
| vpcola | 26:4b21de8043a5 | 136 | // Start the listener thread and start polling |
| vpcola | 26:4b21de8043a5 | 137 | // MQTT server. |
| vpcola | 23:06fac173529e | 138 | void startListener(); |
| vpcola | 26:4b21de8043a5 | 139 | // Stop the listerner thread and closes connection |
| vpcola | 26:4b21de8043a5 | 140 | void stopListener(); |
| vpcola | 23:06fac173529e | 141 | |
| vpcola | 23:06fac173529e | 142 | protected: |
| vpcola | 26:4b21de8043a5 | 143 | |
| vpcola | 23:06fac173529e | 144 | int handlePublishMsg(); |
| vpcola | 26:4b21de8043a5 | 145 | void disconnect(); |
| vpcola | 26:4b21de8043a5 | 146 | int connect(); |
| vpcola | 23:06fac173529e | 147 | |
| vpcola | 23:06fac173529e | 148 | |
| vpcola | 23:06fac173529e | 149 | private: |
| vpcola | 23:06fac173529e | 150 | NetworkInterface * network; |
| vpcola | 25:326f00faa092 | 151 | const char * ssl_ca_pem; |
| vpcola | 23:06fac173529e | 152 | TCPSocket * tcpSocket; |
| vpcola | 23:06fac173529e | 153 | PacketId packetid; |
| vpcola | 25:326f00faa092 | 154 | const char *DRBG_PERS; |
| vpcola | 25:326f00faa092 | 155 | nsapi_error_t _error; |
| vpcola | 25:326f00faa092 | 156 | // Connection options |
| vpcola | 25:326f00faa092 | 157 | std::string host; |
| vpcola | 25:326f00faa092 | 158 | uint16_t port; |
| vpcola | 25:326f00faa092 | 159 | MQTTPacket_connectData connect_options; |
| vpcola | 23:06fac173529e | 160 | // Event queue |
| vpcola | 23:06fac173529e | 161 | EventQueue queue; |
| vpcola | 23:06fac173529e | 162 | bool isConnected; |
| vpcola | 26:4b21de8043a5 | 163 | bool hasSavedSession; |
| vpcola | 23:06fac173529e | 164 | |
| vpcola | 23:06fac173529e | 165 | // TODO: Because I'm using a map, I can only have one handler |
| vpcola | 23:06fac173529e | 166 | // for each topic (one that's mapped to the topic string). |
| vpcola | 23:06fac173529e | 167 | // Attaching another handler on the same topic is not possible. |
| vpcola | 23:06fac173529e | 168 | // In the future, use a vector instead of maps to allow multiple |
| vpcola | 23:06fac173529e | 169 | // handlers for the same topic. |
| vpcola | 23:06fac173529e | 170 | std::map<std::string, FP<void, MessageData &> > topicCBMap; |
| vpcola | 25:326f00faa092 | 171 | |
| vpcola | 23:06fac173529e | 172 | unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; |
| vpcola | 23:06fac173529e | 173 | unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; |
| vpcola | 23:06fac173529e | 174 | |
| vpcola | 23:06fac173529e | 175 | unsigned int keepAliveInterval; |
| vpcola | 23:06fac173529e | 176 | Timer comTimer; |
| vpcola | 23:06fac173529e | 177 | |
| vpcola | 25:326f00faa092 | 178 | // SSL/TLS functions |
| vpcola | 26:4b21de8043a5 | 179 | bool useTLS; |
| vpcola | 25:326f00faa092 | 180 | void setupTLS(); |
| vpcola | 25:326f00faa092 | 181 | int initTLS(); |
| vpcola | 25:326f00faa092 | 182 | void freeTLS(); |
| vpcola | 25:326f00faa092 | 183 | int doTLSHandshake(); |
| vpcola | 25:326f00faa092 | 184 | |
| vpcola | 25:326f00faa092 | 185 | int processSubscriptions(); |
| vpcola | 23:06fac173529e | 186 | int readPacket(); |
| vpcola | 23:06fac173529e | 187 | int sendPacket(size_t length); |
| vpcola | 23:06fac173529e | 188 | int readPacketLength(int* value); |
| vpcola | 23:06fac173529e | 189 | int readUntil(int packetType, int timeout); |
| vpcola | 23:06fac173529e | 190 | int readBytesToBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 191 | int sendBytesFromBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 192 | bool isTopicMatched(char* topic, MQTTString& topicName); |
| vpcola | 23:06fac173529e | 193 | int sendPublish(PubMessage& message); |
| vpcola | 23:06fac173529e | 194 | void resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 195 | void sendPingRequest(); |
| vpcola | 23:06fac173529e | 196 | bool hasConnectionTimedOut(); |
| vpcola | 26:4b21de8043a5 | 197 | int login(); |
| vpcola | 23:06fac173529e | 198 | }; |
| vpcola | 23:06fac173529e | 199 | |
| vpcola | 30:b2aed80037db | 200 | } |
| vpcola | 23:06fac173529e | 201 | #endif |
