Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository ZIP archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.h@23:06fac173529e, 2017-03-26 (annotated)
- Committer:
- vpcola
- Date:
- Sun Mar 26 04:35:46 2017 +0000
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
Code is stable now in both publish/subscribe ...
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| vpcola | 23:06fac173529e | 1 | #ifndef _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 2 | #define _MQTT_THREADED_CLIENT_H_ |
| vpcola | 23:06fac173529e | 3 | |
| vpcola | 23:06fac173529e | 4 | #include "mbed.h" |
| vpcola | 23:06fac173529e | 5 | #include "rtos.h" |
| vpcola | 23:06fac173529e | 6 | #include "MQTTPacket.h" |
| vpcola | 23:06fac173529e | 7 | #include "NetworkInterface.h" |
| vpcola | 23:06fac173529e | 8 | #include "FP.h" |
| vpcola | 23:06fac173529e | 9 | |
| vpcola | 23:06fac173529e | 10 | #include <cstdio> |
| vpcola | 23:06fac173529e | 11 | #include <string> |
| vpcola | 23:06fac173529e | 12 | #include <map> |
| vpcola | 23:06fac173529e | 13 | |
// Tunable limits for the MQTT client.
// NOTE(review): the two timeout values look like milliseconds -- confirm
// against the implementation file that consumes them.
#define COMMAND_TIMEOUT         5000   // passed to readUntil() while waiting for a SUBACK
#define DEFAULT_SOCKET_TIMEOUT  1000
#define MAX_MQTT_PACKET_SIZE    200    // size of the client's send and read buffers
#define MAX_MQTT_PAYLOAD_SIZE   100    // size of the PubMessage payload buffer
| vpcola | 23:06fac173529e | 18 | |
// MQTT quality-of-service levels; the enumerators have the values 0, 1 and 2.
typedef enum {
    QOS0 = 0,
    QOS1 = 1,
    QOS2 = 2
} QoS;

// Result codes used by the client API.
// Every failure code must be negative; SUCCESS is zero.
typedef enum {
    SUCCESS         =  0,
    FAILURE         = -1,
    TIMEOUT         = -2,
    BUFFER_OVERFLOW = -3
} returnCode;
| vpcola | 23:06fac173529e | 23 | |
| vpcola | 23:06fac173529e | 24 | |
| vpcola | 23:06fac173529e | 25 | typedef struct |
| vpcola | 23:06fac173529e | 26 | { |
| vpcola | 23:06fac173529e | 27 | QoS qos; |
| vpcola | 23:06fac173529e | 28 | bool retained; |
| vpcola | 23:06fac173529e | 29 | bool dup; |
| vpcola | 23:06fac173529e | 30 | unsigned short id; |
| vpcola | 23:06fac173529e | 31 | void *payload; |
| vpcola | 23:06fac173529e | 32 | size_t payloadlen; |
| vpcola | 23:06fac173529e | 33 | }Message, *pMessage; |
| vpcola | 23:06fac173529e | 34 | |
| vpcola | 23:06fac173529e | 35 | // TODO: |
| vpcola | 23:06fac173529e | 36 | // Merge this struct with the one above, in order to use the same |
| vpcola | 23:06fac173529e | 37 | // data structure for sending and receiving. I need to simplify |
| vpcola | 23:06fac173529e | 38 | // the PubMessage to not contain pointers like the one above. |
| vpcola | 23:06fac173529e | 39 | typedef struct |
| vpcola | 23:06fac173529e | 40 | { |
| vpcola | 23:06fac173529e | 41 | char topic[100]; |
| vpcola | 23:06fac173529e | 42 | QoS qos; |
| vpcola | 23:06fac173529e | 43 | unsigned short id; |
| vpcola | 23:06fac173529e | 44 | size_t payloadlen; |
| vpcola | 23:06fac173529e | 45 | char payload[MAX_MQTT_PAYLOAD_SIZE]; |
| vpcola | 23:06fac173529e | 46 | }PubMessage, *pPubMessage; |
| vpcola | 23:06fac173529e | 47 | |
| vpcola | 23:06fac173529e | 48 | struct MessageData |
| vpcola | 23:06fac173529e | 49 | { |
| vpcola | 23:06fac173529e | 50 | MessageData(MQTTString &aTopicName, Message &aMessage) : message(aMessage), topicName(aTopicName) |
| vpcola | 23:06fac173529e | 51 | { } |
| vpcola | 23:06fac173529e | 52 | Message &message; |
| vpcola | 23:06fac173529e | 53 | MQTTString &topicName; |
| vpcola | 23:06fac173529e | 54 | }; |
| vpcola | 23:06fac173529e | 55 | |
// Hands out packet identifiers in the range 1..MAX_PACKET_ID, wrapping back
// to 1 after MAX_PACKET_ID has been issued.
class PacketId
{
public:
    // Starts the counter at 0 so that the first getNext() call returns 1.
    PacketId() : next(0)
    {
    }

