A threaded, secure MQTT client example. It uses Mbed TLS for the SSL/TLS connection and supports QoS 0 only for now. The example has been tested on a K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.cpp@23:06fac173529e, 2017-03-26 (annotated)
- Committer:
- vpcola
- Date:
- Sun Mar 26 04:35:46 2017 +0000
- Revision:
- 23:06fac173529e
- Child:
- 25:326f00faa092
Code is stable now in both publish/subscribe ...
Who changed what in which revision?
| User | Revision | Line number | New contents of line |
|---|---|---|---|
| vpcola | 23:06fac173529e | 1 | #include "mbed.h" |
| vpcola | 23:06fac173529e | 2 | #include "rtos.h" |
| vpcola | 23:06fac173529e | 3 | #include "MQTTThreadedClient.h" |
| vpcola | 23:06fac173529e | 4 | |
| vpcola | 23:06fac173529e | 5 | |
| vpcola | 23:06fac173529e | 6 | static MemoryPool<PubMessage, 16> mpool; /* pool of up to 16 PubMessage copies: allocated in publish(), freed in startListener() */ |
| vpcola | 23:06fac173529e | 7 | static Queue<PubMessage, 16> mqueue; /* queue of pointers to pooled messages: filled by publish(), drained by startListener() */ |
| vpcola | 23:06fac173529e | 8 | |
| vpcola | 23:06fac173529e | 9 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
| vpcola | 23:06fac173529e | 10 | { |
| vpcola | 23:06fac173529e | 11 | int rc; |
| vpcola | 23:06fac173529e | 12 | |
| vpcola | 23:06fac173529e | 13 | if (tcpSocket == NULL) |
| vpcola | 23:06fac173529e | 14 | return -1; |
| vpcola | 23:06fac173529e | 15 | |
| vpcola | 23:06fac173529e | 16 | // non-blocking socket ... |
| vpcola | 23:06fac173529e | 17 | tcpSocket->set_timeout(timeout); |
| vpcola | 23:06fac173529e | 18 | rc = tcpSocket->recv( (void *) buffer, size); |
| vpcola | 23:06fac173529e | 19 | |
| vpcola | 23:06fac173529e | 20 | // return 0 bytes if timeout ... |
| vpcola | 23:06fac173529e | 21 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
| vpcola | 23:06fac173529e | 22 | return TIMEOUT; |
| vpcola | 23:06fac173529e | 23 | else |
| vpcola | 23:06fac173529e | 24 | return rc; // return the number of bytes received or error |
| vpcola | 23:06fac173529e | 25 | } |
| vpcola | 23:06fac173529e | 26 | |
| vpcola | 23:06fac173529e | 27 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
| vpcola | 23:06fac173529e | 28 | { |
| vpcola | 23:06fac173529e | 29 | int rc; |
| vpcola | 23:06fac173529e | 30 | |
| vpcola | 23:06fac173529e | 31 | if (tcpSocket == NULL) |
| vpcola | 23:06fac173529e | 32 | return -1; |
| vpcola | 23:06fac173529e | 33 | |
| vpcola | 23:06fac173529e | 34 | // set the write timeout |
| vpcola | 23:06fac173529e | 35 | tcpSocket->set_timeout(timeout); |
| vpcola | 23:06fac173529e | 36 | rc = tcpSocket->send(buffer, size); |
| vpcola | 23:06fac173529e | 37 | |
| vpcola | 23:06fac173529e | 38 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
| vpcola | 23:06fac173529e | 39 | return TIMEOUT; |
| vpcola | 23:06fac173529e | 40 | else |
| vpcola | 23:06fac173529e | 41 | return rc; |
| vpcola | 23:06fac173529e | 42 | } |
| vpcola | 23:06fac173529e | 43 | |
| vpcola | 23:06fac173529e | 44 | int MQTTThreadedClient::readPacketLength(int* value) |
| vpcola | 23:06fac173529e | 45 | { |
| vpcola | 23:06fac173529e | 46 | int rc = MQTTPACKET_READ_ERROR; |
| vpcola | 23:06fac173529e | 47 | unsigned char c; |
| vpcola | 23:06fac173529e | 48 | int multiplier = 1; |
| vpcola | 23:06fac173529e | 49 | int len = 0; |
| vpcola | 23:06fac173529e | 50 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
| vpcola | 23:06fac173529e | 51 | |
| vpcola | 23:06fac173529e | 52 | *value = 0; |
| vpcola | 23:06fac173529e | 53 | do |
| vpcola | 23:06fac173529e | 54 | { |
| vpcola | 23:06fac173529e | 55 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
| vpcola | 23:06fac173529e | 56 | { |
| vpcola | 23:06fac173529e | 57 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
| vpcola | 23:06fac173529e | 58 | goto exit; |
| vpcola | 23:06fac173529e | 59 | } |
| vpcola | 23:06fac173529e | 60 | |
| vpcola | 23:06fac173529e | 61 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
| vpcola | 23:06fac173529e | 62 | if (rc != 1) |
| vpcola | 23:06fac173529e | 63 | { |
| vpcola | 23:06fac173529e | 64 | rc = MQTTPACKET_READ_ERROR; |
| vpcola | 23:06fac173529e | 65 | goto exit; |
| vpcola | 23:06fac173529e | 66 | } |
| vpcola | 23:06fac173529e | 67 | |
| vpcola | 23:06fac173529e | 68 | *value += (c & 127) * multiplier; |
| vpcola | 23:06fac173529e | 69 | multiplier *= 128; |
| vpcola | 23:06fac173529e | 70 | } while ((c & 128) != 0); |
| vpcola | 23:06fac173529e | 71 | |
| vpcola | 23:06fac173529e | 72 | rc = MQTTPACKET_READ_COMPLETE; |
| vpcola | 23:06fac173529e | 73 | |
| vpcola | 23:06fac173529e | 74 | exit: |
| vpcola | 23:06fac173529e | 75 | if (rc == MQTTPACKET_READ_ERROR ) |
| vpcola | 23:06fac173529e | 76 | len = -1; |
| vpcola | 23:06fac173529e | 77 | |
| vpcola | 23:06fac173529e | 78 | return len; |
| vpcola | 23:06fac173529e | 79 | } |
| vpcola | 23:06fac173529e | 80 | |
| vpcola | 23:06fac173529e | 81 | int MQTTThreadedClient::sendPacket(size_t length) |
| vpcola | 23:06fac173529e | 82 | { |
| vpcola | 23:06fac173529e | 83 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 84 | int sent = 0; |
| vpcola | 23:06fac173529e | 85 | |
| vpcola | 23:06fac173529e | 86 | while (sent < length) |
| vpcola | 23:06fac173529e | 87 | { |
| vpcola | 23:06fac173529e | 88 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
| vpcola | 23:06fac173529e | 89 | if (rc < 0) // there was an error writing the data |
| vpcola | 23:06fac173529e | 90 | break; |
| vpcola | 23:06fac173529e | 91 | sent += rc; |
| vpcola | 23:06fac173529e | 92 | } |
| vpcola | 23:06fac173529e | 93 | |
| vpcola | 23:06fac173529e | 94 | if (sent == length) |
| vpcola | 23:06fac173529e | 95 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 96 | else |
| vpcola | 23:06fac173529e | 97 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 98 | |
| vpcola | 23:06fac173529e | 99 | return rc; |
| vpcola | 23:06fac173529e | 100 | } |
| vpcola | 23:06fac173529e | 101 | /** |
| vpcola | 23:06fac173529e | 102 | * Reads the entire packet to readbuf and returns |
| vpcola | 23:06fac173529e | 103 | * the type of packet when successful, otherwise |
| vpcola | 23:06fac173529e | 104 | * a negative error code is returned. |
| vpcola | 23:06fac173529e | 105 | **/ |
| vpcola | 23:06fac173529e | 106 | int MQTTThreadedClient::readPacket() |
| vpcola | 23:06fac173529e | 107 | { |
| vpcola | 23:06fac173529e | 108 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 109 | MQTTHeader header = {0}; |
| vpcola | 23:06fac173529e | 110 | int len = 0; |
| vpcola | 23:06fac173529e | 111 | int rem_len = 0; |
| vpcola | 23:06fac173529e | 112 | |
| vpcola | 23:06fac173529e | 113 | /* 1. read the header byte. This has the packet type in it */ |
| vpcola | 23:06fac173529e | 114 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
| vpcola | 23:06fac173529e | 115 | goto exit; |
| vpcola | 23:06fac173529e | 116 | |
| vpcola | 23:06fac173529e | 117 | len = 1; |
| vpcola | 23:06fac173529e | 118 | /* 2. read the remaining length. This is variable in itself */ |
| vpcola | 23:06fac173529e | 119 | if ( readPacketLength(&rem_len) < 0 ) |
| vpcola | 23:06fac173529e | 120 | goto exit; |
| vpcola | 23:06fac173529e | 121 | |
| vpcola | 23:06fac173529e | 122 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
| vpcola | 23:06fac173529e | 123 | |
| vpcola | 23:06fac173529e | 124 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
| vpcola | 23:06fac173529e | 125 | { |
| vpcola | 23:06fac173529e | 126 | rc = BUFFER_OVERFLOW; |
| vpcola | 23:06fac173529e | 127 | goto exit; |
| vpcola | 23:06fac173529e | 128 | } |
| vpcola | 23:06fac173529e | 129 | |
| vpcola | 23:06fac173529e | 130 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
| vpcola | 23:06fac173529e | 131 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
| vpcola | 23:06fac173529e | 132 | goto exit; |
| vpcola | 23:06fac173529e | 133 | |
| vpcola | 23:06fac173529e | 134 | // Convert the header to type |
| vpcola | 23:06fac173529e | 135 | // and update rc |
| vpcola | 23:06fac173529e | 136 | header.byte = readbuf[0]; |
| vpcola | 23:06fac173529e | 137 | rc = header.bits.type; |
| vpcola | 23:06fac173529e | 138 | |
| vpcola | 23:06fac173529e | 139 | exit: |
| vpcola | 23:06fac173529e | 140 | |
| vpcola | 23:06fac173529e | 141 | return rc; |
| vpcola | 23:06fac173529e | 142 | } |
| vpcola | 23:06fac173529e | 143 | |
| vpcola | 23:06fac173529e | 144 | /** |
| vpcola | 23:06fac173529e | 145 | * Read until a specified packet type is received, or untill the specified |
| vpcola | 23:06fac173529e | 146 | * timeout dropping packets along the way. |
| vpcola | 23:06fac173529e | 147 | **/ |
| vpcola | 23:06fac173529e | 148 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
| vpcola | 23:06fac173529e | 149 | { |
| vpcola | 23:06fac173529e | 150 | int pType = FAILURE; |
| vpcola | 23:06fac173529e | 151 | Timer timer; |
| vpcola | 23:06fac173529e | 152 | |
| vpcola | 23:06fac173529e | 153 | timer.start(); |
| vpcola | 23:06fac173529e | 154 | do { |
| vpcola | 23:06fac173529e | 155 | pType = readPacket(); |
| vpcola | 23:06fac173529e | 156 | if (pType < 0) |
| vpcola | 23:06fac173529e | 157 | break; |
| vpcola | 23:06fac173529e | 158 | |
| vpcola | 23:06fac173529e | 159 | if (timer.read_ms() > timeout) |
| vpcola | 23:06fac173529e | 160 | { |
| vpcola | 23:06fac173529e | 161 | pType = FAILURE; |
| vpcola | 23:06fac173529e | 162 | break; |
| vpcola | 23:06fac173529e | 163 | } |
| vpcola | 23:06fac173529e | 164 | }while(pType != packetType); |
| vpcola | 23:06fac173529e | 165 | |
| vpcola | 23:06fac173529e | 166 | return pType; |
| vpcola | 23:06fac173529e | 167 | } |
| vpcola | 23:06fac173529e | 168 | |
| vpcola | 23:06fac173529e | 169 | |
| vpcola | 23:06fac173529e | 170 | int MQTTThreadedClient::connect(MQTTPacket_connectData& options) |
| vpcola | 23:06fac173529e | 171 | { |
| vpcola | 23:06fac173529e | 172 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 173 | int len = 0; |
| vpcola | 23:06fac173529e | 174 | |
| vpcola | 23:06fac173529e | 175 | if (isConnected) |
| vpcola | 23:06fac173529e | 176 | { |
| vpcola | 23:06fac173529e | 177 | printf("Session already connected! \r\n"); |
| vpcola | 23:06fac173529e | 178 | return rc; |
| vpcola | 23:06fac173529e | 179 | } |
| vpcola | 23:06fac173529e | 180 | |
| vpcola | 23:06fac173529e | 181 | // Copy the keepAliveInterval value to local |
| vpcola | 23:06fac173529e | 182 | // MQTT specifies in seconds, we have to multiply that |
| vpcola | 23:06fac173529e | 183 | // amount for our 32 bit timers which accepts ms. |
| vpcola | 23:06fac173529e | 184 | keepAliveInterval = (options.keepAliveInterval * 1000); |
| vpcola | 23:06fac173529e | 185 | |
| vpcola | 23:06fac173529e | 186 | printf("Connecting with: \r\n"); |
| vpcola | 23:06fac173529e | 187 | printf("\tUsername: [%s]\r\n", options.username.cstring); |
| vpcola | 23:06fac173529e | 188 | printf("\tPassword: [%s]\r\n", options.password.cstring); |
| vpcola | 23:06fac173529e | 189 | |
| vpcola | 23:06fac173529e | 190 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) |
| vpcola | 23:06fac173529e | 191 | { |
| vpcola | 23:06fac173529e | 192 | printf("Error serializing connect packet ...\r\n"); |
| vpcola | 23:06fac173529e | 193 | return rc; |
| vpcola | 23:06fac173529e | 194 | } |
| vpcola | 23:06fac173529e | 195 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
| vpcola | 23:06fac173529e | 196 | { |
| vpcola | 23:06fac173529e | 197 | printf("Error sending the connect request packet ...\r\n"); |
| vpcola | 23:06fac173529e | 198 | return rc; |
| vpcola | 23:06fac173529e | 199 | } |
| vpcola | 23:06fac173529e | 200 | |
| vpcola | 23:06fac173529e | 201 | // Wait for the CONNACK |
| vpcola | 23:06fac173529e | 202 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
| vpcola | 23:06fac173529e | 203 | { |
| vpcola | 23:06fac173529e | 204 | unsigned char connack_rc = 255; |
| vpcola | 23:06fac173529e | 205 | bool sessionPresent = false; |
| vpcola | 23:06fac173529e | 206 | printf("Connection acknowledgement received ... deserializing respones ...\r\n"); |
| vpcola | 23:06fac173529e | 207 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 208 | rc = connack_rc; |
| vpcola | 23:06fac173529e | 209 | else |
| vpcola | 23:06fac173529e | 210 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 211 | } |
| vpcola | 23:06fac173529e | 212 | else |
| vpcola | 23:06fac173529e | 213 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 214 | |
| vpcola | 23:06fac173529e | 215 | if (rc == SUCCESS) |
| vpcola | 23:06fac173529e | 216 | { |
| vpcola | 23:06fac173529e | 217 | printf("Connected!!! ... starting connection timers ...\r\n"); |
| vpcola | 23:06fac173529e | 218 | isConnected = true; |
| vpcola | 23:06fac173529e | 219 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 220 | }else |
| vpcola | 23:06fac173529e | 221 | { |
| vpcola | 23:06fac173529e | 222 | // TODO: Call socket->disconnect()? |
| vpcola | 23:06fac173529e | 223 | } |
| vpcola | 23:06fac173529e | 224 | |
| vpcola | 23:06fac173529e | 225 | printf("Returning with rc = %d\r\n", rc); |
| vpcola | 23:06fac173529e | 226 | |
| vpcola | 23:06fac173529e | 227 | return rc; |
| vpcola | 23:06fac173529e | 228 | } |
| vpcola | 23:06fac173529e | 229 | |
| vpcola | 23:06fac173529e | 230 | int MQTTThreadedClient::connect(const char * host, uint16_t port, MQTTPacket_connectData & options) |
| vpcola | 23:06fac173529e | 231 | { |
| vpcola | 23:06fac173529e | 232 | int ret; |
| vpcola | 23:06fac173529e | 233 | |
| vpcola | 23:06fac173529e | 234 | tcpSocket->open(network); |
| vpcola | 23:06fac173529e | 235 | if (( ret = tcpSocket->connect(host, port)) < 0 ) |
| vpcola | 23:06fac173529e | 236 | { |
| vpcola | 23:06fac173529e | 237 | |
| vpcola | 23:06fac173529e | 238 | printf("Error connecting to %s:%d with %d\r\n", host, port, ret); |
| vpcola | 23:06fac173529e | 239 | return ret; |
| vpcola | 23:06fac173529e | 240 | } |
| vpcola | 23:06fac173529e | 241 | |
| vpcola | 23:06fac173529e | 242 | return connect(options); |
| vpcola | 23:06fac173529e | 243 | } |
| vpcola | 23:06fac173529e | 244 | |
| vpcola | 23:06fac173529e | 245 | int MQTTThreadedClient::publish(PubMessage& msg) |
| vpcola | 23:06fac173529e | 246 | { |
| vpcola | 23:06fac173529e | 247 | #if 0 |
| vpcola | 23:06fac173529e | 248 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
| vpcola | 23:06fac173529e | 249 | // TODO: handle id values when the function is called later |
| vpcola | 23:06fac173529e | 250 | if (id == 0) |
| vpcola | 23:06fac173529e | 251 | return FAILURE; |
| vpcola | 23:06fac173529e | 252 | else |
| vpcola | 23:06fac173529e | 253 | return SUCCESS; |
| vpcola | 23:06fac173529e | 254 | #endif |
| vpcola | 23:06fac173529e | 255 | PubMessage *message = mpool.alloc(); |
| vpcola | 23:06fac173529e | 256 | // Simple copy |
| vpcola | 23:06fac173529e | 257 | *message = msg; |
| vpcola | 23:06fac173529e | 258 | |
| vpcola | 23:06fac173529e | 259 | // Push the data to the thread |
| vpcola | 23:06fac173529e | 260 | printf("Pushing data to consumer thread ...\r\n"); |
| vpcola | 23:06fac173529e | 261 | mqueue.put(message); |
| vpcola | 23:06fac173529e | 262 | |
| vpcola | 23:06fac173529e | 263 | return SUCCESS; |
| vpcola | 23:06fac173529e | 264 | } |
| vpcola | 23:06fac173529e | 265 | |
| vpcola | 23:06fac173529e | 266 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
| vpcola | 23:06fac173529e | 267 | { |
| vpcola | 23:06fac173529e | 268 | MQTTString topicString = MQTTString_initializer; |
| vpcola | 23:06fac173529e | 269 | |
| vpcola | 23:06fac173529e | 270 | if (!isConnected) |
| vpcola | 23:06fac173529e | 271 | { |
| vpcola | 23:06fac173529e | 272 | printf("Not connected!!! ...\r\n"); |
| vpcola | 23:06fac173529e | 273 | return FAILURE; |
| vpcola | 23:06fac173529e | 274 | } |
| vpcola | 23:06fac173529e | 275 | |
| vpcola | 23:06fac173529e | 276 | topicString.cstring = (char*) &message.topic[0]; |
| vpcola | 23:06fac173529e | 277 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
| vpcola | 23:06fac173529e | 278 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
| vpcola | 23:06fac173529e | 279 | if (len <= 0) |
| vpcola | 23:06fac173529e | 280 | { |
| vpcola | 23:06fac173529e | 281 | printf("Failed serializing message ...\r\n"); |
| vpcola | 23:06fac173529e | 282 | return FAILURE; |
| vpcola | 23:06fac173529e | 283 | } |
| vpcola | 23:06fac173529e | 284 | |
| vpcola | 23:06fac173529e | 285 | if (sendPacket(len) == SUCCESS) |
| vpcola | 23:06fac173529e | 286 | { |
| vpcola | 23:06fac173529e | 287 | printf("Successfully sent publish packet to server ...\r\n"); |
| vpcola | 23:06fac173529e | 288 | return SUCCESS; |
| vpcola | 23:06fac173529e | 289 | } |
| vpcola | 23:06fac173529e | 290 | |
| vpcola | 23:06fac173529e | 291 | printf("Failed to send publish packet to server ...\r\n"); |
| vpcola | 23:06fac173529e | 292 | return FAILURE; |
| vpcola | 23:06fac173529e | 293 | } |
| vpcola | 23:06fac173529e | 294 | |
| vpcola | 23:06fac173529e | 295 | int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &)) |
| vpcola | 23:06fac173529e | 296 | { |
| vpcola | 23:06fac173529e | 297 | int rc = FAILURE; |
| vpcola | 23:06fac173529e | 298 | int len = 0; |
| vpcola | 23:06fac173529e | 299 | |
| vpcola | 23:06fac173529e | 300 | MQTTString topic = {(char*)topicstr, {0, 0}}; |
| vpcola | 23:06fac173529e | 301 | printf("Subscribing to topic [%s]\r\n", topicstr); |
| vpcola | 23:06fac173529e | 302 | |
| vpcola | 23:06fac173529e | 303 | if (!isConnected) |
| vpcola | 23:06fac173529e | 304 | { |
| vpcola | 23:06fac173529e | 305 | printf("Session already connected!!\r\n"); |
| vpcola | 23:06fac173529e | 306 | return rc; |
| vpcola | 23:06fac173529e | 307 | } |
| vpcola | 23:06fac173529e | 308 | |
| vpcola | 23:06fac173529e | 309 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
| vpcola | 23:06fac173529e | 310 | if (len <= 0) |
| vpcola | 23:06fac173529e | 311 | { |
| vpcola | 23:06fac173529e | 312 | printf("Error serializing subscribe packet ...\r\n"); |
| vpcola | 23:06fac173529e | 313 | return rc; |
| vpcola | 23:06fac173529e | 314 | } |
| vpcola | 23:06fac173529e | 315 | |
| vpcola | 23:06fac173529e | 316 | if ((rc = sendPacket(len)) != SUCCESS) |
| vpcola | 23:06fac173529e | 317 | { |
| vpcola | 23:06fac173529e | 318 | printf("Error sending subscribe packet [%d]\r\n", rc); |
| vpcola | 23:06fac173529e | 319 | return rc; |
| vpcola | 23:06fac173529e | 320 | } |
| vpcola | 23:06fac173529e | 321 | |
| vpcola | 23:06fac173529e | 322 | printf("Waiting for subscription ack ...\r\n"); |
| vpcola | 23:06fac173529e | 323 | // Wait for SUBACK, dropping packets read along the way ... |
| vpcola | 23:06fac173529e | 324 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback |
| vpcola | 23:06fac173529e | 325 | { |
| vpcola | 23:06fac173529e | 326 | int count = 0, grantedQoS = -1; |
| vpcola | 23:06fac173529e | 327 | unsigned short mypacketid; |
| vpcola | 23:06fac173529e | 328 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
| vpcola | 23:06fac173529e | 329 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
| vpcola | 23:06fac173529e | 330 | // For as long as we do not get 0x80 .. |
| vpcola | 23:06fac173529e | 331 | if (rc != 0x80) |
| vpcola | 23:06fac173529e | 332 | { |
| vpcola | 23:06fac173529e | 333 | // Add message handlers to the map |
| vpcola | 23:06fac173529e | 334 | FP<void,MessageData &> fp; |
| vpcola | 23:06fac173529e | 335 | fp.attach(function); |
| vpcola | 23:06fac173529e | 336 | |
| vpcola | 23:06fac173529e | 337 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
| vpcola | 23:06fac173529e | 338 | |
| vpcola | 23:06fac173529e | 339 | // Reset connection timers here ... |
| vpcola | 23:06fac173529e | 340 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 341 | |
| vpcola | 23:06fac173529e | 342 | printf("Successfully subscribed to %s ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 343 | rc = SUCCESS; |
| vpcola | 23:06fac173529e | 344 | }else |
| vpcola | 23:06fac173529e | 345 | { |
| vpcola | 23:06fac173529e | 346 | printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr); |
| vpcola | 23:06fac173529e | 347 | } |
| vpcola | 23:06fac173529e | 348 | } |
| vpcola | 23:06fac173529e | 349 | else |
| vpcola | 23:06fac173529e | 350 | { |
| vpcola | 23:06fac173529e | 351 | printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); |
| vpcola | 23:06fac173529e | 352 | rc = FAILURE; |
| vpcola | 23:06fac173529e | 353 | } |
| vpcola | 23:06fac173529e | 354 | |
| vpcola | 23:06fac173529e | 355 | return rc; |
| vpcola | 23:06fac173529e | 356 | |
| vpcola | 23:06fac173529e | 357 | } |
| vpcola | 23:06fac173529e | 358 | |
| vpcola | 23:06fac173529e | 359 | |
| vpcola | 23:06fac173529e | 360 | bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) |
| vpcola | 23:06fac173529e | 361 | { |
| vpcola | 23:06fac173529e | 362 | char* curf = topicFilter; |
| vpcola | 23:06fac173529e | 363 | char* curn = topicName.lenstring.data; |
| vpcola | 23:06fac173529e | 364 | char* curn_end = curn + topicName.lenstring.len; |
| vpcola | 23:06fac173529e | 365 | |
| vpcola | 23:06fac173529e | 366 | while (*curf && curn < curn_end) |
| vpcola | 23:06fac173529e | 367 | { |
| vpcola | 23:06fac173529e | 368 | if (*curn == '/' && *curf != '/') |
| vpcola | 23:06fac173529e | 369 | break; |
| vpcola | 23:06fac173529e | 370 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
| vpcola | 23:06fac173529e | 371 | break; |
| vpcola | 23:06fac173529e | 372 | if (*curf == '+') |
| vpcola | 23:06fac173529e | 373 | { // skip until we meet the next separator, or end of string |
| vpcola | 23:06fac173529e | 374 | char* nextpos = curn + 1; |
| vpcola | 23:06fac173529e | 375 | while (nextpos < curn_end && *nextpos != '/') |
| vpcola | 23:06fac173529e | 376 | nextpos = ++curn + 1; |
| vpcola | 23:06fac173529e | 377 | } |
| vpcola | 23:06fac173529e | 378 | else if (*curf == '#') |
| vpcola | 23:06fac173529e | 379 | curn = curn_end - 1; // skip until end of string |
| vpcola | 23:06fac173529e | 380 | curf++; |
| vpcola | 23:06fac173529e | 381 | curn++; |
| vpcola | 23:06fac173529e | 382 | }; |
| vpcola | 23:06fac173529e | 383 | |
| vpcola | 23:06fac173529e | 384 | return (curn == curn_end) && (*curf == '\0'); |
| vpcola | 23:06fac173529e | 385 | } |
| vpcola | 23:06fac173529e | 386 | |
| vpcola | 23:06fac173529e | 387 | int MQTTThreadedClient::handlePublishMsg() |
| vpcola | 23:06fac173529e | 388 | { |
| vpcola | 23:06fac173529e | 389 | MQTTString topicName = MQTTString_initializer; |
| vpcola | 23:06fac173529e | 390 | Message msg; |
| vpcola | 23:06fac173529e | 391 | int intQoS; |
| vpcola | 23:06fac173529e | 392 | printf("Deserializing publish message ...\r\n"); |
| vpcola | 23:06fac173529e | 393 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
| vpcola | 23:06fac173529e | 394 | &intQoS, |
| vpcola | 23:06fac173529e | 395 | (unsigned char*)&msg.retained, |
| vpcola | 23:06fac173529e | 396 | (unsigned short*)&msg.id, |
| vpcola | 23:06fac173529e | 397 | &topicName, |
| vpcola | 23:06fac173529e | 398 | (unsigned char**)&msg.payload, |
| vpcola | 23:06fac173529e | 399 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
| vpcola | 23:06fac173529e | 400 | { |
| vpcola | 23:06fac173529e | 401 | printf("Error deserializing published message ...\r\n"); |
| vpcola | 23:06fac173529e | 402 | return -1; |
| vpcola | 23:06fac173529e | 403 | } |
| vpcola | 23:06fac173529e | 404 | |
| vpcola | 23:06fac173529e | 405 | std::string topic; |
| vpcola | 23:06fac173529e | 406 | if (topicName.lenstring.len > 0) |
| vpcola | 23:06fac173529e | 407 | { |
| vpcola | 23:06fac173529e | 408 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
| vpcola | 23:06fac173529e | 409 | }else |
| vpcola | 23:06fac173529e | 410 | topic = (const char *) topicName.cstring; |
| vpcola | 23:06fac173529e | 411 | |
| vpcola | 23:06fac173529e | 412 | printf("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); |
| vpcola | 23:06fac173529e | 413 | |
| vpcola | 23:06fac173529e | 414 | msg.qos = (QoS) intQoS; |
| vpcola | 23:06fac173529e | 415 | |
| vpcola | 23:06fac173529e | 416 | |
| vpcola | 23:06fac173529e | 417 | // Call the handlers for each topic |
| vpcola | 23:06fac173529e | 418 | if (topicCBMap.find(topic) != topicCBMap.end()) |
| vpcola | 23:06fac173529e | 419 | { |
| vpcola | 23:06fac173529e | 420 | // Call the callback function |
| vpcola | 23:06fac173529e | 421 | if (topicCBMap[topic].attached()) |
| vpcola | 23:06fac173529e | 422 | { |
| vpcola | 23:06fac173529e | 423 | printf("Invoking function handler for topic ...\r\n"); |
| vpcola | 23:06fac173529e | 424 | MessageData md(topicName, msg); |
| vpcola | 23:06fac173529e | 425 | topicCBMap[topic](md); |
| vpcola | 23:06fac173529e | 426 | |
| vpcola | 23:06fac173529e | 427 | return 1; |
| vpcola | 23:06fac173529e | 428 | } |
| vpcola | 23:06fac173529e | 429 | } |
| vpcola | 23:06fac173529e | 430 | |
| vpcola | 23:06fac173529e | 431 | // TODO: depending on the QoS |
| vpcola | 23:06fac173529e | 432 | // we send data to the server = PUBACK or PUBREC |
| vpcola | 23:06fac173529e | 433 | switch(intQoS) |
| vpcola | 23:06fac173529e | 434 | { |
| vpcola | 23:06fac173529e | 435 | case QOS0: |
| vpcola | 23:06fac173529e | 436 | // We send back nothing ... |
| vpcola | 23:06fac173529e | 437 | break; |
| vpcola | 23:06fac173529e | 438 | case QOS1: |
| vpcola | 23:06fac173529e | 439 | // TODO: implement |
| vpcola | 23:06fac173529e | 440 | break; |
| vpcola | 23:06fac173529e | 441 | case QOS2: |
| vpcola | 23:06fac173529e | 442 | // TODO: implement |
| vpcola | 23:06fac173529e | 443 | break; |
| vpcola | 23:06fac173529e | 444 | default: |
| vpcola | 23:06fac173529e | 445 | break; |
| vpcola | 23:06fac173529e | 446 | } |
| vpcola | 23:06fac173529e | 447 | |
| vpcola | 23:06fac173529e | 448 | return 0; |
| vpcola | 23:06fac173529e | 449 | } |
| vpcola | 23:06fac173529e | 450 | |
| vpcola | 23:06fac173529e | 451 | void MQTTThreadedClient::resetConnectionTimer() |
| vpcola | 23:06fac173529e | 452 | { |
| vpcola | 23:06fac173529e | 453 | if (keepAliveInterval > 0) |
| vpcola | 23:06fac173529e | 454 | { |
| vpcola | 23:06fac173529e | 455 | comTimer.reset(); |
| vpcola | 23:06fac173529e | 456 | comTimer.start(); |
| vpcola | 23:06fac173529e | 457 | } |
| vpcola | 23:06fac173529e | 458 | } |
| vpcola | 23:06fac173529e | 459 | |
| vpcola | 23:06fac173529e | 460 | bool MQTTThreadedClient::hasConnectionTimedOut() |
| vpcola | 23:06fac173529e | 461 | { |
| vpcola | 23:06fac173529e | 462 | if (keepAliveInterval > 0 ) { |
| vpcola | 23:06fac173529e | 463 | // Check connection timer |
| vpcola | 23:06fac173529e | 464 | if (comTimer.read_ms() > keepAliveInterval) |
| vpcola | 23:06fac173529e | 465 | return true; |
| vpcola | 23:06fac173529e | 466 | else |
| vpcola | 23:06fac173529e | 467 | return false; |
| vpcola | 23:06fac173529e | 468 | } |
| vpcola | 23:06fac173529e | 469 | |
| vpcola | 23:06fac173529e | 470 | return false; |
| vpcola | 23:06fac173529e | 471 | } |
| vpcola | 23:06fac173529e | 472 | |
| vpcola | 23:06fac173529e | 473 | void MQTTThreadedClient::sendPingRequest() |
| vpcola | 23:06fac173529e | 474 | { |
| vpcola | 23:06fac173529e | 475 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
| vpcola | 23:06fac173529e | 476 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
| vpcola | 23:06fac173529e | 477 | { |
| vpcola | 23:06fac173529e | 478 | printf("Ping request sent successfully ...\r\n"); |
| vpcola | 23:06fac173529e | 479 | } |
| vpcola | 23:06fac173529e | 480 | } |
| vpcola | 23:06fac173529e | 481 | |
| vpcola | 23:06fac173529e | 482 | void MQTTThreadedClient::startListener() |
| vpcola | 23:06fac173529e | 483 | { |
| vpcola | 23:06fac173529e | 484 | int pType; |
| vpcola | 23:06fac173529e | 485 | // Continuesly listens for packets and dispatch |
| vpcola | 23:06fac173529e | 486 | // message handlers ... |
| vpcola | 23:06fac173529e | 487 | while(true) |
| vpcola | 23:06fac173529e | 488 | { |
| vpcola | 23:06fac173529e | 489 | pType = readPacket(); |
| vpcola | 23:06fac173529e | 490 | switch(pType) |
| vpcola | 23:06fac173529e | 491 | { |
| vpcola | 23:06fac173529e | 492 | case TIMEOUT: |
| vpcola | 23:06fac173529e | 493 | // No data available from the network ... |
| vpcola | 23:06fac173529e | 494 | break; |
| vpcola | 23:06fac173529e | 495 | case FAILURE: |
| vpcola | 23:06fac173529e | 496 | case BUFFER_OVERFLOW: |
| vpcola | 23:06fac173529e | 497 | { |
| vpcola | 23:06fac173529e | 498 | // TODO: Network error, do we disconnect and reconnect? |
| vpcola | 23:06fac173529e | 499 | printf("Failure or buffer overflow problem ... \r\n"); |
| vpcola | 23:06fac173529e | 500 | MBED_ASSERT(false); |
| vpcola | 23:06fac173529e | 501 | } |
| vpcola | 23:06fac173529e | 502 | break; |
| vpcola | 23:06fac173529e | 503 | /** |
| vpcola | 23:06fac173529e | 504 | * The rest of the return codes below (all positive) is about MQTT |
| vpcola | 23:06fac173529e | 505 | * response codes |
| vpcola | 23:06fac173529e | 506 | **/ |
| vpcola | 23:06fac173529e | 507 | case CONNACK: |
| vpcola | 23:06fac173529e | 508 | case PUBACK: |
| vpcola | 23:06fac173529e | 509 | case SUBACK: |
| vpcola | 23:06fac173529e | 510 | break; |
| vpcola | 23:06fac173529e | 511 | case PUBLISH: |
| vpcola | 23:06fac173529e | 512 | { |
| vpcola | 23:06fac173529e | 513 | printf("Publish received!....\r\n"); |
| vpcola | 23:06fac173529e | 514 | // We receive data from the MQTT server .. |
| vpcola | 23:06fac173529e | 515 | if (handlePublishMsg() < 0) |
| vpcola | 23:06fac173529e | 516 | { |
| vpcola | 23:06fac173529e | 517 | printf("Error handling PUBLISH message ... \r\n"); |
| vpcola | 23:06fac173529e | 518 | break; |
| vpcola | 23:06fac173529e | 519 | } |
| vpcola | 23:06fac173529e | 520 | } |
| vpcola | 23:06fac173529e | 521 | break; |
| vpcola | 23:06fac173529e | 522 | case PINGRESP: |
| vpcola | 23:06fac173529e | 523 | { |
| vpcola | 23:06fac173529e | 524 | printf("Got ping response ...\r\n"); |
| vpcola | 23:06fac173529e | 525 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 526 | } |
| vpcola | 23:06fac173529e | 527 | break; |
| vpcola | 23:06fac173529e | 528 | default: |
| vpcola | 23:06fac173529e | 529 | printf("Unknown/Not handled message from server pType[%d]\r\n", pType); |
| vpcola | 23:06fac173529e | 530 | } |
| vpcola | 23:06fac173529e | 531 | |
| vpcola | 23:06fac173529e | 532 | // Check if its time to send a keepAlive packet |
| vpcola | 23:06fac173529e | 533 | if (hasConnectionTimedOut()) |
| vpcola | 23:06fac173529e | 534 | { |
| vpcola | 23:06fac173529e | 535 | // Queue the ping request so that other |
| vpcola | 23:06fac173529e | 536 | // pending operations queued above will go first |
| vpcola | 23:06fac173529e | 537 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
| vpcola | 23:06fac173529e | 538 | } |
| vpcola | 23:06fac173529e | 539 | |
| vpcola | 23:06fac173529e | 540 | // Check if we have messages on the message queue |
| vpcola | 23:06fac173529e | 541 | osEvent evt = mqueue.get(10); |
| vpcola | 23:06fac173529e | 542 | if (evt.status == osEventMessage) { |
| vpcola | 23:06fac173529e | 543 | |
| vpcola | 23:06fac173529e | 544 | printf("Got message to publish! ... \r\n"); |
| vpcola | 23:06fac173529e | 545 | |
| vpcola | 23:06fac173529e | 546 | // Unpack the message |
| vpcola | 23:06fac173529e | 547 | PubMessage * message = (PubMessage *)evt.value.p; |
| vpcola | 23:06fac173529e | 548 | |
| vpcola | 23:06fac173529e | 549 | // Send the packet, do not queue the call |
| vpcola | 23:06fac173529e | 550 | // like the ping above .. |
| vpcola | 23:06fac173529e | 551 | if ( sendPublish(*message) == SUCCESS) |
| vpcola | 23:06fac173529e | 552 | // Reset timers if we have been able to send successfully |
| vpcola | 23:06fac173529e | 553 | resetConnectionTimer(); |
| vpcola | 23:06fac173529e | 554 | |
| vpcola | 23:06fac173529e | 555 | // Free the message from mempool after using |
| vpcola | 23:06fac173529e | 556 | mpool.free(message); |
| vpcola | 23:06fac173529e | 557 | } |
| vpcola | 23:06fac173529e | 558 | |
| vpcola | 23:06fac173529e | 559 | // Dispatch any queued events ... |
| vpcola | 23:06fac173529e | 560 | queue.dispatch(100); |
| vpcola | 23:06fac173529e | 561 | } |
| vpcola | 23:06fac173529e | 562 | |
| vpcola | 23:06fac173529e | 563 | } |
| vpcola | 23:06fac173529e | 564 |
