Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Fork of HelloMQTT by
MQTTThreadedClient.cpp@23:06fac173529e, 2017-03-26 (annotated)
- Committer:
- vpcola
- Date:
- Sun Mar 26 04:35:46 2017 +0000
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
Code is stable now in both publish/subscribe ...
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| vpcola | 23:06fac173529e | 1 | #include "mbed.h" |
| vpcola | 23:06fac173529e | 2 | #include "rtos.h" |
| vpcola | 23:06fac173529e | 3 | #include "MQTTThreadedClient.h" |
| vpcola | 23:06fac173529e | 4 | |
| vpcola | 23:06fac173529e | 5 | |
// Fixed-capacity pool (16 entries) from which publish() allocates the
// PubMessage copies that are later released by startListener().
static MemoryPool<PubMessage, 16> mpool;
// Queue (depth 16) that carries pointers to those pool-allocated messages
// from publish() to startListener().
static Queue<PubMessage, 16> mqueue;
| vpcola | 23:06fac173529e | 8 | |
| vpcola | 23:06fac173529e | 9 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
| vpcola | 23:06fac173529e | 10 | { |
| vpcola | 23:06fac173529e | 11 | int rc; |
| vpcola | 23:06fac173529e | 12 | |
| vpcola | 23:06fac173529e | 13 | if (tcpSocket == NULL) |
| vpcola | 23:06fac173529e | 14 | return -1; |
| vpcola | 23:06fac173529e | 15 | |
| vpcola | 23:06fac173529e | 16 | // non-blocking socket ... |
| vpcola | 23:06fac173529e | 17 | tcpSocket->set_timeout(timeout); |
| vpcola | 23:06fac173529e | 18 | rc = tcpSocket->recv( (void *) buffer, size); |
| vpcola | 23:06fac173529e | 19 | |
| vpcola | 23:06fac173529e | 20 | // return 0 bytes if timeout ... |
| vpcola | 23:06fac173529e | 21 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
| vpcola | 23:06fac173529e | 22 | return TIMEOUT; |
| vpcola | 23:06fac173529e | 23 | else |
| vpcola | 23:06fac173529e | 24 | return rc; // return the number of bytes received or error |
| vpcola | 23:06fac173529e | 25 | } |
| vpcola | 23:06fac173529e | 26 | |
| vpcola | 23:06fac173529e | 27 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
| vpcola | 23:06fac173529e | 28 | { |
| vpcola | 23:06fac173529e | 29 | int rc; |
| vpcola | 23:06fac173529e | 30 | |
| vpcola | 23:06fac173529e | 31 | if (tcpSocket == NULL) |
| vpcola | 23:06fac173529e | 32 | return -1; |
| vpcola | 23:06fac173529e | 33 | |
| vpcola | 23:06fac173529e | 34 | // set the write timeout |
| vpcola | 23:06fac173529e | 35 | tcpSocket->set_timeout(timeout); |
| vpcola | 23:06fac173529e | 36 | rc = tcpSocket->send(buffer, size); |
| vpcola | 23:06fac173529e | 37 | |
| vpcola | 23:06fac173529e | 38 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
| vpcola | 23:06fac173529e | 39 | return TIMEOUT; |
| vpcola | 23:06fac173529e | 40 | else |
| vpcola | 23:06fac173529e | 41 | return rc; |
| vpcola | 23:06fac173529e | 42 | } |
| vpcola | 23:06fac173529e | 43 | |
| vpcola | 23:06fac173529e | 44 | int MQTTThreadedClient::readPacketLength(int* value) |
| vpcola | 23:06fac173529e | 45 | { |
| vpcola | 23:06fac173529e | 46 | int rc = MQTTPACKET_READ_ERROR; |
| vpcola | 23:06fac173529e | 47 | unsigned char c; |
| vpcola | 23:06fac173529e | 48 | int multiplier = 1; |
| vpcola | 23:06fac173529e | 49 | int len = 0; |
| vpcola | 23:06fac173529e | 50 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
| vpcola | 23:06fac173529e | 51 | |
| vpcola | 23:06fac173529e | 52 | *value = 0; |
| vpcola | 23:06fac173529e | 53 | do |
| vpcola | 23:06fac173529e | 54 | { |
| vpcola | 23:06fac173529e | 55 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
| vpcola | 23:06fac173529e | 56 | { |
| vpcola | 23:06fac173529e | 57 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
| vpcola | 23:06fac173529e | 58 | goto exit; |
| vpcola | 23:06fac173529e | 59 | } |
| vpcola | 23:06fac173529e | 60 | |
| vpcola | 23:06fac173529e | 61 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
| vpcola | 23:06fac173529e | 62 | if (rc != 1) |
| vpcola | 23:06fac173529e | 63 | { |
| vpcola | 23:06fac173529e | 64 | rc = MQTTPACKET_READ_ERROR; |
| vpcola | 23:06fac173529e | 65 | goto exit; |
| vpcola | 23:06fac173529e | 66 | } |
| vpcola | 23:06fac173529e | 67 | |
| vpcola | 23:06fac173529e | 68 | *value += (c & 127) * multiplier; |
| vpcola | 23:06fac173529e | 69 | multiplier *= 128; |
| vpcola | 23:06fac173529e | 70 | } while ((c & 128) != 0); |
| vpcola | 23:06fac173529e | 71 | |
| vpcola | 23:06fac173529e | 72 | rc = MQTTPACKET_READ_COMPLETE; |
| vpcola | 23:06fac173529e | 73 | |
| vpcola | 23:06fac173529e | 74 | exit: |
| vpcola | 23:06fac173529e | 75 | if (rc == MQTTPACKET_READ_ERROR ) |
| vpcola | 23:06fac173529e | 76 | len = -1; |
| vpcola | 23:06fac173529e | 77 | |
| vpcola | 23:06fac173529e | 78 | return len; |
| vpcola | 23:06fac173529e | 79 | } |
| vpcola | 23:06fac173529e | 80 | |
| vpcola | 23:06fac173529e | 81 | int MQTTThreadedClient::sendPacket(size_t length) |
| vpcola | 23:06fac173529e | 82 | { |
| vpcola | 23:06fac173529e | 83 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 84 | int sent = 0; |
| vpcola | 23:06fac173529e | 85 | |
| vpcola | 23:06fac173529e | 86 | while (sent < length) |
| vpcola | 23:06fac173529e | 87 | { |
| vpcola | 23:06fac173529e | 88 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
| vpcola | 23:06fac173529e | 89 | if (rc < 0) // there was an error writing the data |
| vpcola | 23:06fac173529e | 90 | break; |
| vpcola | 23:06fac173529e | 91 | sent += rc; |
| vpcola | 23:06fac173529e | 92 | } |
| vpcola | 23:06fac173529e | 93 | |
| vpcola | 23:06fac173529e | 94 | if (sent == length) |
| vpcola | 23:06fac173529e | 95 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 96 | else |
| vpcola | 23:06fac173529e | 97 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 98 | |
| vpcola | 23:06fac173529e | 99 | return rc; |
| vpcola | 23:06fac173529e | 100 | } |
| vpcola | 23:06fac173529e | 101 | /** |
| vpcola | 23:06fac173529e | 102 | * Reads the entire packet to readbuf and returns |
| vpcola | 23:06fac173529e | 103 | * the type of packet when successful, otherwise |
| vpcola | 23:06fac173529e | 104 | * a negative error code is returned. |
| vpcola | 23:06fac173529e | 105 | **/ |
| vpcola | 23:06fac173529e | 106 | int MQTTThreadedClient::readPacket() |
| vpcola | 23:06fac173529e | 107 | { |
| vpcola | 23:06fac173529e | 108 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 109 | MQTTHeader header = {0}; |
| vpcola | 23:06fac173529e | 110 | int len = 0; |
| vpcola | 23:06fac173529e | 111 | int rem_len = 0; |
| vpcola | 23:06fac173529e | 112 | |
| vpcola | 23:06fac173529e | 113 | /* 1. read the header byte. This has the packet type in it */ |
| vpcola | 23:06fac173529e | 114 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
| vpcola | 23:06fac173529e | 115 | goto exit; |
| vpcola | 23:06fac173529e | 116 | |
| vpcola | 23:06fac173529e | 117 | len = 1; |
| vpcola | 23:06fac173529e | 118 | /* 2. read the remaining length. This is variable in itself */ |
| vpcola | 23:06fac173529e | 119 | if ( readPacketLength(&rem_len) < 0 ) |
| vpcola | 23:06fac173529e | 120 | goto exit; |
| vpcola | 23:06fac173529e | 121 | |
| vpcola | 23:06fac173529e | 122 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
| vpcola | 23:06fac173529e | 123 | |
| vpcola | 23:06fac173529e | 124 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
| vpcola | 23:06fac173529e | 125 | { |
| vpcola | 23:06fac173529e | 126 | rc = BUFFER_OVERFLOW; |
| vpcola | 23:06fac173529e | 127 | goto exit; |
| vpcola | 23:06fac173529e | 128 | } |
| vpcola | 23:06fac173529e | 129 | |
| vpcola | 23:06fac173529e | 130 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
| vpcola | 23:06fac173529e | 131 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
| vpcola | 23:06fac173529e | 132 | goto exit; |
| vpcola | 23:06fac173529e | 133 | |
| vpcola | 23:06fac173529e | 134 | // Convert the header to type |
| vpcola | 23:06fac173529e | 135 | // and update rc |
| vpcola | 23:06fac173529e | 136 | header.byte = readbuf[0]; |
| vpcola | 23:06fac173529e | 137 | rc = header.bits.type; |
| vpcola | 23:06fac173529e | 138 | |
| vpcola | 23:06fac173529e | 139 | exit: |
| vpcola | 23:06fac173529e | 140 | |
| vpcola | 23:06fac173529e | 141 | return rc; |
| vpcola | 23:06fac173529e | 142 | } |
| vpcola | 23:06fac173529e | 143 | |
| vpcola | 23:06fac173529e | 144 | /** |
| vpcola | 23:06fac173529e | 145 | * Read until a specified packet type is received, or untill the specified |
| vpcola | 23:06fac173529e | 146 | * timeout dropping packets along the way. |
| vpcola | 23:06fac173529e | 147 | **/ |
| vpcola | 23:06fac173529e | 148 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
| vpcola | 23:06fac173529e | 149 | { |
| vpcola | 23:06fac173529e | 150 | int pType = FAILURE; |
| vpcola | 23:06fac173529e | 151 | Timer timer; |
| vpcola | 23:06fac173529e | 152 | |
| vpcola | 23:06fac173529e | 153 | timer.start(); |
| vpcola | 23:06fac173529e | 154 | do { |
| vpcola | 23:06fac173529e | 155 | pType = readPacket(); |
| vpcola | 23:06fac173529e | 156 | if (pType < 0) |
| vpcola | 23:06fac173529e | 157 | break; |
| vpcola | 23:06fac173529e | 158 | |
| vpcola | 23:06fac173529e | 159 | if (timer.read_ms() > timeout) |
| vpcola | 23:06fac173529e | 160 | { |
| vpcola | 23:06fac173529e | 161 | pType = FAILURE; |
| vpcola | 23:06fac173529e | 162 | break; |
| vpcola | 23:06fac173529e | 163 | } |
| vpcola | 23:06fac173529e | 164 | }while(pType != packetType); |
| vpcola | 23:06fac173529e | 165 | |
| vpcola | 23:06fac173529e | 166 | return pType; |
| vpcola | 23:06fac173529e | 167 | } |
| vpcola | 23:06fac173529e | 168 | |
| vpcola | 23:06fac173529e | 169 | |
| vpcola | 23:06fac173529e | 170 | int MQTTThreadedClient::connect(MQTTPacket_connectData& options) |
| vpcola | 23:06fac173529e | 171 | { |
| vpcola | 23:06fac173529e | 172 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 173 | int len = 0; |
| vpcola | 23:06fac173529e | 174 | |
| vpcola | 23:06fac173529e | 175 | if (isConnected) |
| vpcola | 23:06fac173529e | 176 | { |
| vpcola | 23:06fac173529e | 177 | printf("Session already connected! \r\n"); |
| vpcola | 23:06fac173529e | 178 | return rc; |
| vpcola | 23:06fac173529e | 179 | } |
| vpcola | 23:06fac173529e | 180 | |
| vpcola | 23:06fac173529e | 181 | // Copy the keepAliveInterval value to local |
| vpcola | 23:06fac173529e | 182 | // MQTT specifies in seconds, we have to multiply that |
| vpcola | 23:06fac173529e | 183 | // amount for our 32 bit timers which accepts ms. |
| vpcola | 23:06fac173529e | 184 | keepAliveInterval = (options.keepAliveInterval * 1000); |
| vpcola | 23:06fac173529e | 185 | |
| vpcola | 23:06fac173529e | 186 | printf("Connecting with: \r\n"); |
| vpcola | 23:06fac173529e | 187 | printf("\tUsername: [%s]\r\n", options.username.cstring); |
| vpcola | 23:06fac173529e | 188 | printf("\tPassword: [%s]\r\n", options.password.cstring); |
| vpcola | 23:06fac173529e | 189 | |
| vpcola | 23:06fac173529e | 190 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) |
| vpcola | 23:06fac173529e | 191 | { |
| vpcola | 23:06fac173529e | 192 | printf("Error serializing connect packet ...\r\n"); |
| vpcola | 23:06fac173529e | 193 | return rc; |
| vpcola | 23:06fac173529e | 194 | } |
| vpcola | 23:06fac173529e | 195 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
| vpcola | 23:06fac173529e | 196 | { |
| vpcola | 23:06fac173529e | 197 | printf("Error sending the connect request packet ...\r\n"); |
| vpcola | 23:06fac173529e | 198 | return rc; |
| vpcola | 23:06fac173529e | 199 | } |
| vpcola | 23:06fac173529e | 200 | |
| vpcola | 23:06fac173529e | 201 | // Wait for the CONNACK |
| vpcola | 23:06fac173529e | 202 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
| vpcola | 23:06fac173529e | 203 | { |
| vpcola | 23:06fac173529e | 204 | unsigned char connack_rc = 255; |
| vpcola | 23:06fac173529e | 205 | bool sessionPresent = false; |
| vpcola | 23:06fac173529e | 206 | printf("Connection acknowledgement received ... deserializing respones ...\r\n"); |
| vpcola | 23:06fac173529e | 207 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 208 | rc = connack_rc; |
| vpcola | 23:06fac173529e | 209 | else |
| vpcola | 23:06fac173529e | 210 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 211 | } |
| vpcola | 23:06fac173529e | 212 | else |
| vpcola | 23:06fac173529e | 213 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 214 | |
| vpcola | 23:06fac173529e | 215 | if (rc == SUCCESS) |
| vpcola | 23:06fac173529e | 216 | { |
| vpcola | 23:06fac173529e | 217 | printf("Connected!!! ... starting connection timers ...\r\n"); |
| vpcola | 23:06fac173529e | 218 | isConnected = true; |
| vpcola | 23:06fac173529e | 219 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 220 | }else |
| vpcola | 23:06fac173529e | 221 | { |
| vpcola | 23:06fac173529e | 222 | // TODO: Call socket->disconnect()? |
| vpcola | 23:06fac173529e | 223 | } |
| vpcola | 23:06fac173529e | 224 | |
| vpcola | 23:06fac173529e | 225 | printf("Returning with rc = %d\r\n", rc); |
| vpcola | 23:06fac173529e | 226 | |
| vpcola | 23:06fac173529e | 227 | return rc; |
| vpcola | 23:06fac173529e | 228 | } |
| vpcola | 23:06fac173529e | 229 | |
| vpcola | 23:06fac173529e | 230 | int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options) |
| vpcola | 23:06fac173529e | 231 | { |
| vpcola | 23:06fac173529e | 232 | int ret; |
| vpcola | 23:06fac173529e | 233 | |
| vpcola | 23:06fac173529e | 234 | tcpSocket->open(network); |
| vpcola | 23:06fac173529e | 235 | if (( ret = tcpSocket->connect(host, port)) < 0 ) |
| vpcola | 23:06fac173529e | 236 | { |
| vpcola | 23:06fac173529e | 237 | |
| vpcola | 23:06fac173529e | 238 | printf("Error connecting to %s:%d with %d\r\n", host, port, ret); |
| vpcola | 23:06fac173529e | 239 | return ret; |
| vpcola | 23:06fac173529e | 240 | } |
| vpcola | 23:06fac173529e | 241 | |
| vpcola | 23:06fac173529e | 242 | return connect(options); |
| vpcola | 23:06fac173529e | 243 | } |
| vpcola | 23:06fac173529e | 244 | |
| vpcola | 23:06fac173529e | 245 | int MQTTThreadedClient::publish(PubMessage& msg) |
| vpcola | 23:06fac173529e | 246 | { |
| vpcola | 23:06fac173529e | 247 | #if 0 |
| vpcola | 23:06fac173529e | 248 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
| vpcola | 23:06fac173529e | 249 | // TODO: handle id values when the function is called later |
| vpcola | 23:06fac173529e | 250 | if (id == 0) |
| vpcola | 23:06fac173529e | 251 | return FAILURE; |
| vpcola | 23:06fac173529e | 252 | else |
| vpcola | 23:06fac173529e | 253 | return SUCCESS; |
| vpcola | 23:06fac173529e | 254 | #endif |
| vpcola | 23:06fac173529e | 255 | PubMessage *message = mpool.alloc(); |
| vpcola | 23:06fac173529e | 256 | // Simple copy |
| vpcola | 23:06fac173529e | 257 | *message = msg; |
| vpcola | 23:06fac173529e | 258 | |
| vpcola | 23:06fac173529e | 259 | // Push the data to the thread |
| vpcola | 23:06fac173529e | 260 | printf("Pushing data to consumer thread ...\r\n"); |
| vpcola | 23:06fac173529e | 261 | mqueue.put(message); |
| vpcola | 23:06fac173529e | 262 | |
| vpcola | 23:06fac173529e | 263 | return SUCCESS; |
| vpcola | 23:06fac173529e | 264 | } |
| vpcola | 23:06fac173529e | 265 | |
| vpcola | 23:06fac173529e | 266 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
| vpcola | 23:06fac173529e | 267 | { |
| vpcola | 23:06fac173529e | 268 | MQTTString topicString = MQTTString_initializer; |
| vpcola | 23:06fac173529e | 269 | |
| vpcola | 23:06fac173529e | 270 | if (!isConnected) |
| vpcola | 23:06fac173529e | 271 | { |
| vpcola | 23:06fac173529e | 272 | printf("Not connected!!! ...\r\n"); |
| vpcola | 23:06fac173529e | 273 | return FAILURE; |
| vpcola | 23:06fac173529e | 274 | } |
| vpcola | 23:06fac173529e | 275 | |
| vpcola | 23:06fac173529e | 276 | topicString.cstring = (char*) &message.topic[0]; |
| vpcola | 23:06fac173529e | 277 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
| vpcola | 23:06fac173529e | 278 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
| vpcola | 23:06fac173529e | 279 | if (len <= 0) |
| vpcola | 23:06fac173529e | 280 | { |
| vpcola | 23:06fac173529e | 281 | printf("Failed serializing message ...\r\n"); |
| vpcola | 23:06fac173529e | 282 | return FAILURE; |
| vpcola | 23:06fac173529e | 283 | } |
| vpcola | 23:06fac173529e | 284 | |
| vpcola | 23:06fac173529e | 285 | if (sendPacket(len) == SUCCESS) |
| vpcola | 23:06fac173529e | 286 | { |
| vpcola | 23:06fac173529e | 287 | printf("Successfully sent publish packet to server ...\r\n"); |
| vpcola | 23:06fac173529e | 288 | return SUCCESS; |
| vpcola | 23:06fac173529e | 289 | } |
| vpcola | 23:06fac173529e | 290 | |
| vpcola | 23:06fac173529e | 291 | printf("Failed to send publish packet to server ...\r\n"); |
| vpcola | 23:06fac173529e | 292 | return FAILURE; |
| vpcola | 23:06fac173529e | 293 | } |
| vpcola | 23:06fac173529e | 294 | |
| vpcola | 23:06fac173529e | 295 | int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &)) |
| vpcola | 23:06fac173529e | 296 | { |
| vpcola | 23:06fac173529e | 297 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 298 | int len = 0; |
| vpcola | 23:06fac173529e | 299 | |
| vpcola | 23:06fac173529e | 300 | MQTTString topic = {(char*)topicstr, {0, 0}}; |
| vpcola | 23:06fac173529e | 301 | printf("Subscribing to topic [%s]\r\n", topicstr); |
| vpcola | 23:06fac173529e | 302 | |
| vpcola | 23:06fac173529e | 303 | if (!isConnected) |
| vpcola | 23:06fac173529e | 304 | { |
| vpcola | 23:06fac173529e | 305 | printf("Session already connected!!\r\n"); |
| vpcola | 23:06fac173529e | 306 | return rc; |
| vpcola | 23:06fac173529e | 307 | } |
| vpcola | 23:06fac173529e | 308 | |
| vpcola | 23:06fac173529e | 309 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
| vpcola | 23:06fac173529e | 310 | if (len <= 0) |
| vpcola | 23:06fac173529e | 311 | { |
| vpcola | 23:06fac173529e | 312 | printf("Error serializing subscribe packet ...\r\n"); |
| vpcola | 23:06fac173529e | 313 | return rc; |
| vpcola | 23:06fac173529e | 314 | } |
| vpcola | 23:06fac173529e | 315 | |
| vpcola | 23:06fac173529e | 316 | if ((rc = sendPacket(len)) != SUCCESS) |
| vpcola | 23:06fac173529e | 317 | { |
| vpcola | 23:06fac173529e | 318 | printf("Error sending subscribe packet [%d]\r\n", rc); |
| vpcola | 23:06fac173529e | 319 | return rc; |
| vpcola | 23:06fac173529e | 320 | } |
| vpcola | 23:06fac173529e | 321 | |
| vpcola | 23:06fac173529e | 322 | printf("Waiting for subscription ack ...\r\n"); |
| vpcola | 23:06fac173529e | 323 | // Wait for SUBACK, dropping packets read along the way ... |
| vpcola | 23:06fac173529e | 324 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback |
| vpcola | 23:06fac173529e | 325 | { |
| vpcola | 23:06fac173529e | 326 | int count = 0, grantedQoS = -1; |
| vpcola | 23:06fac173529e | 327 | unsigned short mypacketid; |
| vpcola | 23:06fac173529e | 328 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 329 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
| vpcola | 23:06fac173529e | 330 | // For as long as we do not get 0x80 .. |
| vpcola | 23:06fac173529e | 331 | if (rc != 0x80) |
| vpcola | 23:06fac173529e | 332 | { |
| vpcola | 23:06fac173529e | 333 | // Add message handlers to the map |
| vpcola | 23:06fac173529e | 334 | FP<void,MessageData &> fp; |
| vpcola | 23:06fac173529e | 335 | fp.attach(function); |
| vpcola | 23:06fac173529e | 336 | |
| vpcola | 23:06fac173529e | 337 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
| vpcola | 23:06fac173529e | 338 | |
| vpcola | 23:06fac173529e | 339 | // Reset connection timers here ... |
| vpcola | 23:06fac173529e | 340 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 341 | |
| vpcola | 23:06fac173529e | 342 | printf("Successfully subscribed to %s ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 343 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 344 | }else |
| vpcola | 23:06fac173529e | 345 | { |
| vpcola | 23:06fac173529e | 346 | printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr); |
| vpcola | 23:06fac173529e | 347 | } |
| vpcola | 23:06fac173529e | 348 | } |
| vpcola | 23:06fac173529e | 349 | else |
| vpcola | 23:06fac173529e | 350 | { |
| vpcola | 23:06fac173529e | 351 | printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 352 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 353 | } |
| vpcola | 23:06fac173529e | 354 | |
| vpcola | 23:06fac173529e | 355 | return rc; |
| vpcola | 23:06fac173529e | 356 | |
| vpcola | 23:06fac173529e | 357 | } |
| vpcola | 23:06fac173529e | 358 | |
| vpcola | 23:06fac173529e | 359 | |
/**
 * Checks whether the topic name of a received message matches a topic
 * filter that may contain the wildcard characters '+' and '#'.
 *
 * topicFilter is a NUL-terminated string; topicName is read through its
 * length-delimited form (lenstring.data / lenstring.len).
 *
 * Returns true only when the whole topic name and the whole filter were
 * consumed by the comparison.
 **/
bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
{
    char* curf = topicFilter;                          // cursor in the filter
    char* curn = topicName.lenstring.data;             // cursor in the topic name
    char* curn_end = curn + topicName.lenstring.len;   // one past the last name character

    while (*curf && curn < curn_end)
    {
        // The name starts a new level but the filter does not: mismatch
        if (*curn == '/' && *curf != '/')
            break;
        // Literal filter character that differs from the name: mismatch
        if (*curf != '+' && *curf != '#' && *curf != *curn)
            break;
        if (*curf == '+')
        {   // skip until we meet the next separator, or end of string
            char* nextpos = curn + 1;
            // Leaves curn on the last character of the current level; the
            // increment at the bottom of the loop then steps past it.
            while (nextpos < curn_end && *nextpos != '/')
                nextpos = ++curn + 1;
        }
        else if (*curf == '#')
            curn = curn_end - 1;    // skip until end of string
        curf++;
        curn++;
    };

    // Both cursors must have reached their end for a match
    return (curn == curn_end) && (*curf == '\0');
}
| vpcola | 23:06fac173529e | 386 | |
| vpcola | 23:06fac173529e | 387 | int MQTTThreadedClient::handlePublishMsg() |
| vpcola | 23:06fac173529e | 388 | { |
| vpcola | 23:06fac173529e | 389 | MQTTString topicName = MQTTString_initializer; |
| vpcola | 23:06fac173529e | 390 | Message msg; |
| vpcola | 23:06fac173529e | 391 | int intQoS; |
| vpcola | 23:06fac173529e | 392 | printf("Deserializing publish message ...\r\n"); |
| vpcola | 23:06fac173529e | 393 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
| vpcola | 23:06fac173529e | 394 | &intQoS, |
| vpcola | 23:06fac173529e | 395 | (unsigned char*)&msg.retained, |
| vpcola | 23:06fac173529e | 396 | (unsigned short*)&msg.id, |
| vpcola | 23:06fac173529e | 397 | &topicName, |
| vpcola | 23:06fac173529e | 398 | (unsigned char**)&msg.payload, |
| vpcola | 23:06fac173529e | 399 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
| vpcola | 23:06fac173529e | 400 | { |
| vpcola | 23:06fac173529e | 401 | printf("Error deserializing published message ...\r\n"); |
| vpcola | 23:06fac173529e | 402 | return -1; |
| vpcola | 23:06fac173529e | 403 | } |
| vpcola | 23:06fac173529e | 404 | |
| vpcola | 23:06fac173529e | 405 | std::string topic; |
| vpcola | 23:06fac173529e | 406 | if (topicName.lenstring.len > 0) |
| vpcola | 23:06fac173529e | 407 | { |
| vpcola | 23:06fac173529e | 408 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
| vpcola | 23:06fac173529e | 409 | }else |
| vpcola | 23:06fac173529e | 410 | topic = (const char *) topicName.cstring; |
| vpcola | 23:06fac173529e | 411 | |
| vpcola | 23:06fac173529e | 412 | printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); |
| vpcola | 23:06fac173529e | 413 | |
| vpcola | 23:06fac173529e | 414 | msg.qos = (QoS) intQoS; |
| vpcola | 23:06fac173529e | 415 | |
| vpcola | 23:06fac173529e | 416 | |
| vpcola | 23:06fac173529e | 417 | // Call the handlers for each topic |
| vpcola | 23:06fac173529e | 418 | if (topicCBMap.find(topic) != topicCBMap.end()) |
| vpcola | 23:06fac173529e | 419 | { |
| vpcola | 23:06fac173529e | 420 | // Call the callback function |
| vpcola | 23:06fac173529e | 421 | if (topicCBMap[topic].attached()) |
| vpcola | 23:06fac173529e | 422 | { |
| vpcola | 23:06fac173529e | 423 | printf("Invoking function handler for topic ...\r\n"); |
| vpcola | 23:06fac173529e | 424 | MessageData md(topicName, msg); |
| vpcola | 23:06fac173529e | 425 | topicCBMap[topic](md); |
| vpcola | 23:06fac173529e | 426 | |
| vpcola | 23:06fac173529e | 427 | return 1; |
| vpcola | 23:06fac173529e | 428 | } |
| vpcola | 23:06fac173529e | 429 | } |
| vpcola | 23:06fac173529e | 430 | |
| vpcola | 23:06fac173529e | 431 | // TODO: depending on the QoS |
| vpcola | 23:06fac173529e | 432 | // we send data to the server = PUBACK or PUBREC |
| vpcola | 23:06fac173529e | 433 | switch(intQoS) |
| vpcola | 23:06fac173529e | 434 | { |
| vpcola | 23:06fac173529e | 435 | case QOS0: |
| vpcola | 23:06fac173529e | 436 | // We send back nothing ... |
| vpcola | 23:06fac173529e | 437 | break; |
| vpcola | 23:06fac173529e | 438 | case QOS1: |
| vpcola | 23:06fac173529e | 439 | // TODO: implement |
| vpcola | 23:06fac173529e | 440 | break; |
| vpcola | 23:06fac173529e | 441 | case QOS2: |
| vpcola | 23:06fac173529e | 442 | // TODO: implement |
| vpcola | 23:06fac173529e | 443 | break; |
| vpcola | 23:06fac173529e | 444 | default: |
| vpcola | 23:06fac173529e | 445 | break; |
| vpcola | 23:06fac173529e | 446 | } |
| vpcola | 23:06fac173529e | 447 | |
| vpcola | 23:06fac173529e | 448 | return 0; |
| vpcola | 23:06fac173529e | 449 | } |
| vpcola | 23:06fac173529e | 450 | |
| vpcola | 23:06fac173529e | 451 | void MQTTThreadedClient::resetConnectionTimer() |
| vpcola | 23:06fac173529e | 452 | { |
| vpcola | 23:06fac173529e | 453 | if (keepAliveInterval > 0) |
| vpcola | 23:06fac173529e | 454 | { |
| vpcola | 23:06fac173529e | 455 | comTimer.reset(); |
| vpcola | 23:06fac173529e | 456 | comTimer.start(); |
| vpcola | 23:06fac173529e | 457 | } |
| vpcola | 23:06fac173529e | 458 | } |
| vpcola | 23:06fac173529e | 459 | |
| vpcola | 23:06fac173529e | 460 | bool MQTTThreadedClient::hasConnectionTimedOut() |
| vpcola | 23:06fac173529e | 461 | { |
| vpcola | 23:06fac173529e | 462 | if (keepAliveInterval > 0 ) { |
| vpcola | 23:06fac173529e | 463 | // Check connection timer |
| vpcola | 23:06fac173529e | 464 | if (comTimer.read_ms() > keepAliveInterval) |
| vpcola | 23:06fac173529e | 465 | return true; |
| vpcola | 23:06fac173529e | 466 | else |
| vpcola | 23:06fac173529e | 467 | return false; |
| vpcola | 23:06fac173529e | 468 | } |
| vpcola | 23:06fac173529e | 469 | |
| vpcola | 23:06fac173529e | 470 | return false; |
| vpcola | 23:06fac173529e | 471 | } |
| vpcola | 23:06fac173529e | 472 | |
| vpcola | 23:06fac173529e | 473 | void MQTTThreadedClient::sendPingRequest() |
| vpcola | 23:06fac173529e | 474 | { |
| vpcola | 23:06fac173529e | 475 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
| vpcola | 23:06fac173529e | 476 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
| vpcola | 23:06fac173529e | 477 | { |
| vpcola | 23:06fac173529e | 478 | printf("Ping request sent successfully ...\r\n"); |
| vpcola | 23:06fac173529e | 479 | } |
| vpcola | 23:06fac173529e | 480 | } |
| vpcola | 23:06fac173529e | 481 | |
| vpcola | 23:06fac173529e | 482 | void MQTTThreadedClient::startListener() |
| vpcola | 23:06fac173529e | 483 | { |
| vpcola | 23:06fac173529e | 484 | int pType; |
| vpcola | 23:06fac173529e | 485 | // Continuesly listens for packets and dispatch |
| vpcola | 23:06fac173529e | 486 | // message handlers ... |
| vpcola | 23:06fac173529e | 487 | while(true) |
| vpcola | 23:06fac173529e | 488 | { |
| vpcola | 23:06fac173529e | 489 | pType = readPacket(); |
| vpcola | 23:06fac173529e | 490 | switch(pType) |
| vpcola | 23:06fac173529e | 491 | { |
| vpcola | 23:06fac173529e | 492 | case TIMEOUT: |
| vpcola | 23:06fac173529e | 493 | // No data available from the network ... |
| vpcola | 23:06fac173529e | 494 | break; |
| vpcola | 23:06fac173529e | 495 | case FAILURE: |
| vpcola | 23:06fac173529e | 496 | case BUFFER_OVERFLOW: |
| vpcola | 23:06fac173529e | 497 | { |
| vpcola | 23:06fac173529e | 498 | // TODO: Network error, do we disconnect and reconnect? |
| vpcola | 23:06fac173529e | 499 | printf("Failure or buffer overflow problem ... \r\n"); |
| vpcola | 23:06fac173529e | 500 | MBED_ASSERT(false); |
| vpcola | 23:06fac173529e | 501 | } |
| vpcola | 23:06fac173529e | 502 | break; |
| vpcola | 23:06fac173529e | 503 | /** |
| vpcola | 23:06fac173529e | 504 | * The rest of the return codes below (all positive) is about MQTT |
| vpcola | 23:06fac173529e | 505 | * response codes |
| vpcola | 23:06fac173529e | 506 | **/ |
| vpcola | 23:06fac173529e | 507 | case CONNACK: |
| vpcola | 23:06fac173529e | 508 | case PUBACK: |
| vpcola | 23:06fac173529e | 509 | case SUBACK: |
| vpcola | 23:06fac173529e | 510 | break; |
| vpcola | 23:06fac173529e | 511 | case PUBLISH: |
| vpcola | 23:06fac173529e | 512 | { |
| vpcola | 23:06fac173529e | 513 | printf("Publish received!....\r\n"); |
| vpcola | 23:06fac173529e | 514 | // We receive data from the MQTT server .. |
| vpcola | 23:06fac173529e | 515 | if (handlePublishMsg() < 0) |
| vpcola | 23:06fac173529e | 516 | { |
| vpcola | 23:06fac173529e | 517 | printf("Error handling PUBLISH message ... \r\n"); |
| vpcola | 23:06fac173529e | 518 | break; |
| vpcola | 23:06fac173529e | 519 | } |
| vpcola | 23:06fac173529e | 520 | } |
| vpcola | 23:06fac173529e | 521 | break; |
| vpcola | 23:06fac173529e | 522 | case PINGRESP: |
| vpcola | 23:06fac173529e | 523 | { |
| vpcola | 23:06fac173529e | 524 | printf("Got ping response ...\r\n"); |
| vpcola | 23:06fac173529e | 525 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 526 | } |
| vpcola | 23:06fac173529e | 527 | break; |
| vpcola | 23:06fac173529e | 528 | default: |
| vpcola | 23:06fac173529e | 529 | printf("Unknown/Not handled message from server pType[%d]\r\n", pType); |
| vpcola | 23:06fac173529e | 530 | } |
| vpcola | 23:06fac173529e | 531 | |
| vpcola | 23:06fac173529e | 532 | // Check if its time to send a keepAlive packet |
| vpcola | 23:06fac173529e | 533 | if (hasConnectionTimedOut()) |
| vpcola | 23:06fac173529e | 534 | { |
| vpcola | 23:06fac173529e | 535 | // Queue the ping request so that other |
| vpcola | 23:06fac173529e | 536 | // pending operations queued above will go first |
| vpcola | 23:06fac173529e | 537 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
| vpcola | 23:06fac173529e | 538 | } |
| vpcola | 23:06fac173529e | 539 | |
| vpcola | 23:06fac173529e | 540 | // Check if we have messages on the message queue |
| vpcola | 23:06fac173529e | 541 | osEvent evt = mqueue.get(10); |
| vpcola | 23:06fac173529e | 542 | if (evt.status == osEventMessage) { |
| vpcola | 23:06fac173529e | 543 | |
| vpcola | 23:06fac173529e | 544 | printf("Got message to publish! ... \r\n"); |
| vpcola | 23:06fac173529e | 545 | |
| vpcola | 23:06fac173529e | 546 | // Unpack the message |
| vpcola | 23:06fac173529e | 547 | PubMessage * message = (PubMessage *)evt.value.p; |
| vpcola | 23:06fac173529e | 548 | |
| vpcola | 23:06fac173529e | 549 | // Send the packet, do not queue the call |
| vpcola | 23:06fac173529e | 550 | // like the ping above .. |
| vpcola | 23:06fac173529e | 551 | if ( sendPublish(*message) == SUCCESS) |
| vpcola | 23:06fac173529e | 552 | // Reset timers if we have been able to send successfully |
| vpcola | 23:06fac173529e | 553 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 554 | |
| vpcola | 23:06fac173529e | 555 | // Free the message from mempool after using |
| vpcola | 23:06fac173529e | 556 | mpool.free(message); |
| vpcola | 23:06fac173529e | 557 | } |
| vpcola | 23:06fac173529e | 558 | |
| vpcola | 23:06fac173529e | 559 | // Dispatch any queued events ... |
| vpcola | 23:06fac173529e | 560 | queue.dispatch(100); |
| vpcola | 23:06fac173529e | 561 | } |
| vpcola | 23:06fac173529e | 562 | |
| vpcola | 23:06fac173529e | 563 | } |
| vpcola | 23:06fac173529e | 564 |