    // Advances to the following identifier and returns it.
    // The value 0 is never returned.
    int getNext()
    {
        // Deliberately written as separate statements. The previous form,
        // `next = (next == MAX_PACKET_ID) ? 1 : ++next`, stored to `next`
        // twice within one expression, which is undefined behaviour before
        // C++11 and triggers sequence-point warnings.
        if (next == MAX_PACKET_ID) {
            next = 1;
        } else {
            ++next;
        }
        return next;
    }

private:
    static const int MAX_PACKET_ID = 65535;  // largest identifier handed out
    int next;                                // most recently issued identifier (0 = none yet)
};
| vpcola | 23:06fac173529e | 73 | |
| vpcola | 23:06fac173529e | 74 | |
| vpcola | 23:06fac173529e | 75 | |
| vpcola | 23:06fac173529e | 76 | class MQTTThreadedClient |
| vpcola | 23:06fac173529e | 77 | { |
| vpcola | 23:06fac173529e | 78 | public: |
| vpcola | 23:06fac173529e | 79 | MQTTThreadedClient(NetworkInterface * aNetwork) |
| vpcola | 23:06fac173529e | 80 | : network(aNetwork), |
| vpcola | 23:06fac173529e | 81 | queue(32 * EVENTS_EVENT_SIZE), |
| vpcola | 23:06fac173529e | 82 | isConnected(false) { |
| vpcola | 23:06fac173529e | 83 | tcpSocket = new TCPSocket(); |
| vpcola | 23:06fac173529e | 84 | } |
| vpcola | 23:06fac173529e | 85 | |
| vpcola | 23:06fac173529e | 86 | int connect(const char * host, uint16_t port, MQTTPacket_connectData & options); |
| vpcola | 23:06fac173529e | 87 | int publish(PubMessage& message); |
| vpcola | 23:06fac173529e | 88 | int subscribe(const char * topic, QoS qos, void (*function)(MessageData &)); |
| vpcola | 23:06fac173529e | 89 | |
| vpcola | 23:06fac173529e | 90 | template<typename T> |
| vpcola | 23:06fac173529e | 91 | int subscribe(const char * topicstr, QoS qos, T *object, void (T::*member)(MessageData &)) { |
| vpcola | 23:06fac173529e | 92 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 93 | int len = 0; |
| vpcola | 23:06fac173529e | 94 | |
| vpcola | 23:06fac173529e | 95 | MQTTString topic = {(char*)topicstr, {0, 0}}; |
| vpcola | 23:06fac173529e | 96 | |
| vpcola | 23:06fac173529e | 97 | if (!isConnected) { |
| vpcola | 23:06fac173529e | 98 | printf("Session already connected!!\r\n"); |
| vpcola | 23:06fac173529e | 99 | return rc; |
| vpcola | 23:06fac173529e | 100 | } |
| vpcola | 23:06fac173529e | 101 | |
| vpcola | 23:06fac173529e | 102 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
| vpcola | 23:06fac173529e | 103 | if (len <= 0) { |
| vpcola | 23:06fac173529e | 104 | printf("Error serializing subscribe packet ...\r\n"); |
| vpcola | 23:06fac173529e | 105 | return rc; |
| vpcola | 23:06fac173529e | 106 | } |
| vpcola | 23:06fac173529e | 107 | |
| vpcola | 23:06fac173529e | 108 | if ((rc = sendPacket(len)) != SUCCESS) { |
| vpcola | 23:06fac173529e | 109 | printf("Error sending subscribe packet [%d]\r\n", rc); |
| vpcola | 23:06fac173529e | 110 | return rc; |
| vpcola | 23:06fac173529e | 111 | } |
| vpcola | 23:06fac173529e | 112 | |
| vpcola | 23:06fac173529e | 113 | // Wait for SUBACK, dropping packets read along the way ... |
| vpcola | 23:06fac173529e | 114 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
| vpcola | 23:06fac173529e | 115 | int count = 0, grantedQoS = -1; |
| vpcola | 23:06fac173529e | 116 | unsigned short mypacketid; |
| vpcola | 23:06fac173529e | 117 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 118 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
| vpcola | 23:06fac173529e | 119 | // For as long as we do not get 0x80 .. |
| vpcola | 23:06fac173529e | 120 | if (rc != 0x80) { |
| vpcola | 23:06fac173529e | 121 | // Add message handlers to the map |
| vpcola | 23:06fac173529e | 122 | FP<void,MessageData &> fp; |
| vpcola | 23:06fac173529e | 123 | fp.attach(object, member); |
| vpcola | 23:06fac173529e | 124 | |
| vpcola | 23:06fac173529e | 125 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
| vpcola | 23:06fac173529e | 126 | |
| vpcola | 23:06fac173529e | 127 | // Reset connection timers here ... |
| vpcola | 23:06fac173529e | 128 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 129 | |
| vpcola | 23:06fac173529e | 130 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 131 | } |
| vpcola | 23:06fac173529e | 132 | } else |
| vpcola | 23:06fac173529e | 133 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 134 | |
| vpcola | 23:06fac173529e | 135 | return rc; |
| vpcola | 23:06fac173529e | 136 | } |
| vpcola | 23:06fac173529e | 137 | |
| vpcola | 23:06fac173529e | 138 | // the listener thread ... |
| vpcola | 23:06fac173529e | 139 | void startListener(); |
| vpcola | 23:06fac173529e | 140 | |
| vpcola | 23:06fac173529e | 141 | protected: |
| vpcola | 23:06fac173529e | 142 | int connect(MQTTPacket_connectData& options); |
| vpcola | 23:06fac173529e | 143 | int handlePublishMsg(); |
| vpcola | 23:06fac173529e | 144 | |
| vpcola | 23:06fac173529e | 145 | |
| vpcola | 23:06fac173529e | 146 | private: |
| vpcola | 23:06fac173529e | 147 | NetworkInterface * network; |
| vpcola | 23:06fac173529e | 148 | TCPSocket * tcpSocket; |
| vpcola | 23:06fac173529e | 149 | PacketId packetid; |
| vpcola | 23:06fac173529e | 150 | // Event queue |
| vpcola | 23:06fac173529e | 151 | EventQueue queue; |
| vpcola | 23:06fac173529e | 152 | bool isConnected; |
| vpcola | 23:06fac173529e | 153 | |
| vpcola | 23:06fac173529e | 154 | // TODO: Because I'm using a map, I can only have one handler |
| vpcola | 23:06fac173529e | 155 | // for each topic (one that's mapped to the topic string). |
| vpcola | 23:06fac173529e | 156 | // Attaching another handler on the same topic is not possible. |
| vpcola | 23:06fac173529e | 157 | // In the future, use a vector instead of maps to allow multiple |
| vpcola | 23:06fac173529e | 158 | // handlers for the same topic. |
| vpcola | 23:06fac173529e | 159 | std::map<std::string, FP<void, MessageData &> > topicCBMap; |
| vpcola | 23:06fac173529e | 160 | |
| vpcola | 23:06fac173529e | 161 | unsigned char sendbuf[MAX_MQTT_PACKET_SIZE]; |
| vpcola | 23:06fac173529e | 162 | unsigned char readbuf[MAX_MQTT_PACKET_SIZE]; |
| vpcola | 23:06fac173529e | 163 | |
| vpcola | 23:06fac173529e | 164 | unsigned int keepAliveInterval; |
| vpcola | 23:06fac173529e | 165 | Timer comTimer; |
| vpcola | 23:06fac173529e | 166 | |
| vpcola | 23:06fac173529e | 167 | int readPacket(); |
| vpcola | 23:06fac173529e | 168 | int sendPacket(size_t length); |
| vpcola | 23:06fac173529e | 169 | int readPacketLength(int* value); |
| vpcola | 23:06fac173529e | 170 | int readUntil(int packetType, int timeout); |
| vpcola | 23:06fac173529e | 171 | int readBytesToBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 172 | int sendBytesFromBuffer(char * buffer, size_t size, int timeout); |
| vpcola | 23:06fac173529e | 173 | bool isTopicMatched(char* topic, MQTTString& topicName); |
| vpcola | 23:06fac173529e | 174 | int sendPublish(PubMessage& message); |
| vpcola | 23:06fac173529e | 175 | void resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 176 | void sendPingRequest(); |
| vpcola | 23:06fac173529e | 177 | bool hasConnectionTimedOut(); |
| vpcola | 23:06fac173529e | 178 | |
| vpcola | 23:06fac173529e | 179 | }; |
| vpcola | 23:06fac173529e | 180 | |
| vpcola | 23:06fac173529e | 181 | #endif |
