A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Committer:
vpcola
Date:
Sun Mar 26 04:35:46 2017 +0000
Revision:
23:06fac173529e
Child:
25:326f00faa092
Code is stable now in both publish/subscribe ...

Who changed what in which revision?

UserRevisionLine numberNew contents of line
vpcola 23:06fac173529e 1 #include "mbed.h"
vpcola 23:06fac173529e 2 #include "rtos.h"
vpcola 23:06fac173529e 3 #include "MQTTThreadedClient.h"
vpcola 23:06fac173529e 4
vpcola 23:06fac173529e 5
vpcola 23:06fac173529e 6 static MemoryPool<PubMessage, 16> mpool;
vpcola 23:06fac173529e 7 static Queue<PubMessage, 16> mqueue;
vpcola 23:06fac173529e 8
vpcola 23:06fac173529e 9 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
vpcola 23:06fac173529e 10 {
vpcola 23:06fac173529e 11 int rc;
vpcola 23:06fac173529e 12
vpcola 23:06fac173529e 13 if (tcpSocket == NULL)
vpcola 23:06fac173529e 14 return -1;
vpcola 23:06fac173529e 15
vpcola 23:06fac173529e 16 // non-blocking socket ...
vpcola 23:06fac173529e 17 tcpSocket->set_timeout(timeout);
vpcola 23:06fac173529e 18 rc = tcpSocket->recv( (void *) buffer, size);
vpcola 23:06fac173529e 19
vpcola 23:06fac173529e 20 // return 0 bytes if timeout ...
vpcola 23:06fac173529e 21 if (NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 23:06fac173529e 22 return TIMEOUT;
vpcola 23:06fac173529e 23 else
vpcola 23:06fac173529e 24 return rc; // return the number of bytes received or error
vpcola 23:06fac173529e 25 }
vpcola 23:06fac173529e 26
vpcola 23:06fac173529e 27 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
vpcola 23:06fac173529e 28 {
vpcola 23:06fac173529e 29 int rc;
vpcola 23:06fac173529e 30
vpcola 23:06fac173529e 31 if (tcpSocket == NULL)
vpcola 23:06fac173529e 32 return -1;
vpcola 23:06fac173529e 33
vpcola 23:06fac173529e 34 // set the write timeout
vpcola 23:06fac173529e 35 tcpSocket->set_timeout(timeout);
vpcola 23:06fac173529e 36 rc = tcpSocket->send(buffer, size);
vpcola 23:06fac173529e 37
vpcola 23:06fac173529e 38 if ( NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 23:06fac173529e 39 return TIMEOUT;
vpcola 23:06fac173529e 40 else
vpcola 23:06fac173529e 41 return rc;
vpcola 23:06fac173529e 42 }
vpcola 23:06fac173529e 43
vpcola 23:06fac173529e 44 int MQTTThreadedClient::readPacketLength(int* value)
vpcola 23:06fac173529e 45 {
vpcola 23:06fac173529e 46 int rc = MQTTPACKET_READ_ERROR;
vpcola 23:06fac173529e 47 unsigned char c;
vpcola 23:06fac173529e 48 int multiplier = 1;
vpcola 23:06fac173529e 49 int len = 0;
vpcola 23:06fac173529e 50 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
vpcola 23:06fac173529e 51
vpcola 23:06fac173529e 52 *value = 0;
vpcola 23:06fac173529e 53 do
vpcola 23:06fac173529e 54 {
vpcola 23:06fac173529e 55 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
vpcola 23:06fac173529e 56 {
vpcola 23:06fac173529e 57 rc = MQTTPACKET_READ_ERROR; /* bad data */
vpcola 23:06fac173529e 58 goto exit;
vpcola 23:06fac173529e 59 }
vpcola 23:06fac173529e 60
vpcola 23:06fac173529e 61 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
vpcola 23:06fac173529e 62 if (rc != 1)
vpcola 23:06fac173529e 63 {
vpcola 23:06fac173529e 64 rc = MQTTPACKET_READ_ERROR;
vpcola 23:06fac173529e 65 goto exit;
vpcola 23:06fac173529e 66 }
vpcola 23:06fac173529e 67
vpcola 23:06fac173529e 68 *value += (c & 127) * multiplier;
vpcola 23:06fac173529e 69 multiplier *= 128;
vpcola 23:06fac173529e 70 } while ((c & 128) != 0);
vpcola 23:06fac173529e 71
vpcola 23:06fac173529e 72 rc = MQTTPACKET_READ_COMPLETE;
vpcola 23:06fac173529e 73
vpcola 23:06fac173529e 74 exit:
vpcola 23:06fac173529e 75 if (rc == MQTTPACKET_READ_ERROR )
vpcola 23:06fac173529e 76 len = -1;
vpcola 23:06fac173529e 77
vpcola 23:06fac173529e 78 return len;
vpcola 23:06fac173529e 79 }
vpcola 23:06fac173529e 80
vpcola 23:06fac173529e 81 int MQTTThreadedClient::sendPacket(size_t length)
vpcola 23:06fac173529e 82 {
vpcola 23:06fac173529e 83 int rc = FAILURE;
vpcola 23:06fac173529e 84 int sent = 0;
vpcola 23:06fac173529e 85
vpcola 23:06fac173529e 86 while (sent < length)
vpcola 23:06fac173529e 87 {
vpcola 23:06fac173529e 88 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
vpcola 23:06fac173529e 89 if (rc < 0) // there was an error writing the data
vpcola 23:06fac173529e 90 break;
vpcola 23:06fac173529e 91 sent += rc;
vpcola 23:06fac173529e 92 }
vpcola 23:06fac173529e 93
vpcola 23:06fac173529e 94 if (sent == length)
vpcola 23:06fac173529e 95 rc = SUCCESS;
vpcola 23:06fac173529e 96 else
vpcola 23:06fac173529e 97 rc = FAILURE;
vpcola 23:06fac173529e 98
vpcola 23:06fac173529e 99 return rc;
vpcola 23:06fac173529e 100 }
vpcola 23:06fac173529e 101 /**
vpcola 23:06fac173529e 102 * Reads the entire packet to readbuf and returns
vpcola 23:06fac173529e 103 * the type of packet when successful, otherwise
vpcola 23:06fac173529e 104 * a negative error code is returned.
vpcola 23:06fac173529e 105 **/
vpcola 23:06fac173529e 106 int MQTTThreadedClient::readPacket()
vpcola 23:06fac173529e 107 {
vpcola 23:06fac173529e 108 int rc = FAILURE;
vpcola 23:06fac173529e 109 MQTTHeader header = {0};
vpcola 23:06fac173529e 110 int len = 0;
vpcola 23:06fac173529e 111 int rem_len = 0;
vpcola 23:06fac173529e 112
vpcola 23:06fac173529e 113 /* 1. read the header byte. This has the packet type in it */
vpcola 23:06fac173529e 114 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
vpcola 23:06fac173529e 115 goto exit;
vpcola 23:06fac173529e 116
vpcola 23:06fac173529e 117 len = 1;
vpcola 23:06fac173529e 118 /* 2. read the remaining length. This is variable in itself */
vpcola 23:06fac173529e 119 if ( readPacketLength(&rem_len) < 0 )
vpcola 23:06fac173529e 120 goto exit;
vpcola 23:06fac173529e 121
vpcola 23:06fac173529e 122 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
vpcola 23:06fac173529e 123
vpcola 23:06fac173529e 124 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
vpcola 23:06fac173529e 125 {
vpcola 23:06fac173529e 126 rc = BUFFER_OVERFLOW;
vpcola 23:06fac173529e 127 goto exit;
vpcola 23:06fac173529e 128 }
vpcola 23:06fac173529e 129
vpcola 23:06fac173529e 130 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
vpcola 23:06fac173529e 131 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
vpcola 23:06fac173529e 132 goto exit;
vpcola 23:06fac173529e 133
vpcola 23:06fac173529e 134 // Convert the header to type
vpcola 23:06fac173529e 135 // and update rc
vpcola 23:06fac173529e 136 header.byte = readbuf[0];
vpcola 23:06fac173529e 137 rc = header.bits.type;
vpcola 23:06fac173529e 138
vpcola 23:06fac173529e 139 exit:
vpcola 23:06fac173529e 140
vpcola 23:06fac173529e 141 return rc;
vpcola 23:06fac173529e 142 }
vpcola 23:06fac173529e 143
vpcola 23:06fac173529e 144 /**
vpcola 23:06fac173529e 145 * Read until a specified packet type is received, or untill the specified
vpcola 23:06fac173529e 146 * timeout dropping packets along the way.
vpcola 23:06fac173529e 147 **/
vpcola 23:06fac173529e 148 int MQTTThreadedClient::readUntil(int packetType, int timeout)
vpcola 23:06fac173529e 149 {
vpcola 23:06fac173529e 150 int pType = FAILURE;
vpcola 23:06fac173529e 151 Timer timer;
vpcola 23:06fac173529e 152
vpcola 23:06fac173529e 153 timer.start();
vpcola 23:06fac173529e 154 do {
vpcola 23:06fac173529e 155 pType = readPacket();
vpcola 23:06fac173529e 156 if (pType < 0)
vpcola 23:06fac173529e 157 break;
vpcola 23:06fac173529e 158
vpcola 23:06fac173529e 159 if (timer.read_ms() > timeout)
vpcola 23:06fac173529e 160 {
vpcola 23:06fac173529e 161 pType = FAILURE;
vpcola 23:06fac173529e 162 break;
vpcola 23:06fac173529e 163 }
vpcola 23:06fac173529e 164 }while(pType != packetType);
vpcola 23:06fac173529e 165
vpcola 23:06fac173529e 166 return pType;
vpcola 23:06fac173529e 167 }
vpcola 23:06fac173529e 168
vpcola 23:06fac173529e 169
vpcola 23:06fac173529e 170 int MQTTThreadedClient::connect(MQTTPacket_connectData& options)
vpcola 23:06fac173529e 171 {
vpcola 23:06fac173529e 172 int rc = FAILURE;
vpcola 23:06fac173529e 173 int len = 0;
vpcola 23:06fac173529e 174
vpcola 23:06fac173529e 175 if (isConnected)
vpcola 23:06fac173529e 176 {
vpcola 23:06fac173529e 177 printf("Session already connected! \r\n");
vpcola 23:06fac173529e 178 return rc;
vpcola 23:06fac173529e 179 }
vpcola 23:06fac173529e 180
vpcola 23:06fac173529e 181 // Copy the keepAliveInterval value to local
vpcola 23:06fac173529e 182 // MQTT specifies in seconds, we have to multiply that
vpcola 23:06fac173529e 183 // amount for our 32 bit timers which accepts ms.
vpcola 23:06fac173529e 184 keepAliveInterval = (options.keepAliveInterval * 1000);
vpcola 23:06fac173529e 185
vpcola 23:06fac173529e 186 printf("Connecting with: \r\n");
vpcola 23:06fac173529e 187 printf("\tUsername: [%s]\r\n", options.username.cstring);
vpcola 23:06fac173529e 188 printf("\tPassword: [%s]\r\n", options.password.cstring);
vpcola 23:06fac173529e 189
vpcola 23:06fac173529e 190 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0)
vpcola 23:06fac173529e 191 {
vpcola 23:06fac173529e 192 printf("Error serializing connect packet ...\r\n");
vpcola 23:06fac173529e 193 return rc;
vpcola 23:06fac173529e 194 }
vpcola 23:06fac173529e 195 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
vpcola 23:06fac173529e 196 {
vpcola 23:06fac173529e 197 printf("Error sending the connect request packet ...\r\n");
vpcola 23:06fac173529e 198 return rc;
vpcola 23:06fac173529e 199 }
vpcola 23:06fac173529e 200
vpcola 23:06fac173529e 201 // Wait for the CONNACK
vpcola 23:06fac173529e 202 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
vpcola 23:06fac173529e 203 {
vpcola 23:06fac173529e 204 unsigned char connack_rc = 255;
vpcola 23:06fac173529e 205 bool sessionPresent = false;
vpcola 23:06fac173529e 206 printf("Connection acknowledgement received ... deserializing respones ...\r\n");
vpcola 23:06fac173529e 207 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 23:06fac173529e 208 rc = connack_rc;
vpcola 23:06fac173529e 209 else
vpcola 23:06fac173529e 210 rc = FAILURE;
vpcola 23:06fac173529e 211 }
vpcola 23:06fac173529e 212 else
vpcola 23:06fac173529e 213 rc = FAILURE;
vpcola 23:06fac173529e 214
vpcola 23:06fac173529e 215 if (rc == SUCCESS)
vpcola 23:06fac173529e 216 {
vpcola 23:06fac173529e 217 printf("Connected!!! ... starting connection timers ...\r\n");
vpcola 23:06fac173529e 218 isConnected = true;
vpcola 23:06fac173529e 219 resetConnectionTimer();
vpcola 23:06fac173529e 220 }else
vpcola 23:06fac173529e 221 {
vpcola 23:06fac173529e 222 // TODO: Call socket->disconnect()?
vpcola 23:06fac173529e 223 }
vpcola 23:06fac173529e 224
vpcola 23:06fac173529e 225 printf("Returning with rc = %d\r\n", rc);
vpcola 23:06fac173529e 226
vpcola 23:06fac173529e 227 return rc;
vpcola 23:06fac173529e 228 }
vpcola 23:06fac173529e 229
vpcola 23:06fac173529e 230 int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options)
vpcola 23:06fac173529e 231 {
vpcola 23:06fac173529e 232 int ret;
vpcola 23:06fac173529e 233
vpcola 23:06fac173529e 234 tcpSocket->open(network);
vpcola 23:06fac173529e 235 if (( ret = tcpSocket->connect(host, port)) < 0 )
vpcola 23:06fac173529e 236 {
vpcola 23:06fac173529e 237
vpcola 23:06fac173529e 238 printf("Error connecting to %s:%d with %d\r\n", host, port, ret);
vpcola 23:06fac173529e 239 return ret;
vpcola 23:06fac173529e 240 }
vpcola 23:06fac173529e 241
vpcola 23:06fac173529e 242 return connect(options);
vpcola 23:06fac173529e 243 }
vpcola 23:06fac173529e 244
vpcola 23:06fac173529e 245 int MQTTThreadedClient::publish(PubMessage& msg)
vpcola 23:06fac173529e 246 {
vpcola 23:06fac173529e 247 #if 0
vpcola 23:06fac173529e 248 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
vpcola 23:06fac173529e 249 // TODO: handle id values when the function is called later
vpcola 23:06fac173529e 250 if (id == 0)
vpcola 23:06fac173529e 251 return FAILURE;
vpcola 23:06fac173529e 252 else
vpcola 23:06fac173529e 253 return SUCCESS;
vpcola 23:06fac173529e 254 #endif
vpcola 23:06fac173529e 255 PubMessage *message = mpool.alloc();
vpcola 23:06fac173529e 256 // Simple copy
vpcola 23:06fac173529e 257 *message = msg;
vpcola 23:06fac173529e 258
vpcola 23:06fac173529e 259 // Push the data to the thread
vpcola 23:06fac173529e 260 printf("Pushing data to consumer thread ...\r\n");
vpcola 23:06fac173529e 261 mqueue.put(message);
vpcola 23:06fac173529e 262
vpcola 23:06fac173529e 263 return SUCCESS;
vpcola 23:06fac173529e 264 }
vpcola 23:06fac173529e 265
vpcola 23:06fac173529e 266 int MQTTThreadedClient::sendPublish(PubMessage& message)
vpcola 23:06fac173529e 267 {
vpcola 23:06fac173529e 268 MQTTString topicString = MQTTString_initializer;
vpcola 23:06fac173529e 269
vpcola 23:06fac173529e 270 if (!isConnected)
vpcola 23:06fac173529e 271 {
vpcola 23:06fac173529e 272 printf("Not connected!!! ...\r\n");
vpcola 23:06fac173529e 273 return FAILURE;
vpcola 23:06fac173529e 274 }
vpcola 23:06fac173529e 275
vpcola 23:06fac173529e 276 topicString.cstring = (char*) &message.topic[0];
vpcola 23:06fac173529e 277 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
vpcola 23:06fac173529e 278 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
vpcola 23:06fac173529e 279 if (len <= 0)
vpcola 23:06fac173529e 280 {
vpcola 23:06fac173529e 281 printf("Failed serializing message ...\r\n");
vpcola 23:06fac173529e 282 return FAILURE;
vpcola 23:06fac173529e 283 }
vpcola 23:06fac173529e 284
vpcola 23:06fac173529e 285 if (sendPacket(len) == SUCCESS)
vpcola 23:06fac173529e 286 {
vpcola 23:06fac173529e 287 printf("Successfully sent publish packet to server ...\r\n");
vpcola 23:06fac173529e 288 return SUCCESS;
vpcola 23:06fac173529e 289 }
vpcola 23:06fac173529e 290
vpcola 23:06fac173529e 291 printf("Failed to send publish packet to server ...\r\n");
vpcola 23:06fac173529e 292 return FAILURE;
vpcola 23:06fac173529e 293 }
vpcola 23:06fac173529e 294
vpcola 23:06fac173529e 295 int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &))
vpcola 23:06fac173529e 296 {
vpcola 23:06fac173529e 297 int rc = FAILURE;
vpcola 23:06fac173529e 298 int len = 0;
vpcola 23:06fac173529e 299
vpcola 23:06fac173529e 300 MQTTString topic = {(char*)topicstr, {0, 0}};
vpcola 23:06fac173529e 301 printf("Subscribing to topic [%s]\r\n", topicstr);
vpcola 23:06fac173529e 302
vpcola 23:06fac173529e 303 if (!isConnected)
vpcola 23:06fac173529e 304 {
vpcola 23:06fac173529e 305 printf("Session already connected!!\r\n");
vpcola 23:06fac173529e 306 return rc;
vpcola 23:06fac173529e 307 }
vpcola 23:06fac173529e 308
vpcola 23:06fac173529e 309 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
vpcola 23:06fac173529e 310 if (len <= 0)
vpcola 23:06fac173529e 311 {
vpcola 23:06fac173529e 312 printf("Error serializing subscribe packet ...\r\n");
vpcola 23:06fac173529e 313 return rc;
vpcola 23:06fac173529e 314 }
vpcola 23:06fac173529e 315
vpcola 23:06fac173529e 316 if ((rc = sendPacket(len)) != SUCCESS)
vpcola 23:06fac173529e 317 {
vpcola 23:06fac173529e 318 printf("Error sending subscribe packet [%d]\r\n", rc);
vpcola 23:06fac173529e 319 return rc;
vpcola 23:06fac173529e 320 }
vpcola 23:06fac173529e 321
vpcola 23:06fac173529e 322 printf("Waiting for subscription ack ...\r\n");
vpcola 23:06fac173529e 323 // Wait for SUBACK, dropping packets read along the way ...
vpcola 23:06fac173529e 324 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback
vpcola 23:06fac173529e 325 {
vpcola 23:06fac173529e 326 int count = 0, grantedQoS = -1;
vpcola 23:06fac173529e 327 unsigned short mypacketid;
vpcola 23:06fac173529e 328 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 23:06fac173529e 329 rc = grantedQoS; // 0, 1, 2 or 0x80
vpcola 23:06fac173529e 330 // For as long as we do not get 0x80 ..
vpcola 23:06fac173529e 331 if (rc != 0x80)
vpcola 23:06fac173529e 332 {
vpcola 23:06fac173529e 333 // Add message handlers to the map
vpcola 23:06fac173529e 334 FP<void,MessageData &> fp;
vpcola 23:06fac173529e 335 fp.attach(function);
vpcola 23:06fac173529e 336
vpcola 23:06fac173529e 337 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
vpcola 23:06fac173529e 338
vpcola 23:06fac173529e 339 // Reset connection timers here ...
vpcola 23:06fac173529e 340 resetConnectionTimer();
vpcola 23:06fac173529e 341
vpcola 23:06fac173529e 342 printf("Successfully subscribed to %s ...\r\n", topicstr);
vpcola 23:06fac173529e 343 rc = SUCCESS;
vpcola 23:06fac173529e 344 }else
vpcola 23:06fac173529e 345 {
vpcola 23:06fac173529e 346 printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr);
vpcola 23:06fac173529e 347 }
vpcola 23:06fac173529e 348 }
vpcola 23:06fac173529e 349 else
vpcola 23:06fac173529e 350 {
vpcola 23:06fac173529e 351 printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr);
vpcola 23:06fac173529e 352 rc = FAILURE;
vpcola 23:06fac173529e 353 }
vpcola 23:06fac173529e 354
vpcola 23:06fac173529e 355 return rc;
vpcola 23:06fac173529e 356
vpcola 23:06fac173529e 357 }
vpcola 23:06fac173529e 358
vpcola 23:06fac173529e 359
vpcola 23:06fac173529e 360 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
vpcola 23:06fac173529e 361 {
vpcola 23:06fac173529e 362 char* curf = topicFilter;
vpcola 23:06fac173529e 363 char* curn = topicName.lenstring.data;
vpcola 23:06fac173529e 364 char* curn_end = curn + topicName.lenstring.len;
vpcola 23:06fac173529e 365
vpcola 23:06fac173529e 366 while (*curf && curn < curn_end)
vpcola 23:06fac173529e 367 {
vpcola 23:06fac173529e 368 if (*curn == '/' && *curf != '/')
vpcola 23:06fac173529e 369 break;
vpcola 23:06fac173529e 370 if (*curf != '+' && *curf != '#' && *curf != *curn)
vpcola 23:06fac173529e 371 break;
vpcola 23:06fac173529e 372 if (*curf == '+')
vpcola 23:06fac173529e 373 { // skip until we meet the next separator, or end of string
vpcola 23:06fac173529e 374 char* nextpos = curn + 1;
vpcola 23:06fac173529e 375 while (nextpos < curn_end && *nextpos != '/')
vpcola 23:06fac173529e 376 nextpos = ++curn + 1;
vpcola 23:06fac173529e 377 }
vpcola 23:06fac173529e 378 else if (*curf == '#')
vpcola 23:06fac173529e 379 curn = curn_end - 1; // skip until end of string
vpcola 23:06fac173529e 380 curf++;
vpcola 23:06fac173529e 381 curn++;
vpcola 23:06fac173529e 382 };
vpcola 23:06fac173529e 383
vpcola 23:06fac173529e 384 return (curn == curn_end) && (*curf == '\0');
vpcola 23:06fac173529e 385 }
vpcola 23:06fac173529e 386
vpcola 23:06fac173529e 387 int MQTTThreadedClient::handlePublishMsg()
vpcola 23:06fac173529e 388 {
vpcola 23:06fac173529e 389 MQTTString topicName = MQTTString_initializer;
vpcola 23:06fac173529e 390 Message msg;
vpcola 23:06fac173529e 391 int intQoS;
vpcola 23:06fac173529e 392 printf("Deserializing publish message ...\r\n");
vpcola 23:06fac173529e 393 if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
vpcola 23:06fac173529e 394 &intQoS,
vpcola 23:06fac173529e 395 (unsigned char*)&msg.retained,
vpcola 23:06fac173529e 396 (unsigned short*)&msg.id,
vpcola 23:06fac173529e 397 &topicName,
vpcola 23:06fac173529e 398 (unsigned char**)&msg.payload,
vpcola 23:06fac173529e 399 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
vpcola 23:06fac173529e 400 {
vpcola 23:06fac173529e 401 printf("Error deserializing published message ...\r\n");
vpcola 23:06fac173529e 402 return -1;
vpcola 23:06fac173529e 403 }
vpcola 23:06fac173529e 404
vpcola 23:06fac173529e 405 std::string topic;
vpcola 23:06fac173529e 406 if (topicName.lenstring.len > 0)
vpcola 23:06fac173529e 407 {
vpcola 23:06fac173529e 408 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
vpcola 23:06fac173529e 409 }else
vpcola 23:06fac173529e 410 topic = (const char *) topicName.cstring;
vpcola 23:06fac173529e 411
vpcola 23:06fac173529e 412 printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS);
vpcola 23:06fac173529e 413
vpcola 23:06fac173529e 414 msg.qos = (QoS) intQoS;
vpcola 23:06fac173529e 415
vpcola 23:06fac173529e 416
vpcola 23:06fac173529e 417 // Call the handlers for each topic
vpcola 23:06fac173529e 418 if (topicCBMap.find(topic) != topicCBMap.end())
vpcola 23:06fac173529e 419 {
vpcola 23:06fac173529e 420 // Call the callback function
vpcola 23:06fac173529e 421 if (topicCBMap[topic].attached())
vpcola 23:06fac173529e 422 {
vpcola 23:06fac173529e 423 printf("Invoking function handler for topic ...\r\n");
vpcola 23:06fac173529e 424 MessageData md(topicName, msg);
vpcola 23:06fac173529e 425 topicCBMap[topic](md);
vpcola 23:06fac173529e 426
vpcola 23:06fac173529e 427 return 1;
vpcola 23:06fac173529e 428 }
vpcola 23:06fac173529e 429 }
vpcola 23:06fac173529e 430
vpcola 23:06fac173529e 431 // TODO: depending on the QoS
vpcola 23:06fac173529e 432 // we send data to the server = PUBACK or PUBREC
vpcola 23:06fac173529e 433 switch(intQoS)
vpcola 23:06fac173529e 434 {
vpcola 23:06fac173529e 435 case QOS0:
vpcola 23:06fac173529e 436 // We send back nothing ...
vpcola 23:06fac173529e 437 break;
vpcola 23:06fac173529e 438 case QOS1:
vpcola 23:06fac173529e 439 // TODO: implement
vpcola 23:06fac173529e 440 break;
vpcola 23:06fac173529e 441 case QOS2:
vpcola 23:06fac173529e 442 // TODO: implement
vpcola 23:06fac173529e 443 break;
vpcola 23:06fac173529e 444 default:
vpcola 23:06fac173529e 445 break;
vpcola 23:06fac173529e 446 }
vpcola 23:06fac173529e 447
vpcola 23:06fac173529e 448 return 0;
vpcola 23:06fac173529e 449 }
vpcola 23:06fac173529e 450
vpcola 23:06fac173529e 451 void MQTTThreadedClient::resetConnectionTimer()
vpcola 23:06fac173529e 452 {
vpcola 23:06fac173529e 453 if (keepAliveInterval > 0)
vpcola 23:06fac173529e 454 {
vpcola 23:06fac173529e 455 comTimer.reset();
vpcola 23:06fac173529e 456 comTimer.start();
vpcola 23:06fac173529e 457 }
vpcola 23:06fac173529e 458 }
vpcola 23:06fac173529e 459
vpcola 23:06fac173529e 460 bool MQTTThreadedClient::hasConnectionTimedOut()
vpcola 23:06fac173529e 461 {
vpcola 23:06fac173529e 462 if (keepAliveInterval > 0 ) {
vpcola 23:06fac173529e 463 // Check connection timer
vpcola 23:06fac173529e 464 if (comTimer.read_ms() > keepAliveInterval)
vpcola 23:06fac173529e 465 return true;
vpcola 23:06fac173529e 466 else
vpcola 23:06fac173529e 467 return false;
vpcola 23:06fac173529e 468 }
vpcola 23:06fac173529e 469
vpcola 23:06fac173529e 470 return false;
vpcola 23:06fac173529e 471 }
vpcola 23:06fac173529e 472
vpcola 23:06fac173529e 473 void MQTTThreadedClient::sendPingRequest()
vpcola 23:06fac173529e 474 {
vpcola 23:06fac173529e 475 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
vpcola 23:06fac173529e 476 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
vpcola 23:06fac173529e 477 {
vpcola 23:06fac173529e 478 printf("Ping request sent successfully ...\r\n");
vpcola 23:06fac173529e 479 }
vpcola 23:06fac173529e 480 }
vpcola 23:06fac173529e 481
vpcola 23:06fac173529e 482 void MQTTThreadedClient::startListener()
vpcola 23:06fac173529e 483 {
vpcola 23:06fac173529e 484 int pType;
vpcola 23:06fac173529e 485 // Continuesly listens for packets and dispatch
vpcola 23:06fac173529e 486 // message handlers ...
vpcola 23:06fac173529e 487 while(true)
vpcola 23:06fac173529e 488 {
vpcola 23:06fac173529e 489 pType = readPacket();
vpcola 23:06fac173529e 490 switch(pType)
vpcola 23:06fac173529e 491 {
vpcola 23:06fac173529e 492 case TIMEOUT:
vpcola 23:06fac173529e 493 // No data available from the network ...
vpcola 23:06fac173529e 494 break;
vpcola 23:06fac173529e 495 case FAILURE:
vpcola 23:06fac173529e 496 case BUFFER_OVERFLOW:
vpcola 23:06fac173529e 497 {
vpcola 23:06fac173529e 498 // TODO: Network error, do we disconnect and reconnect?
vpcola 23:06fac173529e 499 printf("Failure or buffer overflow problem ... \r\n");
vpcola 23:06fac173529e 500 MBED_ASSERT(false);
vpcola 23:06fac173529e 501 }
vpcola 23:06fac173529e 502 break;
vpcola 23:06fac173529e 503 /**
vpcola 23:06fac173529e 504 * The rest of the return codes below (all positive) is about MQTT
vpcola 23:06fac173529e 505 * response codes
vpcola 23:06fac173529e 506 **/
vpcola 23:06fac173529e 507 case CONNACK:
vpcola 23:06fac173529e 508 case PUBACK:
vpcola 23:06fac173529e 509 case SUBACK:
vpcola 23:06fac173529e 510 break;
vpcola 23:06fac173529e 511 case PUBLISH:
vpcola 23:06fac173529e 512 {
vpcola 23:06fac173529e 513 printf("Publish received!....\r\n");
vpcola 23:06fac173529e 514 // We receive data from the MQTT server ..
vpcola 23:06fac173529e 515 if (handlePublishMsg() < 0)
vpcola 23:06fac173529e 516 {
vpcola 23:06fac173529e 517 printf("Error handling PUBLISH message ... \r\n");
vpcola 23:06fac173529e 518 break;
vpcola 23:06fac173529e 519 }
vpcola 23:06fac173529e 520 }
vpcola 23:06fac173529e 521 break;
vpcola 23:06fac173529e 522 case PINGRESP:
vpcola 23:06fac173529e 523 {
vpcola 23:06fac173529e 524 printf("Got ping response ...\r\n");
vpcola 23:06fac173529e 525 resetConnectionTimer();
vpcola 23:06fac173529e 526 }
vpcola 23:06fac173529e 527 break;
vpcola 23:06fac173529e 528 default:
vpcola 23:06fac173529e 529 printf("Unknown/Not handled message from server pType[%d]\r\n", pType);
vpcola 23:06fac173529e 530 }
vpcola 23:06fac173529e 531
vpcola 23:06fac173529e 532 // Check if its time to send a keepAlive packet
vpcola 23:06fac173529e 533 if (hasConnectionTimedOut())
vpcola 23:06fac173529e 534 {
vpcola 23:06fac173529e 535 // Queue the ping request so that other
vpcola 23:06fac173529e 536 // pending operations queued above will go first
vpcola 23:06fac173529e 537 queue.call(this, &MQTTThreadedClient::sendPingRequest);
vpcola 23:06fac173529e 538 }
vpcola 23:06fac173529e 539
vpcola 23:06fac173529e 540 // Check if we have messages on the message queue
vpcola 23:06fac173529e 541 osEvent evt = mqueue.get(10);
vpcola 23:06fac173529e 542 if (evt.status == osEventMessage) {
vpcola 23:06fac173529e 543
vpcola 23:06fac173529e 544 printf("Got message to publish! ... \r\n");
vpcola 23:06fac173529e 545
vpcola 23:06fac173529e 546 // Unpack the message
vpcola 23:06fac173529e 547 PubMessage * message = (PubMessage *)evt.value.p;
vpcola 23:06fac173529e 548
vpcola 23:06fac173529e 549 // Send the packet, do not queue the call
vpcola 23:06fac173529e 550 // like the ping above ..
vpcola 23:06fac173529e 551 if ( sendPublish(*message) == SUCCESS)
vpcola 23:06fac173529e 552 // Reset timers if we have been able to send successfully
vpcola 23:06fac173529e 553 resetConnectionTimer();
vpcola 23:06fac173529e 554
vpcola 23:06fac173529e 555 // Free the message from mempool after using
vpcola 23:06fac173529e 556 mpool.free(message);
vpcola 23:06fac173529e 557 }
vpcola 23:06fac173529e 558
vpcola 23:06fac173529e 559 // Dispatch any queued events ...
vpcola 23:06fac173529e 560 queue.dispatch(100);
vpcola 23:06fac173529e 561 }
vpcola 23:06fac173529e 562
vpcola 23:06fac173529e 563 }
vpcola 23:06fac173529e 564