A threaded, secure MQTT client example. It uses Mbed TLS for the SSL/TLS connection and supports QoS 0 only for now. The example has been tested on a K64F connected via Ethernet.

Dependencies:   FP MQTTPacket

Fork of HelloMQTT by MQTT

Committer:
vpcola
Date:
Tue Mar 28 09:18:49 2017 +0000
Revision:
30:b2aed80037db
Parent:
27:c90092f35d79
Child:
33:38e2e7bf91eb
Added Nucleo L476 to mbed_app.json

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 23:06fac173529e 1 #include "mbed.h"
vpcola 23:06fac173529e 2 #include "rtos.h"
vpcola 23:06fac173529e 3 #include "MQTTThreadedClient.h"
vpcola 25:326f00faa092 4 #include "mbedtls/platform.h"
vpcola 25:326f00faa092 5 #include "mbedtls/ssl.h"
vpcola 25:326f00faa092 6 #include "mbedtls/entropy.h"
vpcola 25:326f00faa092 7 #include "mbedtls/ctr_drbg.h"
vpcola 25:326f00faa092 8 #include "mbedtls/error.h"
vpcola 23:06fac173529e 9
vpcola 30:b2aed80037db 10 static MemoryPool<MQTT::PubMessage, 8> mpool;
vpcola 30:b2aed80037db 11 static Queue<MQTT::PubMessage, 8> mqueue;
vpcola 23:06fac173529e 12
vpcola 25:326f00faa092 13 // SSL/TLS variables
vpcola 25:326f00faa092 14 mbedtls_entropy_context _entropy;
vpcola 25:326f00faa092 15 mbedtls_ctr_drbg_context _ctr_drbg;
vpcola 25:326f00faa092 16 mbedtls_x509_crt _cacert;
vpcola 25:326f00faa092 17 mbedtls_ssl_context _ssl;
vpcola 25:326f00faa092 18 mbedtls_ssl_config _ssl_conf;
vpcola 25:326f00faa092 19 mbedtls_ssl_session saved_session;
vpcola 25:326f00faa092 20
vpcola 30:b2aed80037db 21 namespace MQTT {
vpcola 30:b2aed80037db 22
vpcola 25:326f00faa092 23 /**
vpcola 25:326f00faa092 24 * Receive callback for mbed TLS
vpcola 25:326f00faa092 25 */
vpcola 25:326f00faa092 26 static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
vpcola 25:326f00faa092 27 {
vpcola 25:326f00faa092 28 int recv = -1;
vpcola 25:326f00faa092 29 TCPSocket *socket = static_cast<TCPSocket *>(ctx);
vpcola 25:326f00faa092 30 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
vpcola 25:326f00faa092 31 recv = socket->recv(buf, len);
vpcola 25:326f00faa092 32
vpcola 25:326f00faa092 33 if (NSAPI_ERROR_WOULD_BLOCK == recv) {
vpcola 25:326f00faa092 34 return MBEDTLS_ERR_SSL_WANT_READ;
vpcola 25:326f00faa092 35 } else if (recv < 0) {
vpcola 25:326f00faa092 36 return -1;
vpcola 25:326f00faa092 37 } else {
vpcola 25:326f00faa092 38 return recv;
vpcola 25:326f00faa092 39 }
vpcola 25:326f00faa092 40 }
vpcola 25:326f00faa092 41
vpcola 25:326f00faa092 42 /**
vpcola 25:326f00faa092 43 * Send callback for mbed TLS
vpcola 25:326f00faa092 44 */
vpcola 25:326f00faa092 45 static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
vpcola 25:326f00faa092 46 {
vpcola 25:326f00faa092 47 int sent = -1;
vpcola 25:326f00faa092 48 TCPSocket *socket = static_cast<TCPSocket *>(ctx);
vpcola 25:326f00faa092 49 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
vpcola 25:326f00faa092 50 sent = socket->send(buf, len);
vpcola 25:326f00faa092 51
vpcola 25:326f00faa092 52 if(NSAPI_ERROR_WOULD_BLOCK == sent) {
vpcola 25:326f00faa092 53 return MBEDTLS_ERR_SSL_WANT_WRITE;
vpcola 25:326f00faa092 54 } else if (sent < 0) {
vpcola 25:326f00faa092 55 return -1;
vpcola 25:326f00faa092 56 } else {
vpcola 25:326f00faa092 57 return sent;
vpcola 25:326f00faa092 58 }
vpcola 25:326f00faa092 59 }
vpcola 25:326f00faa092 60
vpcola 25:326f00faa092 61 #if DEBUG_LEVEL > 0
vpcola 25:326f00faa092 62 /**
vpcola 25:326f00faa092 63 * Debug callback for mbed TLS
vpcola 25:326f00faa092 64 * Just prints on the USB serial port
vpcola 25:326f00faa092 65 */
vpcola 25:326f00faa092 66 static void my_debug(void *ctx, int level, const char *file, int line,
vpcola 25:326f00faa092 67 const char *str)
vpcola 25:326f00faa092 68 {
vpcola 25:326f00faa092 69 const char *p, *basename;
vpcola 25:326f00faa092 70 (void) ctx;
vpcola 25:326f00faa092 71
vpcola 25:326f00faa092 72 /* Extract basename from file */
vpcola 25:326f00faa092 73 for(p = basename = file; *p != '\0'; p++) {
vpcola 25:326f00faa092 74 if(*p == '/' || *p == '\\') {
vpcola 25:326f00faa092 75 basename = p + 1;
vpcola 25:326f00faa092 76 }
vpcola 25:326f00faa092 77 }
vpcola 25:326f00faa092 78
vpcola 25:326f00faa092 79 if (_debug) {
vpcola 25:326f00faa092 80 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
vpcola 25:326f00faa092 81 }
vpcola 25:326f00faa092 82 }
vpcola 25:326f00faa092 83
vpcola 25:326f00faa092 84 /**
vpcola 25:326f00faa092 85 * Certificate verification callback for mbed TLS
vpcola 25:326f00faa092 86 * Here we only use it to display information on each cert in the chain
vpcola 25:326f00faa092 87 */
vpcola 25:326f00faa092 88 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
vpcola 25:326f00faa092 89 {
vpcola 25:326f00faa092 90 const uint32_t buf_size = 1024;
vpcola 25:326f00faa092 91 char *buf = new char[buf_size];
vpcola 25:326f00faa092 92 (void) data;
vpcola 25:326f00faa092 93
vpcola 26:4b21de8043a5 94 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
vpcola 25:326f00faa092 95 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt);
vpcola 25:326f00faa092 96 if (_debug) mbedtls_printf("%s", buf);
vpcola 25:326f00faa092 97
vpcola 25:326f00faa092 98 if (*flags == 0)
vpcola 26:4b21de8043a5 99 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
vpcola 25:326f00faa092 100 else {
vpcola 25:326f00faa092 101 mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags);
vpcola 25:326f00faa092 102 if (_debug) mbedtls_printf("%s\n", buf);
vpcola 25:326f00faa092 103 }
vpcola 25:326f00faa092 104
vpcola 25:326f00faa092 105 delete[] buf;
vpcola 25:326f00faa092 106 return 0;
vpcola 25:326f00faa092 107 }
vpcola 25:326f00faa092 108 #endif
vpcola 25:326f00faa092 109
vpcola 25:326f00faa092 110
vpcola 25:326f00faa092 111 void MQTTThreadedClient::setupTLS()
vpcola 25:326f00faa092 112 {
vpcola 26:4b21de8043a5 113 if (useTLS)
vpcola 25:326f00faa092 114 {
vpcola 25:326f00faa092 115 mbedtls_entropy_init(&_entropy);
vpcola 25:326f00faa092 116 mbedtls_ctr_drbg_init(&_ctr_drbg);
vpcola 25:326f00faa092 117 mbedtls_x509_crt_init(&_cacert);
vpcola 25:326f00faa092 118 mbedtls_ssl_init(&_ssl);
vpcola 25:326f00faa092 119 mbedtls_ssl_config_init(&_ssl_conf);
vpcola 25:326f00faa092 120 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );
vpcola 25:326f00faa092 121 }
vpcola 25:326f00faa092 122 }
vpcola 25:326f00faa092 123
vpcola 25:326f00faa092 124 void MQTTThreadedClient::freeTLS()
vpcola 25:326f00faa092 125 {
vpcola 26:4b21de8043a5 126 if (useTLS)
vpcola 25:326f00faa092 127 {
vpcola 25:326f00faa092 128 mbedtls_entropy_free(&_entropy);
vpcola 25:326f00faa092 129 mbedtls_ctr_drbg_free(&_ctr_drbg);
vpcola 25:326f00faa092 130 mbedtls_x509_crt_free(&_cacert);
vpcola 25:326f00faa092 131 mbedtls_ssl_free(&_ssl);
vpcola 25:326f00faa092 132 mbedtls_ssl_config_free(&_ssl_conf);
vpcola 25:326f00faa092 133 }
vpcola 25:326f00faa092 134 }
vpcola 25:326f00faa092 135
vpcola 25:326f00faa092 136 int MQTTThreadedClient::initTLS()
vpcola 25:326f00faa092 137 {
vpcola 25:326f00faa092 138 int ret;
vpcola 25:326f00faa092 139
vpcola 30:b2aed80037db 140 DBG("Initializing TLS ...\r\n");
vpcola 30:b2aed80037db 141 DBG("mbedtls_ctr_drdbg_seed ...\r\n");
vpcola 25:326f00faa092 142 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
vpcola 25:326f00faa092 143 (const unsigned char *) DRBG_PERS,
vpcola 25:326f00faa092 144 sizeof (DRBG_PERS))) != 0) {
vpcola 26:4b21de8043a5 145 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
vpcola 25:326f00faa092 146 _error = ret;
vpcola 25:326f00faa092 147 return -1;
vpcola 25:326f00faa092 148 }
vpcola 30:b2aed80037db 149 DBG("mbedtls_x509_crt_parse ...\r\n");
vpcola 25:326f00faa092 150 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
vpcola 25:326f00faa092 151 strlen(ssl_ca_pem) + 1)) != 0) {
vpcola 26:4b21de8043a5 152 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
vpcola 25:326f00faa092 153 _error = ret;
vpcola 25:326f00faa092 154 return -1;
vpcola 25:326f00faa092 155 }
vpcola 25:326f00faa092 156
vpcola 30:b2aed80037db 157 DBG("mbedtls_ssl_config_defaults ...\r\n");
vpcola 25:326f00faa092 158 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
vpcola 25:326f00faa092 159 MBEDTLS_SSL_IS_CLIENT,
vpcola 25:326f00faa092 160 MBEDTLS_SSL_TRANSPORT_STREAM,
vpcola 25:326f00faa092 161 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
vpcola 26:4b21de8043a5 162 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
vpcola 25:326f00faa092 163 _error = ret;
vpcola 25:326f00faa092 164 return -1;
vpcola 25:326f00faa092 165 }
vpcola 25:326f00faa092 166
vpcola 30:b2aed80037db 167 DBG("mbedtls_ssl_config_ca_chain ...\r\n");
vpcola 25:326f00faa092 168 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
vpcola 30:b2aed80037db 169 DBG("mbedtls_ssl_conf_rng ...\r\n");
vpcola 25:326f00faa092 170 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
vpcola 25:326f00faa092 171
vpcola 25:326f00faa092 172 /* It is possible to disable authentication by passing
vpcola 25:326f00faa092 173 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
vpcola 25:326f00faa092 174 */
vpcola 30:b2aed80037db 175 DBG("mbedtls_ssl_conf_authmode ...\r\n");
vpcola 25:326f00faa092 176 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
vpcola 25:326f00faa092 177
vpcola 25:326f00faa092 178 #if DEBUG_LEVEL > 0
vpcola 25:326f00faa092 179 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
vpcola 25:326f00faa092 180 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
vpcola 25:326f00faa092 181 mbedtls_debug_set_threshold(DEBUG_LEVEL);
vpcola 25:326f00faa092 182 #endif
vpcola 25:326f00faa092 183
vpcola 30:b2aed80037db 184 DBG("mbedtls_ssl_setup ...\r\n");
vpcola 25:326f00faa092 185 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
vpcola 26:4b21de8043a5 186 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
vpcola 25:326f00faa092 187 _error = ret;
vpcola 25:326f00faa092 188 return -1;
vpcola 25:326f00faa092 189 }
vpcola 25:326f00faa092 190
vpcola 25:326f00faa092 191 return 0;
vpcola 25:326f00faa092 192 }
vpcola 25:326f00faa092 193
vpcola 25:326f00faa092 194 int MQTTThreadedClient::doTLSHandshake()
vpcola 25:326f00faa092 195 {
vpcola 25:326f00faa092 196 int ret;
vpcola 25:326f00faa092 197
vpcola 25:326f00faa092 198 /* Start the handshake, the rest will be done in onReceive() */
vpcola 30:b2aed80037db 199 DBG("Starting the TLS handshake...\r\n");
vpcola 25:326f00faa092 200 ret = mbedtls_ssl_handshake(&_ssl);
vpcola 25:326f00faa092 201 if (ret < 0)
vpcola 25:326f00faa092 202 {
vpcola 25:326f00faa092 203 if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
vpcola 25:326f00faa092 204 ret != MBEDTLS_ERR_SSL_WANT_WRITE)
vpcola 26:4b21de8043a5 205 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
vpcola 25:326f00faa092 206 else
vpcola 25:326f00faa092 207 {
vpcola 25:326f00faa092 208 // do not close the socket if timed out
vpcola 26:4b21de8043a5 209 ret = TIMEOUT;
vpcola 25:326f00faa092 210 }
vpcola 26:4b21de8043a5 211 return ret;
vpcola 25:326f00faa092 212 }
vpcola 25:326f00faa092 213
vpcola 25:326f00faa092 214 /* Handshake done, time to print info */
vpcola 30:b2aed80037db 215 DBG("TLS connection to %s:%d established\r\n",
vpcola 25:326f00faa092 216 host.c_str(), port);
vpcola 25:326f00faa092 217
vpcola 25:326f00faa092 218 const uint32_t buf_size = 1024;
vpcola 25:326f00faa092 219 char *buf = new char[buf_size];
vpcola 25:326f00faa092 220 mbedtls_x509_crt_info(buf, buf_size, "\r ",
vpcola 25:326f00faa092 221 mbedtls_ssl_get_peer_cert(&_ssl));
vpcola 25:326f00faa092 222
vpcola 30:b2aed80037db 223 DBG("Server certificate:\r\n%s\r", buf);
vpcola 25:326f00faa092 224 // Verify server cert ...
vpcola 25:326f00faa092 225 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
vpcola 25:326f00faa092 226 if( flags != 0 )
vpcola 25:326f00faa092 227 {
vpcola 25:326f00faa092 228 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
vpcola 30:b2aed80037db 229 DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
vpcola 25:326f00faa092 230 // free server cert ... before error return
vpcola 25:326f00faa092 231 delete [] buf;
vpcola 25:326f00faa092 232 return -1;
vpcola 25:326f00faa092 233 }
vpcola 25:326f00faa092 234
vpcola 30:b2aed80037db 235 DBG("Certificate verification passed\r\n\r\n");
vpcola 25:326f00faa092 236 // delete server cert after verification
vpcola 25:326f00faa092 237 delete [] buf;
vpcola 25:326f00faa092 238
vpcola 30:b2aed80037db 239 #if defined(MBEDTLS_SSL_CLI_C)
vpcola 25:326f00faa092 240 // TODO: Save the session here for reconnect.
vpcola 25:326f00faa092 241 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
vpcola 25:326f00faa092 242 {
vpcola 26:4b21de8043a5 243 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
vpcola 26:4b21de8043a5 244 hasSavedSession = false;
vpcola 25:326f00faa092 245 return -1;
vpcola 25:326f00faa092 246 }
vpcola 30:b2aed80037db 247 #endif
vpcola 30:b2aed80037db 248 DBG("Session saved for reconnect ...\r\n");
vpcola 26:4b21de8043a5 249 hasSavedSession = true;
vpcola 26:4b21de8043a5 250
vpcola 25:326f00faa092 251 return 0;
vpcola 25:326f00faa092 252 }
vpcola 25:326f00faa092 253
vpcola 23:06fac173529e 254 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
vpcola 23:06fac173529e 255 {
vpcola 23:06fac173529e 256 int rc;
vpcola 25:326f00faa092 257
vpcola 23:06fac173529e 258 if (tcpSocket == NULL)
vpcola 23:06fac173529e 259 return -1;
vpcola 25:326f00faa092 260
vpcola 26:4b21de8043a5 261 if (useTLS)
vpcola 25:326f00faa092 262 {
vpcola 25:326f00faa092 263 // Do SSL/TLS read
vpcola 25:326f00faa092 264 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
vpcola 25:326f00faa092 265 if (MBEDTLS_ERR_SSL_WANT_READ == rc)
vpcola 25:326f00faa092 266 return TIMEOUT;
vpcola 25:326f00faa092 267 else
vpcola 25:326f00faa092 268 return rc;
vpcola 25:326f00faa092 269 } else {
vpcola 25:326f00faa092 270 // non-blocking socket ...
vpcola 25:326f00faa092 271 tcpSocket->set_timeout(timeout);
vpcola 25:326f00faa092 272 rc = tcpSocket->recv( (void *) buffer, size);
vpcola 25:326f00faa092 273
vpcola 25:326f00faa092 274 // return 0 bytes if timeout ...
vpcola 25:326f00faa092 275 if (NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 25:326f00faa092 276 return TIMEOUT;
vpcola 25:326f00faa092 277 else
vpcola 25:326f00faa092 278 return rc; // return the number of bytes received or error
vpcola 25:326f00faa092 279 }
vpcola 23:06fac173529e 280 }
vpcola 23:06fac173529e 281
vpcola 23:06fac173529e 282 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
vpcola 23:06fac173529e 283 {
vpcola 23:06fac173529e 284 int rc;
vpcola 23:06fac173529e 285
vpcola 23:06fac173529e 286 if (tcpSocket == NULL)
vpcola 23:06fac173529e 287 return -1;
vpcola 23:06fac173529e 288
vpcola 26:4b21de8043a5 289 if (useTLS) {
vpcola 25:326f00faa092 290 // Do SSL/TLS write
vpcola 25:326f00faa092 291 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
vpcola 25:326f00faa092 292 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
vpcola 25:326f00faa092 293 return TIMEOUT;
vpcola 25:326f00faa092 294 else
vpcola 25:326f00faa092 295 return rc;
vpcola 25:326f00faa092 296 } else {
vpcola 25:326f00faa092 297
vpcola 25:326f00faa092 298 // set the write timeout
vpcola 25:326f00faa092 299 tcpSocket->set_timeout(timeout);
vpcola 25:326f00faa092 300 rc = tcpSocket->send(buffer, size);
vpcola 25:326f00faa092 301
vpcola 25:326f00faa092 302 if ( NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 25:326f00faa092 303 return TIMEOUT;
vpcola 25:326f00faa092 304 else
vpcola 25:326f00faa092 305 return rc;
vpcola 25:326f00faa092 306 }
vpcola 23:06fac173529e 307 }
vpcola 23:06fac173529e 308
vpcola 23:06fac173529e 309 int MQTTThreadedClient::readPacketLength(int* value)
vpcola 23:06fac173529e 310 {
vpcola 23:06fac173529e 311 int rc = MQTTPACKET_READ_ERROR;
vpcola 23:06fac173529e 312 unsigned char c;
vpcola 23:06fac173529e 313 int multiplier = 1;
vpcola 23:06fac173529e 314 int len = 0;
vpcola 23:06fac173529e 315 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
vpcola 23:06fac173529e 316
vpcola 23:06fac173529e 317 *value = 0;
vpcola 23:06fac173529e 318 do
vpcola 23:06fac173529e 319 {
vpcola 23:06fac173529e 320 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
vpcola 23:06fac173529e 321 {
vpcola 23:06fac173529e 322 rc = MQTTPACKET_READ_ERROR; /* bad data */
vpcola 23:06fac173529e 323 goto exit;
vpcola 23:06fac173529e 324 }
vpcola 23:06fac173529e 325
vpcola 23:06fac173529e 326 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
vpcola 23:06fac173529e 327 if (rc != 1)
vpcola 23:06fac173529e 328 {
vpcola 23:06fac173529e 329 rc = MQTTPACKET_READ_ERROR;
vpcola 23:06fac173529e 330 goto exit;
vpcola 23:06fac173529e 331 }
vpcola 23:06fac173529e 332
vpcola 23:06fac173529e 333 *value += (c & 127) * multiplier;
vpcola 23:06fac173529e 334 multiplier *= 128;
vpcola 23:06fac173529e 335 } while ((c & 128) != 0);
vpcola 23:06fac173529e 336
vpcola 23:06fac173529e 337 rc = MQTTPACKET_READ_COMPLETE;
vpcola 23:06fac173529e 338
vpcola 23:06fac173529e 339 exit:
vpcola 23:06fac173529e 340 if (rc == MQTTPACKET_READ_ERROR )
vpcola 23:06fac173529e 341 len = -1;
vpcola 23:06fac173529e 342
vpcola 23:06fac173529e 343 return len;
vpcola 23:06fac173529e 344 }
vpcola 23:06fac173529e 345
vpcola 23:06fac173529e 346 int MQTTThreadedClient::sendPacket(size_t length)
vpcola 23:06fac173529e 347 {
vpcola 23:06fac173529e 348 int rc = FAILURE;
vpcola 23:06fac173529e 349 int sent = 0;
vpcola 23:06fac173529e 350
vpcola 23:06fac173529e 351 while (sent < length)
vpcola 23:06fac173529e 352 {
vpcola 23:06fac173529e 353 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
vpcola 23:06fac173529e 354 if (rc < 0) // there was an error writing the data
vpcola 23:06fac173529e 355 break;
vpcola 23:06fac173529e 356 sent += rc;
vpcola 23:06fac173529e 357 }
vpcola 23:06fac173529e 358
vpcola 23:06fac173529e 359 if (sent == length)
vpcola 23:06fac173529e 360 rc = SUCCESS;
vpcola 23:06fac173529e 361 else
vpcola 23:06fac173529e 362 rc = FAILURE;
vpcola 23:06fac173529e 363
vpcola 23:06fac173529e 364 return rc;
vpcola 23:06fac173529e 365 }
vpcola 23:06fac173529e 366 /**
vpcola 23:06fac173529e 367 * Reads the entire packet to readbuf and returns
vpcola 23:06fac173529e 368 * the type of packet when successful, otherwise
vpcola 23:06fac173529e 369 * a negative error code is returned.
vpcola 23:06fac173529e 370 **/
vpcola 23:06fac173529e 371 int MQTTThreadedClient::readPacket()
vpcola 23:06fac173529e 372 {
vpcola 23:06fac173529e 373 int rc = FAILURE;
vpcola 23:06fac173529e 374 MQTTHeader header = {0};
vpcola 23:06fac173529e 375 int len = 0;
vpcola 23:06fac173529e 376 int rem_len = 0;
vpcola 23:06fac173529e 377
vpcola 23:06fac173529e 378 /* 1. read the header byte. This has the packet type in it */
vpcola 23:06fac173529e 379 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
vpcola 23:06fac173529e 380 goto exit;
vpcola 23:06fac173529e 381
vpcola 23:06fac173529e 382 len = 1;
vpcola 23:06fac173529e 383 /* 2. read the remaining length. This is variable in itself */
vpcola 23:06fac173529e 384 if ( readPacketLength(&rem_len) < 0 )
vpcola 23:06fac173529e 385 goto exit;
vpcola 23:06fac173529e 386
vpcola 23:06fac173529e 387 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
vpcola 23:06fac173529e 388
vpcola 23:06fac173529e 389 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
vpcola 23:06fac173529e 390 {
vpcola 23:06fac173529e 391 rc = BUFFER_OVERFLOW;
vpcola 23:06fac173529e 392 goto exit;
vpcola 23:06fac173529e 393 }
vpcola 23:06fac173529e 394
vpcola 23:06fac173529e 395 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
vpcola 23:06fac173529e 396 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
vpcola 23:06fac173529e 397 goto exit;
vpcola 23:06fac173529e 398
vpcola 23:06fac173529e 399 // Convert the header to type
vpcola 23:06fac173529e 400 // and update rc
vpcola 23:06fac173529e 401 header.byte = readbuf[0];
vpcola 23:06fac173529e 402 rc = header.bits.type;
vpcola 23:06fac173529e 403
vpcola 23:06fac173529e 404 exit:
vpcola 23:06fac173529e 405
vpcola 23:06fac173529e 406 return rc;
vpcola 23:06fac173529e 407 }
vpcola 23:06fac173529e 408
vpcola 23:06fac173529e 409 /**
vpcola 23:06fac173529e 410 * Read until a specified packet type is received, or untill the specified
vpcola 23:06fac173529e 411 * timeout dropping packets along the way.
vpcola 23:06fac173529e 412 **/
vpcola 23:06fac173529e 413 int MQTTThreadedClient::readUntil(int packetType, int timeout)
vpcola 23:06fac173529e 414 {
vpcola 23:06fac173529e 415 int pType = FAILURE;
vpcola 23:06fac173529e 416 Timer timer;
vpcola 23:06fac173529e 417
vpcola 23:06fac173529e 418 timer.start();
vpcola 23:06fac173529e 419 do {
vpcola 23:06fac173529e 420 pType = readPacket();
vpcola 23:06fac173529e 421 if (pType < 0)
vpcola 23:06fac173529e 422 break;
vpcola 23:06fac173529e 423
vpcola 23:06fac173529e 424 if (timer.read_ms() > timeout)
vpcola 23:06fac173529e 425 {
vpcola 23:06fac173529e 426 pType = FAILURE;
vpcola 23:06fac173529e 427 break;
vpcola 23:06fac173529e 428 }
vpcola 23:06fac173529e 429 }while(pType != packetType);
vpcola 23:06fac173529e 430
vpcola 23:06fac173529e 431 return pType;
vpcola 23:06fac173529e 432 }
vpcola 23:06fac173529e 433
vpcola 23:06fac173529e 434
vpcola 26:4b21de8043a5 435 int MQTTThreadedClient::login()
vpcola 23:06fac173529e 436 {
vpcola 23:06fac173529e 437 int rc = FAILURE;
vpcola 23:06fac173529e 438 int len = 0;
vpcola 23:06fac173529e 439
vpcola 26:4b21de8043a5 440 if (!isConnected)
vpcola 23:06fac173529e 441 {
vpcola 30:b2aed80037db 442 DBG("Session not connected! \r\n");
vpcola 23:06fac173529e 443 return rc;
vpcola 23:06fac173529e 444 }
vpcola 23:06fac173529e 445
vpcola 23:06fac173529e 446 // Copy the keepAliveInterval value to local
vpcola 23:06fac173529e 447 // MQTT specifies in seconds, we have to multiply that
vpcola 23:06fac173529e 448 // amount for our 32 bit timers which accepts ms.
vpcola 26:4b21de8043a5 449 keepAliveInterval = (connect_options.keepAliveInterval * 1000);
vpcola 23:06fac173529e 450
vpcola 30:b2aed80037db 451 DBG("Login with: \r\n");
vpcola 30:b2aed80037db 452 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
vpcola 30:b2aed80037db 453 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
vpcola 23:06fac173529e 454
vpcola 26:4b21de8043a5 455 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
vpcola 23:06fac173529e 456 {
vpcola 30:b2aed80037db 457 DBG("Error serializing connect packet ...\r\n");
vpcola 23:06fac173529e 458 return rc;
vpcola 23:06fac173529e 459 }
vpcola 23:06fac173529e 460 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
vpcola 23:06fac173529e 461 {
vpcola 30:b2aed80037db 462 DBG("Error sending the connect request packet ...\r\n");
vpcola 23:06fac173529e 463 return rc;
vpcola 23:06fac173529e 464 }
vpcola 23:06fac173529e 465
vpcola 23:06fac173529e 466 // Wait for the CONNACK
vpcola 23:06fac173529e 467 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
vpcola 23:06fac173529e 468 {
vpcola 23:06fac173529e 469 unsigned char connack_rc = 255;
vpcola 23:06fac173529e 470 bool sessionPresent = false;
vpcola 30:b2aed80037db 471 DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
vpcola 23:06fac173529e 472 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 23:06fac173529e 473 rc = connack_rc;
vpcola 23:06fac173529e 474 else
vpcola 23:06fac173529e 475 rc = FAILURE;
vpcola 23:06fac173529e 476 }
vpcola 23:06fac173529e 477 else
vpcola 23:06fac173529e 478 rc = FAILURE;
vpcola 23:06fac173529e 479
vpcola 23:06fac173529e 480 if (rc == SUCCESS)
vpcola 23:06fac173529e 481 {
vpcola 30:b2aed80037db 482 DBG("Connected!!! ... starting connection timers ...\r\n");
vpcola 23:06fac173529e 483 resetConnectionTimer();
vpcola 23:06fac173529e 484 }
vpcola 23:06fac173529e 485
vpcola 30:b2aed80037db 486 DBG("Returning with rc = %d\r\n", rc);
vpcola 23:06fac173529e 487
vpcola 23:06fac173529e 488 return rc;
vpcola 23:06fac173529e 489 }
vpcola 23:06fac173529e 490
vpcola 25:326f00faa092 491
vpcola 25:326f00faa092 492 void MQTTThreadedClient::disconnect()
vpcola 25:326f00faa092 493 {
vpcola 25:326f00faa092 494 if (isConnected)
vpcola 25:326f00faa092 495 {
vpcola 26:4b21de8043a5 496 if( useTLS
vpcola 26:4b21de8043a5 497 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
vpcola 26:4b21de8043a5 498 )
vpcola 26:4b21de8043a5 499 {
vpcola 30:b2aed80037db 500 DBG( "Session reset returned an error \r\n");
vpcola 26:4b21de8043a5 501 }
vpcola 25:326f00faa092 502
vpcola 25:326f00faa092 503 isConnected = false;
vpcola 25:326f00faa092 504 tcpSocket->close();
vpcola 25:326f00faa092 505 }
vpcola 25:326f00faa092 506 }
vpcola 25:326f00faa092 507
vpcola 26:4b21de8043a5 508 int MQTTThreadedClient::connect()
vpcola 23:06fac173529e 509 {
vpcola 25:326f00faa092 510 int ret = FAILURE;
vpcola 26:4b21de8043a5 511
vpcola 26:4b21de8043a5 512 if ((network == NULL) || (tcpSocket == NULL)
vpcola 26:4b21de8043a5 513 || host.empty())
vpcola 26:4b21de8043a5 514 {
vpcola 30:b2aed80037db 515 DBG("Network settings not set! \r\n");
vpcola 26:4b21de8043a5 516 return ret;
vpcola 26:4b21de8043a5 517 }
vpcola 25:326f00faa092 518
vpcola 26:4b21de8043a5 519 if (useTLS)
vpcola 26:4b21de8043a5 520 {
vpcola 26:4b21de8043a5 521 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
vpcola 26:4b21de8043a5 522 mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
vpcola 26:4b21de8043a5 523 return ret;
vpcola 26:4b21de8043a5 524 }
vpcola 30:b2aed80037db 525 #if defined(MBEDTLS_SSL_CLI_C)
vpcola 26:4b21de8043a5 526 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
vpcola 26:4b21de8043a5 527 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
vpcola 26:4b21de8043a5 528 return ret;
vpcola 26:4b21de8043a5 529 }
vpcola 30:b2aed80037db 530 #endif
vpcola 26:4b21de8043a5 531 }
vpcola 26:4b21de8043a5 532
vpcola 23:06fac173529e 533 tcpSocket->open(network);
vpcola 26:4b21de8043a5 534 if (useTLS)
vpcola 25:326f00faa092 535 {
vpcola 30:b2aed80037db 536 DBG("mbedtls_ssl_set_hostname ...\r\n");
vpcola 25:326f00faa092 537 mbedtls_ssl_set_hostname(&_ssl, host.c_str());
vpcola 30:b2aed80037db 538 DBG("mbedtls_ssl_set_bio ...\r\n");
vpcola 25:326f00faa092 539 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
vpcola 25:326f00faa092 540 ssl_send, ssl_recv, NULL );
vpcola 25:326f00faa092 541 }
vpcola 25:326f00faa092 542
vpcola 25:326f00faa092 543 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
vpcola 23:06fac173529e 544 {
vpcola 30:b2aed80037db 545 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
vpcola 23:06fac173529e 546 return ret;
vpcola 26:4b21de8043a5 547 }else
vpcola 26:4b21de8043a5 548 isConnected = true;
vpcola 25:326f00faa092 549
vpcola 26:4b21de8043a5 550 if (useTLS)
vpcola 25:326f00faa092 551 {
vpcola 26:4b21de8043a5 552
vpcola 26:4b21de8043a5 553 if (doTLSHandshake() < 0)
vpcola 26:4b21de8043a5 554 {
vpcola 30:b2aed80037db 555 DBG("TLS Handshake failed! \r\n");
vpcola 26:4b21de8043a5 556 return FAILURE;
vpcola 26:4b21de8043a5 557 }else
vpcola 30:b2aed80037db 558 DBG("TLS Handshake complete!! \r\n");
vpcola 25:326f00faa092 559 }
vpcola 25:326f00faa092 560
vpcola 26:4b21de8043a5 561 return login();
vpcola 26:4b21de8043a5 562 }
vpcola 26:4b21de8043a5 563
vpcola 26:4b21de8043a5 564 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
vpcola 26:4b21de8043a5 565 {
vpcola 26:4b21de8043a5 566 // Copy the settings for reconnection
vpcola 26:4b21de8043a5 567 host = chost;
vpcola 26:4b21de8043a5 568 port = cport;
vpcola 26:4b21de8043a5 569 connect_options = options;
vpcola 23:06fac173529e 570 }
vpcola 23:06fac173529e 571
vpcola 23:06fac173529e 572 int MQTTThreadedClient::publish(PubMessage& msg)
vpcola 23:06fac173529e 573 {
vpcola 23:06fac173529e 574 #if 0
vpcola 23:06fac173529e 575 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
vpcola 23:06fac173529e 576 // TODO: handle id values when the function is called later
vpcola 23:06fac173529e 577 if (id == 0)
vpcola 23:06fac173529e 578 return FAILURE;
vpcola 23:06fac173529e 579 else
vpcola 23:06fac173529e 580 return SUCCESS;
vpcola 23:06fac173529e 581 #endif
vpcola 23:06fac173529e 582 PubMessage *message = mpool.alloc();
vpcola 23:06fac173529e 583 // Simple copy
vpcola 23:06fac173529e 584 *message = msg;
vpcola 23:06fac173529e 585
vpcola 23:06fac173529e 586 // Push the data to the thread
vpcola 30:b2aed80037db 587 DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 588 mqueue.put(message);
vpcola 23:06fac173529e 589
vpcola 23:06fac173529e 590 return SUCCESS;
vpcola 23:06fac173529e 591 }
vpcola 23:06fac173529e 592
vpcola 23:06fac173529e 593 int MQTTThreadedClient::sendPublish(PubMessage& message)
vpcola 23:06fac173529e 594 {
vpcola 23:06fac173529e 595 MQTTString topicString = MQTTString_initializer;
vpcola 23:06fac173529e 596
vpcola 23:06fac173529e 597 if (!isConnected)
vpcola 23:06fac173529e 598 {
vpcola 30:b2aed80037db 599 DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 600 return FAILURE;
vpcola 23:06fac173529e 601 }
vpcola 23:06fac173529e 602
vpcola 23:06fac173529e 603 topicString.cstring = (char*) &message.topic[0];
vpcola 23:06fac173529e 604 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
vpcola 23:06fac173529e 605 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
vpcola 23:06fac173529e 606 if (len <= 0)
vpcola 23:06fac173529e 607 {
vpcola 30:b2aed80037db 608 DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 609 return FAILURE;
vpcola 23:06fac173529e 610 }
vpcola 23:06fac173529e 611
vpcola 23:06fac173529e 612 if (sendPacket(len) == SUCCESS)
vpcola 23:06fac173529e 613 {
vpcola 30:b2aed80037db 614 DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 615 return SUCCESS;
vpcola 23:06fac173529e 616 }
vpcola 23:06fac173529e 617
vpcola 30:b2aed80037db 618 DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 619 return FAILURE;
vpcola 23:06fac173529e 620 }
vpcola 25:326f00faa092 621
vpcola 25:326f00faa092 622 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
vpcola 25:326f00faa092 623 {
vpcola 25:326f00faa092 624 // Push the subscription into the map ...
vpcola 25:326f00faa092 625 FP<void,MessageData &> fp;
vpcola 25:326f00faa092 626 fp.attach(function);
vpcola 23:06fac173529e 627
vpcola 26:4b21de8043a5 628 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
vpcola 25:326f00faa092 629 }
vpcola 25:326f00faa092 630
vpcola 25:326f00faa092 631 int MQTTThreadedClient::processSubscriptions()
vpcola 25:326f00faa092 632 {
vpcola 25:326f00faa092 633 int numsubscribed = 0;
vpcola 25:326f00faa092 634
vpcola 25:326f00faa092 635 if (!isConnected)
vpcola 25:326f00faa092 636 {
vpcola 30:b2aed80037db 637 DBG("Session not connected!!\r\n");
vpcola 25:326f00faa092 638 return 0;
vpcola 25:326f00faa092 639 }
vpcola 25:326f00faa092 640
vpcola 30:b2aed80037db 641 DBG("Processing subscribed topics ....\r\n");
vpcola 26:4b21de8043a5 642
vpcola 25:326f00faa092 643 std::map<std::string, FP<void, MessageData &> >::iterator it;
vpcola 26:4b21de8043a5 644 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
vpcola 25:326f00faa092 645 {
vpcola 25:326f00faa092 646 int rc = FAILURE;
vpcola 25:326f00faa092 647 int len = 0;
vpcola 25:326f00faa092 648 //TODO: We only subscribe to QoS = 0 for now
vpcola 25:326f00faa092 649 QoS qos = QOS0;
vpcola 25:326f00faa092 650
vpcola 25:326f00faa092 651 MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
vpcola 30:b2aed80037db 652 DBG("Subscribing to topic [%s]\r\n", topic.cstring);
vpcola 25:326f00faa092 653
vpcola 25:326f00faa092 654
vpcola 25:326f00faa092 655 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
vpcola 25:326f00faa092 656 if (len <= 0) {
vpcola 30:b2aed80037db 657 DBG("Error serializing subscribe packet ...\r\n");
vpcola 25:326f00faa092 658 continue;
vpcola 25:326f00faa092 659 }
vpcola 25:326f00faa092 660
vpcola 25:326f00faa092 661 if ((rc = sendPacket(len)) != SUCCESS) {
vpcola 30:b2aed80037db 662 DBG("Error sending subscribe packet [%d]\r\n", rc);
vpcola 25:326f00faa092 663 continue;
vpcola 25:326f00faa092 664 }
vpcola 25:326f00faa092 665
vpcola 30:b2aed80037db 666 DBG("Waiting for subscription ack ...\r\n");
vpcola 25:326f00faa092 667 // Wait for SUBACK, dropping packets read along the way ...
vpcola 25:326f00faa092 668 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
vpcola 25:326f00faa092 669 int count = 0, grantedQoS = -1;
vpcola 25:326f00faa092 670 unsigned short mypacketid;
vpcola 25:326f00faa092 671 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 25:326f00faa092 672 rc = grantedQoS; // 0, 1, 2 or 0x80
vpcola 25:326f00faa092 673 // For as long as we do not get 0x80 ..
vpcola 25:326f00faa092 674 if (rc != 0x80)
vpcola 25:326f00faa092 675 {
vpcola 25:326f00faa092 676 // Reset connection timers here ...
vpcola 25:326f00faa092 677 resetConnectionTimer();
vpcola 30:b2aed80037db 678 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
vpcola 25:326f00faa092 679 numsubscribed++;
vpcola 25:326f00faa092 680 } else {
vpcola 30:b2aed80037db 681 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
vpcola 25:326f00faa092 682 }
vpcola 25:326f00faa092 683 } else
vpcola 30:b2aed80037db 684 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
vpcola 25:326f00faa092 685 } // end for loop
vpcola 25:326f00faa092 686
vpcola 25:326f00faa092 687 return numsubscribed;
vpcola 25:326f00faa092 688 }
vpcola 25:326f00faa092 689
vpcola 23:06fac173529e 690 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
vpcola 23:06fac173529e 691 {
vpcola 23:06fac173529e 692 char* curf = topicFilter;
vpcola 23:06fac173529e 693 char* curn = topicName.lenstring.data;
vpcola 23:06fac173529e 694 char* curn_end = curn + topicName.lenstring.len;
vpcola 23:06fac173529e 695
vpcola 23:06fac173529e 696 while (*curf && curn < curn_end)
vpcola 23:06fac173529e 697 {
vpcola 23:06fac173529e 698 if (*curn == '/' && *curf != '/')
vpcola 23:06fac173529e 699 break;
vpcola 23:06fac173529e 700 if (*curf != '+' && *curf != '#' && *curf != *curn)
vpcola 23:06fac173529e 701 break;
vpcola 23:06fac173529e 702 if (*curf == '+')
vpcola 23:06fac173529e 703 { // skip until we meet the next separator, or end of string
vpcola 23:06fac173529e 704 char* nextpos = curn + 1;
vpcola 23:06fac173529e 705 while (nextpos < curn_end && *nextpos != '/')
vpcola 23:06fac173529e 706 nextpos = ++curn + 1;
vpcola 23:06fac173529e 707 }
vpcola 23:06fac173529e 708 else if (*curf == '#')
vpcola 23:06fac173529e 709 curn = curn_end - 1; // skip until end of string
vpcola 23:06fac173529e 710 curf++;
vpcola 23:06fac173529e 711 curn++;
vpcola 23:06fac173529e 712 };
vpcola 23:06fac173529e 713
vpcola 23:06fac173529e 714 return (curn == curn_end) && (*curf == '\0');
vpcola 23:06fac173529e 715 }
vpcola 23:06fac173529e 716
vpcola 23:06fac173529e 717 int MQTTThreadedClient::handlePublishMsg()
vpcola 23:06fac173529e 718 {
vpcola 23:06fac173529e 719 MQTTString topicName = MQTTString_initializer;
vpcola 23:06fac173529e 720 Message msg;
vpcola 23:06fac173529e 721 int intQoS;
vpcola 30:b2aed80037db 722 DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 723 if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
vpcola 23:06fac173529e 724 &intQoS,
vpcola 23:06fac173529e 725 (unsigned char*)&msg.retained,
vpcola 23:06fac173529e 726 (unsigned short*)&msg.id,
vpcola 23:06fac173529e 727 &topicName,
vpcola 23:06fac173529e 728 (unsigned char**)&msg.payload,
vpcola 23:06fac173529e 729 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
vpcola 23:06fac173529e 730 {
vpcola 30:b2aed80037db 731 DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 732 return -1;
vpcola 23:06fac173529e 733 }
vpcola 23:06fac173529e 734
vpcola 23:06fac173529e 735 std::string topic;
vpcola 23:06fac173529e 736 if (topicName.lenstring.len > 0)
vpcola 23:06fac173529e 737 {
vpcola 23:06fac173529e 738 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
vpcola 23:06fac173529e 739 }else
vpcola 23:06fac173529e 740 topic = (const char *) topicName.cstring;
vpcola 23:06fac173529e 741
vpcola 30:b2aed80037db 742 DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
vpcola 23:06fac173529e 743
vpcola 23:06fac173529e 744 msg.qos = (QoS) intQoS;
vpcola 23:06fac173529e 745
vpcola 23:06fac173529e 746
vpcola 23:06fac173529e 747 // Call the handlers for each topic
vpcola 23:06fac173529e 748 if (topicCBMap.find(topic) != topicCBMap.end())
vpcola 23:06fac173529e 749 {
vpcola 23:06fac173529e 750 // Call the callback function
vpcola 23:06fac173529e 751 if (topicCBMap[topic].attached())
vpcola 23:06fac173529e 752 {
vpcola 30:b2aed80037db 753 DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 754 MessageData md(topicName, msg);
vpcola 23:06fac173529e 755 topicCBMap[topic](md);
vpcola 23:06fac173529e 756
vpcola 23:06fac173529e 757 return 1;
vpcola 23:06fac173529e 758 }
vpcola 23:06fac173529e 759 }
vpcola 23:06fac173529e 760
vpcola 23:06fac173529e 761 // TODO: depending on the QoS
vpcola 23:06fac173529e 762 // we send data to the server = PUBACK or PUBREC
vpcola 23:06fac173529e 763 switch(intQoS)
vpcola 23:06fac173529e 764 {
vpcola 23:06fac173529e 765 case QOS0:
vpcola 23:06fac173529e 766 // We send back nothing ...
vpcola 23:06fac173529e 767 break;
vpcola 23:06fac173529e 768 case QOS1:
vpcola 23:06fac173529e 769 // TODO: implement
vpcola 23:06fac173529e 770 break;
vpcola 23:06fac173529e 771 case QOS2:
vpcola 23:06fac173529e 772 // TODO: implement
vpcola 23:06fac173529e 773 break;
vpcola 23:06fac173529e 774 default:
vpcola 23:06fac173529e 775 break;
vpcola 23:06fac173529e 776 }
vpcola 23:06fac173529e 777
vpcola 23:06fac173529e 778 return 0;
vpcola 23:06fac173529e 779 }
vpcola 23:06fac173529e 780
vpcola 23:06fac173529e 781 void MQTTThreadedClient::resetConnectionTimer()
vpcola 23:06fac173529e 782 {
vpcola 23:06fac173529e 783 if (keepAliveInterval > 0)
vpcola 23:06fac173529e 784 {
vpcola 23:06fac173529e 785 comTimer.reset();
vpcola 23:06fac173529e 786 comTimer.start();
vpcola 23:06fac173529e 787 }
vpcola 23:06fac173529e 788 }
vpcola 23:06fac173529e 789
vpcola 23:06fac173529e 790 bool MQTTThreadedClient::hasConnectionTimedOut()
vpcola 23:06fac173529e 791 {
vpcola 23:06fac173529e 792 if (keepAliveInterval > 0 ) {
vpcola 23:06fac173529e 793 // Check connection timer
vpcola 23:06fac173529e 794 if (comTimer.read_ms() > keepAliveInterval)
vpcola 23:06fac173529e 795 return true;
vpcola 23:06fac173529e 796 else
vpcola 23:06fac173529e 797 return false;
vpcola 23:06fac173529e 798 }
vpcola 23:06fac173529e 799
vpcola 23:06fac173529e 800 return false;
vpcola 23:06fac173529e 801 }
vpcola 23:06fac173529e 802
vpcola 23:06fac173529e 803 void MQTTThreadedClient::sendPingRequest()
vpcola 23:06fac173529e 804 {
vpcola 23:06fac173529e 805 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
vpcola 23:06fac173529e 806 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
vpcola 23:06fac173529e 807 {
vpcola 30:b2aed80037db 808 DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid());
vpcola 23:06fac173529e 809 }
vpcola 23:06fac173529e 810 }
vpcola 23:06fac173529e 811
vpcola 23:06fac173529e 812 void MQTTThreadedClient::startListener()
vpcola 23:06fac173529e 813 {
vpcola 23:06fac173529e 814 int pType;
vpcola 26:4b21de8043a5 815 int numsubs;
vpcola 23:06fac173529e 816 // Continuesly listens for packets and dispatch
vpcola 23:06fac173529e 817 // message handlers ...
vpcola 26:4b21de8043a5 818 if (useTLS)
vpcola 26:4b21de8043a5 819 {
vpcola 26:4b21de8043a5 820 initTLS();
vpcola 26:4b21de8043a5 821 }
vpcola 26:4b21de8043a5 822
vpcola 26:4b21de8043a5 823 while(true)
vpcola 26:4b21de8043a5 824 {
vpcola 26:4b21de8043a5 825
vpcola 26:4b21de8043a5 826 // Attempt to reconnect and login
vpcola 26:4b21de8043a5 827 if ( connect() < 0 )
vpcola 23:06fac173529e 828 {
vpcola 26:4b21de8043a5 829 disconnect();
vpcola 26:4b21de8043a5 830 // Wait for a few secs and reconnect ...
vpcola 26:4b21de8043a5 831 Thread::wait(6000);
vpcola 26:4b21de8043a5 832 continue;
vpcola 23:06fac173529e 833 }
vpcola 23:06fac173529e 834
vpcola 26:4b21de8043a5 835 numsubs = processSubscriptions();
vpcola 30:b2aed80037db 836 DBG("Subscribed %d topics ...\r\n", numsubs);
vpcola 26:4b21de8043a5 837
vpcola 26:4b21de8043a5 838 // loop read
vpcola 26:4b21de8043a5 839 while(true)
vpcola 26:4b21de8043a5 840 {
vpcola 25:326f00faa092 841 pType = readPacket();
vpcola 26:4b21de8043a5 842 switch(pType)
vpcola 26:4b21de8043a5 843 {
vpcola 25:326f00faa092 844 case TIMEOUT:
vpcola 25:326f00faa092 845 // No data available from the network ...
vpcola 25:326f00faa092 846 break;
vpcola 25:326f00faa092 847 case FAILURE:
vpcola 26:4b21de8043a5 848 {
vpcola 30:b2aed80037db 849 DBG("readPacket returned failure \r\n");
vpcola 26:4b21de8043a5 850 goto reconnect;
vpcola 26:4b21de8043a5 851 }
vpcola 25:326f00faa092 852 case BUFFER_OVERFLOW:
vpcola 25:326f00faa092 853 {
vpcola 25:326f00faa092 854 // TODO: Network error, do we disconnect and reconnect?
vpcola 30:b2aed80037db 855 DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
vpcola 25:326f00faa092 856 MBED_ASSERT(false);
vpcola 25:326f00faa092 857 }
vpcola 25:326f00faa092 858 break;
vpcola 25:326f00faa092 859 /**
vpcola 25:326f00faa092 860 * The rest of the return codes below (all positive) is about MQTT
vpcola 25:326f00faa092 861 * response codes
vpcola 25:326f00faa092 862 **/
vpcola 25:326f00faa092 863 case CONNACK:
vpcola 25:326f00faa092 864 case PUBACK:
vpcola 25:326f00faa092 865 case SUBACK:
vpcola 25:326f00faa092 866 break;
vpcola 25:326f00faa092 867 case PUBLISH:
vpcola 25:326f00faa092 868 {
vpcola 30:b2aed80037db 869 DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
vpcola 25:326f00faa092 870 // We receive data from the MQTT server ..
vpcola 25:326f00faa092 871 if (handlePublishMsg() < 0) {
vpcola 30:b2aed80037db 872 DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
vpcola 25:326f00faa092 873 break;
vpcola 25:326f00faa092 874 }
vpcola 25:326f00faa092 875 }
vpcola 25:326f00faa092 876 break;
vpcola 25:326f00faa092 877 case PINGRESP:
vpcola 25:326f00faa092 878 {
vpcola 30:b2aed80037db 879 DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid());
vpcola 25:326f00faa092 880 resetConnectionTimer();
vpcola 25:326f00faa092 881 }
vpcola 25:326f00faa092 882 break;
vpcola 25:326f00faa092 883 default:
vpcola 30:b2aed80037db 884 DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
vpcola 25:326f00faa092 885 }
vpcola 25:326f00faa092 886
vpcola 25:326f00faa092 887 // Check if its time to send a keepAlive packet
vpcola 25:326f00faa092 888 if (hasConnectionTimedOut()) {
vpcola 25:326f00faa092 889 // Queue the ping request so that other
vpcola 25:326f00faa092 890 // pending operations queued above will go first
vpcola 25:326f00faa092 891 queue.call(this, &MQTTThreadedClient::sendPingRequest);
vpcola 25:326f00faa092 892 }
vpcola 25:326f00faa092 893
vpcola 25:326f00faa092 894 // Check if we have messages on the message queue
vpcola 25:326f00faa092 895 osEvent evt = mqueue.get(10);
vpcola 25:326f00faa092 896 if (evt.status == osEventMessage) {
vpcola 25:326f00faa092 897
vpcola 30:b2aed80037db 898 DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
vpcola 25:326f00faa092 899
vpcola 25:326f00faa092 900 // Unpack the message
vpcola 25:326f00faa092 901 PubMessage * message = (PubMessage *)evt.value.p;
vpcola 25:326f00faa092 902
vpcola 25:326f00faa092 903 // Send the packet, do not queue the call
vpcola 25:326f00faa092 904 // like the ping above ..
vpcola 25:326f00faa092 905 if ( sendPublish(*message) == SUCCESS) {
vpcola 25:326f00faa092 906 // Reset timers if we have been able to send successfully
vpcola 25:326f00faa092 907 resetConnectionTimer();
vpcola 25:326f00faa092 908 } else {
vpcola 25:326f00faa092 909 // Disconnected?
vpcola 25:326f00faa092 910 goto reconnect;
vpcola 25:326f00faa092 911 }
vpcola 25:326f00faa092 912
vpcola 25:326f00faa092 913 // Free the message from mempool after using
vpcola 25:326f00faa092 914 mpool.free(message);
vpcola 25:326f00faa092 915 }
vpcola 25:326f00faa092 916
vpcola 25:326f00faa092 917 // Dispatch any queued events ...
vpcola 25:326f00faa092 918 queue.dispatch(100);
vpcola 25:326f00faa092 919 } // end while loop
vpcola 25:326f00faa092 920
vpcola 25:326f00faa092 921 reconnect:
vpcola 26:4b21de8043a5 922 // reconnect?
vpcola 30:b2aed80037db 923 DBG("Client disconnected!! ... retrying ...\r\n");
vpcola 26:4b21de8043a5 924 disconnect();
vpcola 26:4b21de8043a5 925
vpcola 26:4b21de8043a5 926 };
vpcola 23:06fac173529e 927 }
vpcola 26:4b21de8043a5 928
vpcola 26:4b21de8043a5 929 void MQTTThreadedClient::stopListener()
vpcola 26:4b21de8043a5 930 {
vpcola 26:4b21de8043a5 931 // TODO: Set a signal/flag that the running thread
vpcola 26:4b21de8043a5 932 // will check if its ok to stop ...
vpcola 30:b2aed80037db 933 }
vpcola 30:b2aed80037db 934
vpcola 26:4b21de8043a5 935 }