Fork of my original MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:43:14 2017 +0000
Revision:
0:a1734fe1ec4b
Initial commit

Who changed what in which revision?

UserRevisionLine numberNew contents of line
vpcola 0:a1734fe1ec4b 1 #include "mbed.h"
vpcola 0:a1734fe1ec4b 2 #include "rtos.h"
vpcola 0:a1734fe1ec4b 3 #include "MQTTThreadedClient.h"
vpcola 0:a1734fe1ec4b 4 #include "mbedtls/platform.h"
vpcola 0:a1734fe1ec4b 5 #include "mbedtls/ssl.h"
vpcola 0:a1734fe1ec4b 6 #include "mbedtls/entropy.h"
vpcola 0:a1734fe1ec4b 7 #include "mbedtls/ctr_drbg.h"
vpcola 0:a1734fe1ec4b 8 #include "mbedtls/error.h"
vpcola 0:a1734fe1ec4b 9
vpcola 0:a1734fe1ec4b 10 #ifdef DEBUG
vpcola 0:a1734fe1ec4b 11 #define DBG(fmt, args...) printf(fmt, ## args)
vpcola 0:a1734fe1ec4b 12 #else
vpcola 0:a1734fe1ec4b 13 #define DBG(fmt, args...) /* Don't do anything in release builds */
vpcola 0:a1734fe1ec4b 14 #endif
vpcola 0:a1734fe1ec4b 15
vpcola 0:a1734fe1ec4b 16 static MemoryPool<MQTT::PubMessage, 4> mpool;
vpcola 0:a1734fe1ec4b 17 static Queue<MQTT::PubMessage, 4> mqueue;
vpcola 0:a1734fe1ec4b 18
vpcola 0:a1734fe1ec4b 19 // SSL/TLS variables
vpcola 0:a1734fe1ec4b 20 mbedtls_entropy_context _entropy;
vpcola 0:a1734fe1ec4b 21 mbedtls_ctr_drbg_context _ctr_drbg;
vpcola 0:a1734fe1ec4b 22 mbedtls_x509_crt _cacert;
vpcola 0:a1734fe1ec4b 23 mbedtls_ssl_context _ssl;
vpcola 0:a1734fe1ec4b 24 mbedtls_ssl_config _ssl_conf;
vpcola 0:a1734fe1ec4b 25 mbedtls_ssl_session saved_session;
vpcola 0:a1734fe1ec4b 26
vpcola 0:a1734fe1ec4b 27 namespace MQTT {
vpcola 0:a1734fe1ec4b 28
vpcola 0:a1734fe1ec4b 29 /**
vpcola 0:a1734fe1ec4b 30 * Receive callback for mbed TLS
vpcola 0:a1734fe1ec4b 31 */
vpcola 0:a1734fe1ec4b 32 static int ssl_recv(void *ctx, unsigned char *buf, size_t len)
vpcola 0:a1734fe1ec4b 33 {
vpcola 0:a1734fe1ec4b 34 int recv = -1;
vpcola 0:a1734fe1ec4b 35 TCPSocket *socket = static_cast<TCPSocket *>(ctx);
vpcola 0:a1734fe1ec4b 36 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
vpcola 0:a1734fe1ec4b 37 recv = socket->recv(buf, len);
vpcola 0:a1734fe1ec4b 38
vpcola 0:a1734fe1ec4b 39 if (NSAPI_ERROR_WOULD_BLOCK == recv) {
vpcola 0:a1734fe1ec4b 40 return MBEDTLS_ERR_SSL_WANT_READ;
vpcola 0:a1734fe1ec4b 41 } else if (recv < 0) {
vpcola 0:a1734fe1ec4b 42 return -1;
vpcola 0:a1734fe1ec4b 43 } else {
vpcola 0:a1734fe1ec4b 44 return recv;
vpcola 0:a1734fe1ec4b 45 }
vpcola 0:a1734fe1ec4b 46 }
vpcola 0:a1734fe1ec4b 47
vpcola 0:a1734fe1ec4b 48 /**
vpcola 0:a1734fe1ec4b 49 * Send callback for mbed TLS
vpcola 0:a1734fe1ec4b 50 */
vpcola 0:a1734fe1ec4b 51 static int ssl_send(void *ctx, const unsigned char *buf, size_t len)
vpcola 0:a1734fe1ec4b 52 {
vpcola 0:a1734fe1ec4b 53 int sent = -1;
vpcola 0:a1734fe1ec4b 54 TCPSocket *socket = static_cast<TCPSocket *>(ctx);
vpcola 0:a1734fe1ec4b 55 socket->set_timeout(DEFAULT_SOCKET_TIMEOUT);
vpcola 0:a1734fe1ec4b 56 sent = socket->send(buf, len);
vpcola 0:a1734fe1ec4b 57
vpcola 0:a1734fe1ec4b 58 if(NSAPI_ERROR_WOULD_BLOCK == sent) {
vpcola 0:a1734fe1ec4b 59 return MBEDTLS_ERR_SSL_WANT_WRITE;
vpcola 0:a1734fe1ec4b 60 } else if (sent < 0) {
vpcola 0:a1734fe1ec4b 61 return -1;
vpcola 0:a1734fe1ec4b 62 } else {
vpcola 0:a1734fe1ec4b 63 return sent;
vpcola 0:a1734fe1ec4b 64 }
vpcola 0:a1734fe1ec4b 65 }
vpcola 0:a1734fe1ec4b 66
vpcola 0:a1734fe1ec4b 67 #if DEBUG_LEVEL > 0
vpcola 0:a1734fe1ec4b 68 /**
vpcola 0:a1734fe1ec4b 69 * Debug callback for mbed TLS
vpcola 0:a1734fe1ec4b 70 * Just prints on the USB serial port
vpcola 0:a1734fe1ec4b 71 */
vpcola 0:a1734fe1ec4b 72 static void my_debug(void *ctx, int level, const char *file, int line,
vpcola 0:a1734fe1ec4b 73 const char *str)
vpcola 0:a1734fe1ec4b 74 {
vpcola 0:a1734fe1ec4b 75 const char *p, *basename;
vpcola 0:a1734fe1ec4b 76 (void) ctx;
vpcola 0:a1734fe1ec4b 77
vpcola 0:a1734fe1ec4b 78 /* Extract basename from file */
vpcola 0:a1734fe1ec4b 79 for(p = basename = file; *p != '\0'; p++) {
vpcola 0:a1734fe1ec4b 80 if(*p == '/' || *p == '\\') {
vpcola 0:a1734fe1ec4b 81 basename = p + 1;
vpcola 0:a1734fe1ec4b 82 }
vpcola 0:a1734fe1ec4b 83 }
vpcola 0:a1734fe1ec4b 84
vpcola 0:a1734fe1ec4b 85 if (_debug) {
vpcola 0:a1734fe1ec4b 86 mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str);
vpcola 0:a1734fe1ec4b 87 }
vpcola 0:a1734fe1ec4b 88 }
vpcola 0:a1734fe1ec4b 89
vpcola 0:a1734fe1ec4b 90 /**
vpcola 0:a1734fe1ec4b 91 * Certificate verification callback for mbed TLS
vpcola 0:a1734fe1ec4b 92 * Here we only use it to display information on each cert in the chain
vpcola 0:a1734fe1ec4b 93 */
vpcola 0:a1734fe1ec4b 94 static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags)
vpcola 0:a1734fe1ec4b 95 {
vpcola 0:a1734fe1ec4b 96 const uint32_t buf_size = 1024;
vpcola 0:a1734fe1ec4b 97 char *buf = new char[buf_size];
vpcola 0:a1734fe1ec4b 98 (void) data;
vpcola 0:a1734fe1ec4b 99
vpcola 0:a1734fe1ec4b 100 if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth);
vpcola 0:a1734fe1ec4b 101 mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt);
vpcola 0:a1734fe1ec4b 102 if (_debug) mbedtls_printf("%s", buf);
vpcola 0:a1734fe1ec4b 103
vpcola 0:a1734fe1ec4b 104 if (*flags == 0)
vpcola 0:a1734fe1ec4b 105 if (_debug) mbedtls_printf("No verification issue for this certificate\r\n");
vpcola 0:a1734fe1ec4b 106 else {
vpcola 0:a1734fe1ec4b 107 mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags);
vpcola 0:a1734fe1ec4b 108 if (_debug) mbedtls_printf("%s\n", buf);
vpcola 0:a1734fe1ec4b 109 }
vpcola 0:a1734fe1ec4b 110
vpcola 0:a1734fe1ec4b 111 delete[] buf;
vpcola 0:a1734fe1ec4b 112 return 0;
vpcola 0:a1734fe1ec4b 113 }
vpcola 0:a1734fe1ec4b 114 #endif
vpcola 0:a1734fe1ec4b 115
vpcola 0:a1734fe1ec4b 116
vpcola 0:a1734fe1ec4b 117 void MQTTThreadedClient::setupTLS()
vpcola 0:a1734fe1ec4b 118 {
vpcola 0:a1734fe1ec4b 119 if (useTLS)
vpcola 0:a1734fe1ec4b 120 {
vpcola 0:a1734fe1ec4b 121 mbedtls_entropy_init(&_entropy);
vpcola 0:a1734fe1ec4b 122 mbedtls_ctr_drbg_init(&_ctr_drbg);
vpcola 0:a1734fe1ec4b 123 mbedtls_x509_crt_init(&_cacert);
vpcola 0:a1734fe1ec4b 124 mbedtls_ssl_init(&_ssl);
vpcola 0:a1734fe1ec4b 125 mbedtls_ssl_config_init(&_ssl_conf);
vpcola 0:a1734fe1ec4b 126 memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) );
vpcola 0:a1734fe1ec4b 127 }
vpcola 0:a1734fe1ec4b 128 }
vpcola 0:a1734fe1ec4b 129
vpcola 0:a1734fe1ec4b 130 void MQTTThreadedClient::freeTLS()
vpcola 0:a1734fe1ec4b 131 {
vpcola 0:a1734fe1ec4b 132 if (useTLS)
vpcola 0:a1734fe1ec4b 133 {
vpcola 0:a1734fe1ec4b 134 mbedtls_entropy_free(&_entropy);
vpcola 0:a1734fe1ec4b 135 mbedtls_ctr_drbg_free(&_ctr_drbg);
vpcola 0:a1734fe1ec4b 136 mbedtls_x509_crt_free(&_cacert);
vpcola 0:a1734fe1ec4b 137 mbedtls_ssl_free(&_ssl);
vpcola 0:a1734fe1ec4b 138 mbedtls_ssl_config_free(&_ssl_conf);
vpcola 0:a1734fe1ec4b 139 }
vpcola 0:a1734fe1ec4b 140 }
vpcola 0:a1734fe1ec4b 141
vpcola 0:a1734fe1ec4b 142 int MQTTThreadedClient::initTLS()
vpcola 0:a1734fe1ec4b 143 {
vpcola 0:a1734fe1ec4b 144 int ret;
vpcola 0:a1734fe1ec4b 145
vpcola 0:a1734fe1ec4b 146 DBG("Initializing TLS ...\r\n");
vpcola 0:a1734fe1ec4b 147 DBG("mbedtls_ctr_drdbg_seed ...\r\n");
vpcola 0:a1734fe1ec4b 148 if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy,
vpcola 0:a1734fe1ec4b 149 (const unsigned char *) DRBG_PERS,
vpcola 0:a1734fe1ec4b 150 sizeof (DRBG_PERS))) != 0) {
vpcola 0:a1734fe1ec4b 151 mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret);
vpcola 0:a1734fe1ec4b 152 _error = ret;
vpcola 0:a1734fe1ec4b 153 return -1;
vpcola 0:a1734fe1ec4b 154 }
vpcola 0:a1734fe1ec4b 155 DBG("mbedtls_x509_crt_parse ...\r\n");
vpcola 0:a1734fe1ec4b 156 if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem,
vpcola 0:a1734fe1ec4b 157 strlen(ssl_ca_pem) + 1)) != 0) {
vpcola 0:a1734fe1ec4b 158 mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret);
vpcola 0:a1734fe1ec4b 159 _error = ret;
vpcola 0:a1734fe1ec4b 160 return -1;
vpcola 0:a1734fe1ec4b 161 }
vpcola 0:a1734fe1ec4b 162
vpcola 0:a1734fe1ec4b 163 DBG("mbedtls_ssl_config_defaults ...\r\n");
vpcola 0:a1734fe1ec4b 164 if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf,
vpcola 0:a1734fe1ec4b 165 MBEDTLS_SSL_IS_CLIENT,
vpcola 0:a1734fe1ec4b 166 MBEDTLS_SSL_TRANSPORT_STREAM,
vpcola 0:a1734fe1ec4b 167 MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
vpcola 0:a1734fe1ec4b 168 mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret);
vpcola 0:a1734fe1ec4b 169 _error = ret;
vpcola 0:a1734fe1ec4b 170 return -1;
vpcola 0:a1734fe1ec4b 171 }
vpcola 0:a1734fe1ec4b 172
vpcola 0:a1734fe1ec4b 173 DBG("mbedtls_ssl_config_ca_chain ...\r\n");
vpcola 0:a1734fe1ec4b 174 mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL);
vpcola 0:a1734fe1ec4b 175 DBG("mbedtls_ssl_conf_rng ...\r\n");
vpcola 0:a1734fe1ec4b 176 mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg);
vpcola 0:a1734fe1ec4b 177
vpcola 0:a1734fe1ec4b 178 /* It is possible to disable authentication by passing
vpcola 0:a1734fe1ec4b 179 * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode()
vpcola 0:a1734fe1ec4b 180 */
vpcola 0:a1734fe1ec4b 181 DBG("mbedtls_ssl_conf_authmode ...\r\n");
vpcola 0:a1734fe1ec4b 182 mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED);
vpcola 0:a1734fe1ec4b 183
vpcola 0:a1734fe1ec4b 184 #if DEBUG_LEVEL > 0
vpcola 0:a1734fe1ec4b 185 mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL);
vpcola 0:a1734fe1ec4b 186 mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL);
vpcola 0:a1734fe1ec4b 187 mbedtls_debug_set_threshold(DEBUG_LEVEL);
vpcola 0:a1734fe1ec4b 188 #endif
vpcola 0:a1734fe1ec4b 189
vpcola 0:a1734fe1ec4b 190 DBG("mbedtls_ssl_setup ...\r\n");
vpcola 0:a1734fe1ec4b 191 if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) {
vpcola 0:a1734fe1ec4b 192 mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret);
vpcola 0:a1734fe1ec4b 193 _error = ret;
vpcola 0:a1734fe1ec4b 194 return -1;
vpcola 0:a1734fe1ec4b 195 }
vpcola 0:a1734fe1ec4b 196
vpcola 0:a1734fe1ec4b 197 return 0;
vpcola 0:a1734fe1ec4b 198 }
vpcola 0:a1734fe1ec4b 199
vpcola 0:a1734fe1ec4b 200 int MQTTThreadedClient::doTLSHandshake()
vpcola 0:a1734fe1ec4b 201 {
vpcola 0:a1734fe1ec4b 202 int ret;
vpcola 0:a1734fe1ec4b 203
vpcola 0:a1734fe1ec4b 204 /* Start the handshake, the rest will be done in onReceive() */
vpcola 0:a1734fe1ec4b 205 DBG("Starting the TLS handshake...\r\n");
vpcola 0:a1734fe1ec4b 206 ret = mbedtls_ssl_handshake(&_ssl);
vpcola 0:a1734fe1ec4b 207 if (ret < 0)
vpcola 0:a1734fe1ec4b 208 {
vpcola 0:a1734fe1ec4b 209 if (ret != MBEDTLS_ERR_SSL_WANT_READ &&
vpcola 0:a1734fe1ec4b 210 ret != MBEDTLS_ERR_SSL_WANT_WRITE)
vpcola 0:a1734fe1ec4b 211 mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret);
vpcola 0:a1734fe1ec4b 212 else
vpcola 0:a1734fe1ec4b 213 {
vpcola 0:a1734fe1ec4b 214 // do not close the socket if timed out
vpcola 0:a1734fe1ec4b 215 ret = TIMEOUT;
vpcola 0:a1734fe1ec4b 216 }
vpcola 0:a1734fe1ec4b 217 return ret;
vpcola 0:a1734fe1ec4b 218 }
vpcola 0:a1734fe1ec4b 219
vpcola 0:a1734fe1ec4b 220 /* Handshake done, time to print info */
vpcola 0:a1734fe1ec4b 221 DBG("TLS connection to %s:%d established\r\n",
vpcola 0:a1734fe1ec4b 222 host.c_str(), port);
vpcola 0:a1734fe1ec4b 223
vpcola 0:a1734fe1ec4b 224 const uint32_t buf_size = 1024;
vpcola 0:a1734fe1ec4b 225 char *buf = new char[buf_size];
vpcola 0:a1734fe1ec4b 226 mbedtls_x509_crt_info(buf, buf_size, "\r ",
vpcola 0:a1734fe1ec4b 227 mbedtls_ssl_get_peer_cert(&_ssl));
vpcola 0:a1734fe1ec4b 228
vpcola 0:a1734fe1ec4b 229 DBG("Server certificate:\r\n%s\r", buf);
vpcola 0:a1734fe1ec4b 230 // Verify server cert ...
vpcola 0:a1734fe1ec4b 231 uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl);
vpcola 0:a1734fe1ec4b 232 if( flags != 0 )
vpcola 0:a1734fe1ec4b 233 {
vpcola 0:a1734fe1ec4b 234 mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags);
vpcola 0:a1734fe1ec4b 235 DBG("Certificate verification failed:\r\n%s\r\r\n", buf);
vpcola 0:a1734fe1ec4b 236 // free server cert ... before error return
vpcola 0:a1734fe1ec4b 237 delete [] buf;
vpcola 0:a1734fe1ec4b 238 return -1;
vpcola 0:a1734fe1ec4b 239 }
vpcola 0:a1734fe1ec4b 240
vpcola 0:a1734fe1ec4b 241 DBG("Certificate verification passed\r\n\r\n");
vpcola 0:a1734fe1ec4b 242 // delete server cert after verification
vpcola 0:a1734fe1ec4b 243 delete [] buf;
vpcola 0:a1734fe1ec4b 244
vpcola 0:a1734fe1ec4b 245 #if defined(MBEDTLS_SSL_CLI_C)
vpcola 0:a1734fe1ec4b 246 // TODO: Save the session here for reconnect.
vpcola 0:a1734fe1ec4b 247 if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 )
vpcola 0:a1734fe1ec4b 248 {
vpcola 0:a1734fe1ec4b 249 mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret );
vpcola 0:a1734fe1ec4b 250 hasSavedSession = false;
vpcola 0:a1734fe1ec4b 251 return -1;
vpcola 0:a1734fe1ec4b 252 }
vpcola 0:a1734fe1ec4b 253 #endif
vpcola 0:a1734fe1ec4b 254 DBG("Session saved for reconnect ...\r\n");
vpcola 0:a1734fe1ec4b 255 hasSavedSession = true;
vpcola 0:a1734fe1ec4b 256
vpcola 0:a1734fe1ec4b 257 return 0;
vpcola 0:a1734fe1ec4b 258 }
vpcola 0:a1734fe1ec4b 259
vpcola 0:a1734fe1ec4b 260 int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout)
vpcola 0:a1734fe1ec4b 261 {
vpcola 0:a1734fe1ec4b 262 int rc;
vpcola 0:a1734fe1ec4b 263
vpcola 0:a1734fe1ec4b 264 if (tcpSocket == NULL)
vpcola 0:a1734fe1ec4b 265 return -1;
vpcola 0:a1734fe1ec4b 266
vpcola 0:a1734fe1ec4b 267 if (useTLS)
vpcola 0:a1734fe1ec4b 268 {
vpcola 0:a1734fe1ec4b 269 // Do SSL/TLS read
vpcola 0:a1734fe1ec4b 270 rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size);
vpcola 0:a1734fe1ec4b 271 if (MBEDTLS_ERR_SSL_WANT_READ == rc)
vpcola 0:a1734fe1ec4b 272 return TIMEOUT;
vpcola 0:a1734fe1ec4b 273 else
vpcola 0:a1734fe1ec4b 274 return rc;
vpcola 0:a1734fe1ec4b 275 } else {
vpcola 0:a1734fe1ec4b 276 // non-blocking socket ...
vpcola 0:a1734fe1ec4b 277 tcpSocket->set_timeout(timeout);
vpcola 0:a1734fe1ec4b 278 rc = tcpSocket->recv( (void *) buffer, size);
vpcola 0:a1734fe1ec4b 279
vpcola 0:a1734fe1ec4b 280 // return 0 bytes if timeout ...
vpcola 0:a1734fe1ec4b 281 if (NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 0:a1734fe1ec4b 282 return TIMEOUT;
vpcola 0:a1734fe1ec4b 283 else
vpcola 0:a1734fe1ec4b 284 return rc; // return the number of bytes received or error
vpcola 0:a1734fe1ec4b 285 }
vpcola 0:a1734fe1ec4b 286 }
vpcola 0:a1734fe1ec4b 287
vpcola 0:a1734fe1ec4b 288 int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout)
vpcola 0:a1734fe1ec4b 289 {
vpcola 0:a1734fe1ec4b 290 int rc;
vpcola 0:a1734fe1ec4b 291
vpcola 0:a1734fe1ec4b 292 if (tcpSocket == NULL)
vpcola 0:a1734fe1ec4b 293 return -1;
vpcola 0:a1734fe1ec4b 294
vpcola 0:a1734fe1ec4b 295 if (useTLS) {
vpcola 0:a1734fe1ec4b 296 // Do SSL/TLS write
vpcola 0:a1734fe1ec4b 297 rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size);
vpcola 0:a1734fe1ec4b 298 if (MBEDTLS_ERR_SSL_WANT_WRITE == rc)
vpcola 0:a1734fe1ec4b 299 return TIMEOUT;
vpcola 0:a1734fe1ec4b 300 else
vpcola 0:a1734fe1ec4b 301 return rc;
vpcola 0:a1734fe1ec4b 302 } else {
vpcola 0:a1734fe1ec4b 303
vpcola 0:a1734fe1ec4b 304 // set the write timeout
vpcola 0:a1734fe1ec4b 305 tcpSocket->set_timeout(timeout);
vpcola 0:a1734fe1ec4b 306 rc = tcpSocket->send(buffer, size);
vpcola 0:a1734fe1ec4b 307
vpcola 0:a1734fe1ec4b 308 if ( NSAPI_ERROR_WOULD_BLOCK == rc)
vpcola 0:a1734fe1ec4b 309 return TIMEOUT;
vpcola 0:a1734fe1ec4b 310 else
vpcola 0:a1734fe1ec4b 311 return rc;
vpcola 0:a1734fe1ec4b 312 }
vpcola 0:a1734fe1ec4b 313 }
vpcola 0:a1734fe1ec4b 314
vpcola 0:a1734fe1ec4b 315 int MQTTThreadedClient::readPacketLength(int* value)
vpcola 0:a1734fe1ec4b 316 {
vpcola 0:a1734fe1ec4b 317 int rc = MQTTPACKET_READ_ERROR;
vpcola 0:a1734fe1ec4b 318 unsigned char c;
vpcola 0:a1734fe1ec4b 319 int multiplier = 1;
vpcola 0:a1734fe1ec4b 320 int len = 0;
vpcola 0:a1734fe1ec4b 321 const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
vpcola 0:a1734fe1ec4b 322
vpcola 0:a1734fe1ec4b 323 *value = 0;
vpcola 0:a1734fe1ec4b 324 do
vpcola 0:a1734fe1ec4b 325 {
vpcola 0:a1734fe1ec4b 326 if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
vpcola 0:a1734fe1ec4b 327 {
vpcola 0:a1734fe1ec4b 328 rc = MQTTPACKET_READ_ERROR; /* bad data */
vpcola 0:a1734fe1ec4b 329 goto exit;
vpcola 0:a1734fe1ec4b 330 }
vpcola 0:a1734fe1ec4b 331
vpcola 0:a1734fe1ec4b 332 rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT);
vpcola 0:a1734fe1ec4b 333 if (rc != 1)
vpcola 0:a1734fe1ec4b 334 {
vpcola 0:a1734fe1ec4b 335 rc = MQTTPACKET_READ_ERROR;
vpcola 0:a1734fe1ec4b 336 goto exit;
vpcola 0:a1734fe1ec4b 337 }
vpcola 0:a1734fe1ec4b 338
vpcola 0:a1734fe1ec4b 339 *value += (c & 127) * multiplier;
vpcola 0:a1734fe1ec4b 340 multiplier *= 128;
vpcola 0:a1734fe1ec4b 341 } while ((c & 128) != 0);
vpcola 0:a1734fe1ec4b 342
vpcola 0:a1734fe1ec4b 343 rc = MQTTPACKET_READ_COMPLETE;
vpcola 0:a1734fe1ec4b 344
vpcola 0:a1734fe1ec4b 345 exit:
vpcola 0:a1734fe1ec4b 346 if (rc == MQTTPACKET_READ_ERROR )
vpcola 0:a1734fe1ec4b 347 len = -1;
vpcola 0:a1734fe1ec4b 348
vpcola 0:a1734fe1ec4b 349 return len;
vpcola 0:a1734fe1ec4b 350 }
vpcola 0:a1734fe1ec4b 351
vpcola 0:a1734fe1ec4b 352 int MQTTThreadedClient::sendPacket(size_t length)
vpcola 0:a1734fe1ec4b 353 {
vpcola 0:a1734fe1ec4b 354 int rc = FAILURE;
vpcola 0:a1734fe1ec4b 355 int sent = 0;
vpcola 0:a1734fe1ec4b 356
vpcola 0:a1734fe1ec4b 357 while (sent < length)
vpcola 0:a1734fe1ec4b 358 {
vpcola 0:a1734fe1ec4b 359 rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT);
vpcola 0:a1734fe1ec4b 360 if (rc < 0) // there was an error writing the data
vpcola 0:a1734fe1ec4b 361 break;
vpcola 0:a1734fe1ec4b 362 sent += rc;
vpcola 0:a1734fe1ec4b 363 }
vpcola 0:a1734fe1ec4b 364
vpcola 0:a1734fe1ec4b 365 if (sent == length)
vpcola 0:a1734fe1ec4b 366 rc = SUCCESS;
vpcola 0:a1734fe1ec4b 367 else
vpcola 0:a1734fe1ec4b 368 rc = FAILURE;
vpcola 0:a1734fe1ec4b 369
vpcola 0:a1734fe1ec4b 370 return rc;
vpcola 0:a1734fe1ec4b 371 }
vpcola 0:a1734fe1ec4b 372 /**
vpcola 0:a1734fe1ec4b 373 * Reads the entire packet to readbuf and returns
vpcola 0:a1734fe1ec4b 374 * the type of packet when successful, otherwise
vpcola 0:a1734fe1ec4b 375 * a negative error code is returned.
vpcola 0:a1734fe1ec4b 376 **/
vpcola 0:a1734fe1ec4b 377 int MQTTThreadedClient::readPacket()
vpcola 0:a1734fe1ec4b 378 {
vpcola 0:a1734fe1ec4b 379 int rc = FAILURE;
vpcola 0:a1734fe1ec4b 380 MQTTHeader header = {0};
vpcola 0:a1734fe1ec4b 381 int len = 0;
vpcola 0:a1734fe1ec4b 382 int rem_len = 0;
vpcola 0:a1734fe1ec4b 383
vpcola 0:a1734fe1ec4b 384 /* 1. read the header byte. This has the packet type in it */
vpcola 0:a1734fe1ec4b 385 if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1)
vpcola 0:a1734fe1ec4b 386 goto exit;
vpcola 0:a1734fe1ec4b 387
vpcola 0:a1734fe1ec4b 388 len = 1;
vpcola 0:a1734fe1ec4b 389 /* 2. read the remaining length. This is variable in itself */
vpcola 0:a1734fe1ec4b 390 if ( readPacketLength(&rem_len) < 0 )
vpcola 0:a1734fe1ec4b 391 goto exit;
vpcola 0:a1734fe1ec4b 392
vpcola 0:a1734fe1ec4b 393 len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */
vpcola 0:a1734fe1ec4b 394
vpcola 0:a1734fe1ec4b 395 if (rem_len > (MAX_MQTT_PACKET_SIZE - len))
vpcola 0:a1734fe1ec4b 396 {
vpcola 0:a1734fe1ec4b 397 rc = BUFFER_OVERFLOW;
vpcola 0:a1734fe1ec4b 398 goto exit;
vpcola 0:a1734fe1ec4b 399 }
vpcola 0:a1734fe1ec4b 400
vpcola 0:a1734fe1ec4b 401 /* 3. read the rest of the buffer using a callback to supply the rest of the data */
vpcola 0:a1734fe1ec4b 402 if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len))
vpcola 0:a1734fe1ec4b 403 goto exit;
vpcola 0:a1734fe1ec4b 404
vpcola 0:a1734fe1ec4b 405 // Convert the header to type
vpcola 0:a1734fe1ec4b 406 // and update rc
vpcola 0:a1734fe1ec4b 407 header.byte = readbuf[0];
vpcola 0:a1734fe1ec4b 408 rc = header.bits.type;
vpcola 0:a1734fe1ec4b 409
vpcola 0:a1734fe1ec4b 410 exit:
vpcola 0:a1734fe1ec4b 411
vpcola 0:a1734fe1ec4b 412 return rc;
vpcola 0:a1734fe1ec4b 413 }
vpcola 0:a1734fe1ec4b 414
vpcola 0:a1734fe1ec4b 415 /**
vpcola 0:a1734fe1ec4b 416 * Read until a specified packet type is received, or untill the specified
vpcola 0:a1734fe1ec4b 417 * timeout dropping packets along the way.
vpcola 0:a1734fe1ec4b 418 **/
vpcola 0:a1734fe1ec4b 419 int MQTTThreadedClient::readUntil(int packetType, int timeout)
vpcola 0:a1734fe1ec4b 420 {
vpcola 0:a1734fe1ec4b 421 int pType = FAILURE;
vpcola 0:a1734fe1ec4b 422 Timer timer;
vpcola 0:a1734fe1ec4b 423
vpcola 0:a1734fe1ec4b 424 timer.start();
vpcola 0:a1734fe1ec4b 425 do {
vpcola 0:a1734fe1ec4b 426 pType = readPacket();
vpcola 0:a1734fe1ec4b 427 if (pType < 0)
vpcola 0:a1734fe1ec4b 428 break;
vpcola 0:a1734fe1ec4b 429
vpcola 0:a1734fe1ec4b 430 if (timer.read_ms() > timeout)
vpcola 0:a1734fe1ec4b 431 {
vpcola 0:a1734fe1ec4b 432 pType = FAILURE;
vpcola 0:a1734fe1ec4b 433 break;
vpcola 0:a1734fe1ec4b 434 }
vpcola 0:a1734fe1ec4b 435 }while(pType != packetType);
vpcola 0:a1734fe1ec4b 436
vpcola 0:a1734fe1ec4b 437 return pType;
vpcola 0:a1734fe1ec4b 438 }
vpcola 0:a1734fe1ec4b 439
vpcola 0:a1734fe1ec4b 440
vpcola 0:a1734fe1ec4b 441 int MQTTThreadedClient::login()
vpcola 0:a1734fe1ec4b 442 {
vpcola 0:a1734fe1ec4b 443 int rc = FAILURE;
vpcola 0:a1734fe1ec4b 444 int len = 0;
vpcola 0:a1734fe1ec4b 445
vpcola 0:a1734fe1ec4b 446 if (!isConnected)
vpcola 0:a1734fe1ec4b 447 {
vpcola 0:a1734fe1ec4b 448 DBG("Session not connected! \r\n");
vpcola 0:a1734fe1ec4b 449 return rc;
vpcola 0:a1734fe1ec4b 450 }
vpcola 0:a1734fe1ec4b 451
vpcola 0:a1734fe1ec4b 452 // Copy the keepAliveInterval value to local
vpcola 0:a1734fe1ec4b 453 // MQTT specifies in seconds, we have to multiply that
vpcola 0:a1734fe1ec4b 454 // amount for our 32 bit timers which accepts ms.
vpcola 0:a1734fe1ec4b 455 keepAliveInterval = (connect_options.keepAliveInterval * 1000);
vpcola 0:a1734fe1ec4b 456
vpcola 0:a1734fe1ec4b 457 DBG("Login with: \r\n");
vpcola 0:a1734fe1ec4b 458 DBG("\tUsername: [%s]\r\n", connect_options.username.cstring);
vpcola 0:a1734fe1ec4b 459 DBG("\tPassword: [%s]\r\n", connect_options.password.cstring);
vpcola 0:a1734fe1ec4b 460
vpcola 0:a1734fe1ec4b 461 if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0)
vpcola 0:a1734fe1ec4b 462 {
vpcola 0:a1734fe1ec4b 463 DBG("Error serializing connect packet ...\r\n");
vpcola 0:a1734fe1ec4b 464 return rc;
vpcola 0:a1734fe1ec4b 465 }
vpcola 0:a1734fe1ec4b 466 if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet
vpcola 0:a1734fe1ec4b 467 {
vpcola 0:a1734fe1ec4b 468 DBG("Error sending the connect request packet ...\r\n");
vpcola 0:a1734fe1ec4b 469 return rc;
vpcola 0:a1734fe1ec4b 470 }
vpcola 0:a1734fe1ec4b 471
vpcola 0:a1734fe1ec4b 472 // Wait for the CONNACK
vpcola 0:a1734fe1ec4b 473 if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK)
vpcola 0:a1734fe1ec4b 474 {
vpcola 0:a1734fe1ec4b 475 unsigned char connack_rc = 255;
vpcola 0:a1734fe1ec4b 476 bool sessionPresent = false;
vpcola 0:a1734fe1ec4b 477 DBG("Connection acknowledgement received ... deserializing respones ...\r\n");
vpcola 0:a1734fe1ec4b 478 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 0:a1734fe1ec4b 479 rc = connack_rc;
vpcola 0:a1734fe1ec4b 480 else
vpcola 0:a1734fe1ec4b 481 rc = FAILURE;
vpcola 0:a1734fe1ec4b 482 }
vpcola 0:a1734fe1ec4b 483 else
vpcola 0:a1734fe1ec4b 484 rc = FAILURE;
vpcola 0:a1734fe1ec4b 485
vpcola 0:a1734fe1ec4b 486 if (rc == SUCCESS)
vpcola 0:a1734fe1ec4b 487 {
vpcola 0:a1734fe1ec4b 488 DBG("Connected!!! ... starting connection timers ...\r\n");
vpcola 0:a1734fe1ec4b 489 resetConnectionTimer();
vpcola 0:a1734fe1ec4b 490 }
vpcola 0:a1734fe1ec4b 491
vpcola 0:a1734fe1ec4b 492 DBG("Returning with rc = %d\r\n", rc);
vpcola 0:a1734fe1ec4b 493
vpcola 0:a1734fe1ec4b 494 return rc;
vpcola 0:a1734fe1ec4b 495 }
vpcola 0:a1734fe1ec4b 496
vpcola 0:a1734fe1ec4b 497
vpcola 0:a1734fe1ec4b 498 void MQTTThreadedClient::disconnect()
vpcola 0:a1734fe1ec4b 499 {
vpcola 0:a1734fe1ec4b 500 if (isConnected)
vpcola 0:a1734fe1ec4b 501 {
vpcola 0:a1734fe1ec4b 502 if( useTLS
vpcola 0:a1734fe1ec4b 503 && ( mbedtls_ssl_session_reset( &_ssl ) != 0 )
vpcola 0:a1734fe1ec4b 504 )
vpcola 0:a1734fe1ec4b 505 {
vpcola 0:a1734fe1ec4b 506 DBG( "Session reset returned an error \r\n");
vpcola 0:a1734fe1ec4b 507 }
vpcola 0:a1734fe1ec4b 508
vpcola 0:a1734fe1ec4b 509 isConnected = false;
vpcola 0:a1734fe1ec4b 510 tcpSocket->close();
vpcola 0:a1734fe1ec4b 511 }
vpcola 0:a1734fe1ec4b 512 }
vpcola 0:a1734fe1ec4b 513
vpcola 0:a1734fe1ec4b 514 int MQTTThreadedClient::connect()
vpcola 0:a1734fe1ec4b 515 {
vpcola 0:a1734fe1ec4b 516 int ret = FAILURE;
vpcola 0:a1734fe1ec4b 517
vpcola 0:a1734fe1ec4b 518 if ((network == NULL) || (tcpSocket == NULL)
vpcola 0:a1734fe1ec4b 519 || host.empty())
vpcola 0:a1734fe1ec4b 520 {
vpcola 0:a1734fe1ec4b 521 DBG("Network settings not set! \r\n");
vpcola 0:a1734fe1ec4b 522 return ret;
vpcola 0:a1734fe1ec4b 523 }
vpcola 0:a1734fe1ec4b 524
vpcola 0:a1734fe1ec4b 525 if (useTLS)
vpcola 0:a1734fe1ec4b 526 {
vpcola 0:a1734fe1ec4b 527 if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) {
vpcola 0:a1734fe1ec4b 528 mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret );
vpcola 0:a1734fe1ec4b 529 return ret;
vpcola 0:a1734fe1ec4b 530 }
vpcola 0:a1734fe1ec4b 531 #if defined(MBEDTLS_SSL_CLI_C)
vpcola 0:a1734fe1ec4b 532 if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) {
vpcola 0:a1734fe1ec4b 533 mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret );
vpcola 0:a1734fe1ec4b 534 return ret;
vpcola 0:a1734fe1ec4b 535 }
vpcola 0:a1734fe1ec4b 536 #endif
vpcola 0:a1734fe1ec4b 537 }
vpcola 0:a1734fe1ec4b 538
vpcola 0:a1734fe1ec4b 539 tcpSocket->open(network);
vpcola 0:a1734fe1ec4b 540 if (useTLS)
vpcola 0:a1734fe1ec4b 541 {
vpcola 0:a1734fe1ec4b 542 DBG("mbedtls_ssl_set_hostname ...\r\n");
vpcola 0:a1734fe1ec4b 543 mbedtls_ssl_set_hostname(&_ssl, host.c_str());
vpcola 0:a1734fe1ec4b 544 DBG("mbedtls_ssl_set_bio ...\r\n");
vpcola 0:a1734fe1ec4b 545 mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket),
vpcola 0:a1734fe1ec4b 546 ssl_send, ssl_recv, NULL );
vpcola 0:a1734fe1ec4b 547 }
vpcola 0:a1734fe1ec4b 548
vpcola 0:a1734fe1ec4b 549 if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 )
vpcola 0:a1734fe1ec4b 550 {
vpcola 0:a1734fe1ec4b 551 DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret);
vpcola 0:a1734fe1ec4b 552 return ret;
vpcola 0:a1734fe1ec4b 553 }else
vpcola 0:a1734fe1ec4b 554 isConnected = true;
vpcola 0:a1734fe1ec4b 555
vpcola 0:a1734fe1ec4b 556 if (useTLS)
vpcola 0:a1734fe1ec4b 557 {
vpcola 0:a1734fe1ec4b 558
vpcola 0:a1734fe1ec4b 559 if (doTLSHandshake() < 0)
vpcola 0:a1734fe1ec4b 560 {
vpcola 0:a1734fe1ec4b 561 DBG("TLS Handshake failed! \r\n");
vpcola 0:a1734fe1ec4b 562 return FAILURE;
vpcola 0:a1734fe1ec4b 563 }else
vpcola 0:a1734fe1ec4b 564 DBG("TLS Handshake complete!! \r\n");
vpcola 0:a1734fe1ec4b 565 }
vpcola 0:a1734fe1ec4b 566
vpcola 0:a1734fe1ec4b 567 return login();
vpcola 0:a1734fe1ec4b 568 }
vpcola 0:a1734fe1ec4b 569
vpcola 0:a1734fe1ec4b 570 void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options)
vpcola 0:a1734fe1ec4b 571 {
vpcola 0:a1734fe1ec4b 572 // Copy the settings for reconnection
vpcola 0:a1734fe1ec4b 573 host = chost;
vpcola 0:a1734fe1ec4b 574 port = cport;
vpcola 0:a1734fe1ec4b 575 connect_options = options;
vpcola 0:a1734fe1ec4b 576 }
vpcola 0:a1734fe1ec4b 577
vpcola 0:a1734fe1ec4b 578 int MQTTThreadedClient::publish(PubMessage& msg)
vpcola 0:a1734fe1ec4b 579 {
vpcola 0:a1734fe1ec4b 580 #if 0
vpcola 0:a1734fe1ec4b 581 int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message);
vpcola 0:a1734fe1ec4b 582 // TODO: handle id values when the function is called later
vpcola 0:a1734fe1ec4b 583 if (id == 0)
vpcola 0:a1734fe1ec4b 584 return FAILURE;
vpcola 0:a1734fe1ec4b 585 else
vpcola 0:a1734fe1ec4b 586 return SUCCESS;
vpcola 0:a1734fe1ec4b 587 #endif
vpcola 0:a1734fe1ec4b 588 PubMessage *message = mpool.alloc();
vpcola 0:a1734fe1ec4b 589 // Simple copy
vpcola 0:a1734fe1ec4b 590 *message = msg;
vpcola 0:a1734fe1ec4b 591
vpcola 0:a1734fe1ec4b 592 // Push the data to the thread
vpcola 0:a1734fe1ec4b 593 DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 594 mqueue.put(message);
vpcola 0:a1734fe1ec4b 595
vpcola 0:a1734fe1ec4b 596 return SUCCESS;
vpcola 0:a1734fe1ec4b 597 }
vpcola 0:a1734fe1ec4b 598
vpcola 0:a1734fe1ec4b 599 int MQTTThreadedClient::sendPublish(PubMessage& message)
vpcola 0:a1734fe1ec4b 600 {
vpcola 0:a1734fe1ec4b 601 MQTTString topicString = MQTTString_initializer;
vpcola 0:a1734fe1ec4b 602
vpcola 0:a1734fe1ec4b 603 if (!isConnected)
vpcola 0:a1734fe1ec4b 604 {
vpcola 0:a1734fe1ec4b 605 DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 606 return FAILURE;
vpcola 0:a1734fe1ec4b 607 }
vpcola 0:a1734fe1ec4b 608
vpcola 0:a1734fe1ec4b 609 topicString.cstring = (char*) &message.topic[0];
vpcola 0:a1734fe1ec4b 610 int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id,
vpcola 0:a1734fe1ec4b 611 topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen);
vpcola 0:a1734fe1ec4b 612 if (len <= 0)
vpcola 0:a1734fe1ec4b 613 {
vpcola 0:a1734fe1ec4b 614 DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 615 return FAILURE;
vpcola 0:a1734fe1ec4b 616 }
vpcola 0:a1734fe1ec4b 617
vpcola 0:a1734fe1ec4b 618 if (sendPacket(len) == SUCCESS)
vpcola 0:a1734fe1ec4b 619 {
vpcola 0:a1734fe1ec4b 620 DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 621 return SUCCESS;
vpcola 0:a1734fe1ec4b 622 }
vpcola 0:a1734fe1ec4b 623
vpcola 0:a1734fe1ec4b 624 DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 625 return FAILURE;
vpcola 0:a1734fe1ec4b 626 }
vpcola 0:a1734fe1ec4b 627
vpcola 0:a1734fe1ec4b 628 void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &))
vpcola 0:a1734fe1ec4b 629 {
vpcola 0:a1734fe1ec4b 630 // Push the subscription into the map ...
vpcola 0:a1734fe1ec4b 631 FP<void,MessageData &> fp;
vpcola 0:a1734fe1ec4b 632 fp.attach(function);
vpcola 0:a1734fe1ec4b 633
vpcola 0:a1734fe1ec4b 634 topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp));
vpcola 0:a1734fe1ec4b 635 }
vpcola 0:a1734fe1ec4b 636
vpcola 0:a1734fe1ec4b 637 int MQTTThreadedClient::processSubscriptions()
vpcola 0:a1734fe1ec4b 638 {
vpcola 0:a1734fe1ec4b 639 int numsubscribed = 0;
vpcola 0:a1734fe1ec4b 640
vpcola 0:a1734fe1ec4b 641 if (!isConnected)
vpcola 0:a1734fe1ec4b 642 {
vpcola 0:a1734fe1ec4b 643 DBG("Session not connected!!\r\n");
vpcola 0:a1734fe1ec4b 644 return 0;
vpcola 0:a1734fe1ec4b 645 }
vpcola 0:a1734fe1ec4b 646
vpcola 0:a1734fe1ec4b 647 DBG("Processing subscribed topics ....\r\n");
vpcola 0:a1734fe1ec4b 648
vpcola 0:a1734fe1ec4b 649 std::map<std::string, FP<void, MessageData &> >::iterator it;
vpcola 0:a1734fe1ec4b 650 for(it = topicCBMap.begin(); it != topicCBMap.end(); it++)
vpcola 0:a1734fe1ec4b 651 {
vpcola 0:a1734fe1ec4b 652 int rc = FAILURE;
vpcola 0:a1734fe1ec4b 653 int len = 0;
vpcola 0:a1734fe1ec4b 654 //TODO: We only subscribe to QoS = 0 for now
vpcola 0:a1734fe1ec4b 655 QoS qos = QOS0;
vpcola 0:a1734fe1ec4b 656
vpcola 0:a1734fe1ec4b 657 MQTTString topic = {(char*)it->first.c_str(), {0, 0}};
vpcola 0:a1734fe1ec4b 658 DBG("Subscribing to topic [%s]\r\n", topic.cstring);
vpcola 0:a1734fe1ec4b 659
vpcola 0:a1734fe1ec4b 660
vpcola 0:a1734fe1ec4b 661 len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
vpcola 0:a1734fe1ec4b 662 if (len <= 0) {
vpcola 0:a1734fe1ec4b 663 DBG("Error serializing subscribe packet ...\r\n");
vpcola 0:a1734fe1ec4b 664 continue;
vpcola 0:a1734fe1ec4b 665 }
vpcola 0:a1734fe1ec4b 666
vpcola 0:a1734fe1ec4b 667 if ((rc = sendPacket(len)) != SUCCESS) {
vpcola 0:a1734fe1ec4b 668 DBG("Error sending subscribe packet [%d]\r\n", rc);
vpcola 0:a1734fe1ec4b 669 continue;
vpcola 0:a1734fe1ec4b 670 }
vpcola 0:a1734fe1ec4b 671
vpcola 0:a1734fe1ec4b 672 DBG("Waiting for subscription ack ...\r\n");
vpcola 0:a1734fe1ec4b 673 // Wait for SUBACK, dropping packets read along the way ...
vpcola 0:a1734fe1ec4b 674 if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback
vpcola 0:a1734fe1ec4b 675 int count = 0, grantedQoS = -1;
vpcola 0:a1734fe1ec4b 676 unsigned short mypacketid;
vpcola 0:a1734fe1ec4b 677 if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1)
vpcola 0:a1734fe1ec4b 678 rc = grantedQoS; // 0, 1, 2 or 0x80
vpcola 0:a1734fe1ec4b 679 // For as long as we do not get 0x80 ..
vpcola 0:a1734fe1ec4b 680 if (rc != 0x80)
vpcola 0:a1734fe1ec4b 681 {
vpcola 0:a1734fe1ec4b 682 // Reset connection timers here ...
vpcola 0:a1734fe1ec4b 683 resetConnectionTimer();
vpcola 0:a1734fe1ec4b 684 DBG("Successfully subscribed to %s ...\r\n", it->first.c_str());
vpcola 0:a1734fe1ec4b 685 numsubscribed++;
vpcola 0:a1734fe1ec4b 686 } else {
vpcola 0:a1734fe1ec4b 687 DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str());
vpcola 0:a1734fe1ec4b 688 }
vpcola 0:a1734fe1ec4b 689 } else
vpcola 0:a1734fe1ec4b 690 DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str());
vpcola 0:a1734fe1ec4b 691 } // end for loop
vpcola 0:a1734fe1ec4b 692
vpcola 0:a1734fe1ec4b 693 return numsubscribed;
vpcola 0:a1734fe1ec4b 694 }
vpcola 0:a1734fe1ec4b 695
vpcola 0:a1734fe1ec4b 696 bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName)
vpcola 0:a1734fe1ec4b 697 {
vpcola 0:a1734fe1ec4b 698 char* curf = topicFilter;
vpcola 0:a1734fe1ec4b 699 char* curn = topicName.lenstring.data;
vpcola 0:a1734fe1ec4b 700 char* curn_end = curn + topicName.lenstring.len;
vpcola 0:a1734fe1ec4b 701
vpcola 0:a1734fe1ec4b 702 while (*curf && curn < curn_end)
vpcola 0:a1734fe1ec4b 703 {
vpcola 0:a1734fe1ec4b 704 if (*curn == '/' && *curf != '/')
vpcola 0:a1734fe1ec4b 705 break;
vpcola 0:a1734fe1ec4b 706 if (*curf != '+' && *curf != '#' && *curf != *curn)
vpcola 0:a1734fe1ec4b 707 break;
vpcola 0:a1734fe1ec4b 708 if (*curf == '+')
vpcola 0:a1734fe1ec4b 709 { // skip until we meet the next separator, or end of string
vpcola 0:a1734fe1ec4b 710 char* nextpos = curn + 1;
vpcola 0:a1734fe1ec4b 711 while (nextpos < curn_end && *nextpos != '/')
vpcola 0:a1734fe1ec4b 712 nextpos = ++curn + 1;
vpcola 0:a1734fe1ec4b 713 }
vpcola 0:a1734fe1ec4b 714 else if (*curf == '#')
vpcola 0:a1734fe1ec4b 715 curn = curn_end - 1; // skip until end of string
vpcola 0:a1734fe1ec4b 716 curf++;
vpcola 0:a1734fe1ec4b 717 curn++;
vpcola 0:a1734fe1ec4b 718 };
vpcola 0:a1734fe1ec4b 719
vpcola 0:a1734fe1ec4b 720 return (curn == curn_end) && (*curf == '\0');
vpcola 0:a1734fe1ec4b 721 }
vpcola 0:a1734fe1ec4b 722
vpcola 0:a1734fe1ec4b 723 int MQTTThreadedClient::handlePublishMsg()
vpcola 0:a1734fe1ec4b 724 {
vpcola 0:a1734fe1ec4b 725 MQTTString topicName = MQTTString_initializer;
vpcola 0:a1734fe1ec4b 726 Message msg;
vpcola 0:a1734fe1ec4b 727 int intQoS;
vpcola 0:a1734fe1ec4b 728 DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 729 if (MQTTDeserialize_publish((unsigned char*)&msg.dup,
vpcola 0:a1734fe1ec4b 730 &intQoS,
vpcola 0:a1734fe1ec4b 731 (unsigned char*)&msg.retained,
vpcola 0:a1734fe1ec4b 732 (unsigned short*)&msg.id,
vpcola 0:a1734fe1ec4b 733 &topicName,
vpcola 0:a1734fe1ec4b 734 (unsigned char**)&msg.payload,
vpcola 0:a1734fe1ec4b 735 (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1)
vpcola 0:a1734fe1ec4b 736 {
vpcola 0:a1734fe1ec4b 737 DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 738 return -1;
vpcola 0:a1734fe1ec4b 739 }
vpcola 0:a1734fe1ec4b 740
vpcola 0:a1734fe1ec4b 741 std::string topic;
vpcola 0:a1734fe1ec4b 742 if (topicName.lenstring.len > 0)
vpcola 0:a1734fe1ec4b 743 {
vpcola 0:a1734fe1ec4b 744 topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len);
vpcola 0:a1734fe1ec4b 745 }else
vpcola 0:a1734fe1ec4b 746 topic = (const char *) topicName.cstring;
vpcola 0:a1734fe1ec4b 747
vpcola 0:a1734fe1ec4b 748 DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS);
vpcola 0:a1734fe1ec4b 749
vpcola 0:a1734fe1ec4b 750 msg.qos = (QoS) intQoS;
vpcola 0:a1734fe1ec4b 751
vpcola 0:a1734fe1ec4b 752
vpcola 0:a1734fe1ec4b 753 // Call the handlers for each topic
vpcola 0:a1734fe1ec4b 754 if (topicCBMap.find(topic) != topicCBMap.end())
vpcola 0:a1734fe1ec4b 755 {
vpcola 0:a1734fe1ec4b 756 // Call the callback function
vpcola 0:a1734fe1ec4b 757 if (topicCBMap[topic].attached())
vpcola 0:a1734fe1ec4b 758 {
vpcola 0:a1734fe1ec4b 759 DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 760 MessageData md(topicName, msg);
vpcola 0:a1734fe1ec4b 761 topicCBMap[topic](md);
vpcola 0:a1734fe1ec4b 762
vpcola 0:a1734fe1ec4b 763 return 1;
vpcola 0:a1734fe1ec4b 764 }
vpcola 0:a1734fe1ec4b 765 }
vpcola 0:a1734fe1ec4b 766
vpcola 0:a1734fe1ec4b 767 // TODO: depending on the QoS
vpcola 0:a1734fe1ec4b 768 // we send data to the server = PUBACK or PUBREC
vpcola 0:a1734fe1ec4b 769 switch(intQoS)
vpcola 0:a1734fe1ec4b 770 {
vpcola 0:a1734fe1ec4b 771 case QOS0:
vpcola 0:a1734fe1ec4b 772 // We send back nothing ...
vpcola 0:a1734fe1ec4b 773 break;
vpcola 0:a1734fe1ec4b 774 case QOS1:
vpcola 0:a1734fe1ec4b 775 // TODO: implement
vpcola 0:a1734fe1ec4b 776 break;
vpcola 0:a1734fe1ec4b 777 case QOS2:
vpcola 0:a1734fe1ec4b 778 // TODO: implement
vpcola 0:a1734fe1ec4b 779 break;
vpcola 0:a1734fe1ec4b 780 default:
vpcola 0:a1734fe1ec4b 781 break;
vpcola 0:a1734fe1ec4b 782 }
vpcola 0:a1734fe1ec4b 783
vpcola 0:a1734fe1ec4b 784 return 0;
vpcola 0:a1734fe1ec4b 785 }
vpcola 0:a1734fe1ec4b 786
vpcola 0:a1734fe1ec4b 787 void MQTTThreadedClient::resetConnectionTimer()
vpcola 0:a1734fe1ec4b 788 {
vpcola 0:a1734fe1ec4b 789 if (keepAliveInterval > 0)
vpcola 0:a1734fe1ec4b 790 {
vpcola 0:a1734fe1ec4b 791 comTimer.reset();
vpcola 0:a1734fe1ec4b 792 comTimer.start();
vpcola 0:a1734fe1ec4b 793 }
vpcola 0:a1734fe1ec4b 794 }
vpcola 0:a1734fe1ec4b 795
vpcola 0:a1734fe1ec4b 796 bool MQTTThreadedClient::hasConnectionTimedOut()
vpcola 0:a1734fe1ec4b 797 {
vpcola 0:a1734fe1ec4b 798 if (keepAliveInterval > 0 ) {
vpcola 0:a1734fe1ec4b 799 // Check connection timer
vpcola 0:a1734fe1ec4b 800 if (comTimer.read_ms() > keepAliveInterval)
vpcola 0:a1734fe1ec4b 801 return true;
vpcola 0:a1734fe1ec4b 802 else
vpcola 0:a1734fe1ec4b 803 return false;
vpcola 0:a1734fe1ec4b 804 }
vpcola 0:a1734fe1ec4b 805
vpcola 0:a1734fe1ec4b 806 return false;
vpcola 0:a1734fe1ec4b 807 }
vpcola 0:a1734fe1ec4b 808
vpcola 0:a1734fe1ec4b 809 void MQTTThreadedClient::sendPingRequest()
vpcola 0:a1734fe1ec4b 810 {
vpcola 0:a1734fe1ec4b 811 int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE);
vpcola 0:a1734fe1ec4b 812 if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet
vpcola 0:a1734fe1ec4b 813 {
vpcola 0:a1734fe1ec4b 814 printf("MQTT Ping request sent successfully ...\r\n");
vpcola 0:a1734fe1ec4b 815 }
vpcola 0:a1734fe1ec4b 816 }
vpcola 0:a1734fe1ec4b 817
vpcola 0:a1734fe1ec4b 818 void MQTTThreadedClient::startListener()
vpcola 0:a1734fe1ec4b 819 {
vpcola 0:a1734fe1ec4b 820 int pType;
vpcola 0:a1734fe1ec4b 821 int numsubs;
vpcola 0:a1734fe1ec4b 822 // Continuesly listens for packets and dispatch
vpcola 0:a1734fe1ec4b 823 // message handlers ...
vpcola 0:a1734fe1ec4b 824 if (useTLS)
vpcola 0:a1734fe1ec4b 825 {
vpcola 0:a1734fe1ec4b 826 initTLS();
vpcola 0:a1734fe1ec4b 827 }
vpcola 0:a1734fe1ec4b 828
vpcola 0:a1734fe1ec4b 829 while(true)
vpcola 0:a1734fe1ec4b 830 {
vpcola 0:a1734fe1ec4b 831
vpcola 0:a1734fe1ec4b 832 // Attempt to reconnect and login
vpcola 0:a1734fe1ec4b 833 if ( connect() < 0 )
vpcola 0:a1734fe1ec4b 834 {
vpcola 0:a1734fe1ec4b 835 disconnect();
vpcola 0:a1734fe1ec4b 836 // Wait for a few secs and reconnect ...
vpcola 0:a1734fe1ec4b 837 Thread::wait(6000);
vpcola 0:a1734fe1ec4b 838 continue;
vpcola 0:a1734fe1ec4b 839 }
vpcola 0:a1734fe1ec4b 840
vpcola 0:a1734fe1ec4b 841 numsubs = processSubscriptions();
vpcola 0:a1734fe1ec4b 842 DBG("Subscribed %d topics ...\r\n", numsubs);
vpcola 0:a1734fe1ec4b 843
vpcola 0:a1734fe1ec4b 844 // loop read
vpcola 0:a1734fe1ec4b 845 while(true)
vpcola 0:a1734fe1ec4b 846 {
vpcola 0:a1734fe1ec4b 847 pType = readPacket();
vpcola 0:a1734fe1ec4b 848 switch(pType)
vpcola 0:a1734fe1ec4b 849 {
vpcola 0:a1734fe1ec4b 850 case TIMEOUT:
vpcola 0:a1734fe1ec4b 851 // No data available from the network ...
vpcola 0:a1734fe1ec4b 852 break;
vpcola 0:a1734fe1ec4b 853 case FAILURE:
vpcola 0:a1734fe1ec4b 854 {
vpcola 0:a1734fe1ec4b 855 DBG("readPacket returned failure \r\n");
vpcola 0:a1734fe1ec4b 856 goto reconnect;
vpcola 0:a1734fe1ec4b 857 }
vpcola 0:a1734fe1ec4b 858 case BUFFER_OVERFLOW:
vpcola 0:a1734fe1ec4b 859 {
vpcola 0:a1734fe1ec4b 860 // TODO: Network error, do we disconnect and reconnect?
vpcola 0:a1734fe1ec4b 861 DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 862 MBED_ASSERT(false);
vpcola 0:a1734fe1ec4b 863 }
vpcola 0:a1734fe1ec4b 864 break;
vpcola 0:a1734fe1ec4b 865 /**
vpcola 0:a1734fe1ec4b 866 * The rest of the return codes below (all positive) is about MQTT
vpcola 0:a1734fe1ec4b 867 * response codes
vpcola 0:a1734fe1ec4b 868 **/
vpcola 0:a1734fe1ec4b 869 case CONNACK:
vpcola 0:a1734fe1ec4b 870 case PUBACK:
vpcola 0:a1734fe1ec4b 871 case SUBACK:
vpcola 0:a1734fe1ec4b 872 break;
vpcola 0:a1734fe1ec4b 873 case PUBLISH:
vpcola 0:a1734fe1ec4b 874 {
vpcola 0:a1734fe1ec4b 875 DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 876 // We receive data from the MQTT server ..
vpcola 0:a1734fe1ec4b 877 if (handlePublishMsg() < 0) {
vpcola 0:a1734fe1ec4b 878 DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 879 break;
vpcola 0:a1734fe1ec4b 880 }
vpcola 0:a1734fe1ec4b 881 }
vpcola 0:a1734fe1ec4b 882 break;
vpcola 0:a1734fe1ec4b 883 case PINGRESP:
vpcola 0:a1734fe1ec4b 884 {
vpcola 0:a1734fe1ec4b 885 printf("MQTT Got ping response ...\r\n");
vpcola 0:a1734fe1ec4b 886 resetConnectionTimer();
vpcola 0:a1734fe1ec4b 887 }
vpcola 0:a1734fe1ec4b 888 break;
vpcola 0:a1734fe1ec4b 889 default:
vpcola 0:a1734fe1ec4b 890 DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType);
vpcola 0:a1734fe1ec4b 891 }
vpcola 0:a1734fe1ec4b 892
vpcola 0:a1734fe1ec4b 893 // Check if its time to send a keepAlive packet
vpcola 0:a1734fe1ec4b 894 if (hasConnectionTimedOut()) {
vpcola 0:a1734fe1ec4b 895 // Queue the ping request so that other
vpcola 0:a1734fe1ec4b 896 // pending operations queued above will go first
vpcola 0:a1734fe1ec4b 897 queue.call(this, &MQTTThreadedClient::sendPingRequest);
vpcola 0:a1734fe1ec4b 898 }
vpcola 0:a1734fe1ec4b 899
vpcola 0:a1734fe1ec4b 900 // Check if we have messages on the message queue
vpcola 0:a1734fe1ec4b 901 osEvent evt = mqueue.get(10);
vpcola 0:a1734fe1ec4b 902 if (evt.status == osEventMessage) {
vpcola 0:a1734fe1ec4b 903
vpcola 0:a1734fe1ec4b 904 DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid());
vpcola 0:a1734fe1ec4b 905
vpcola 0:a1734fe1ec4b 906 // Unpack the message
vpcola 0:a1734fe1ec4b 907 PubMessage * message = (PubMessage *)evt.value.p;
vpcola 0:a1734fe1ec4b 908
vpcola 0:a1734fe1ec4b 909 // Send the packet, do not queue the call
vpcola 0:a1734fe1ec4b 910 // like the ping above ..
vpcola 0:a1734fe1ec4b 911 if ( sendPublish(*message) == SUCCESS) {
vpcola 0:a1734fe1ec4b 912 // Reset timers if we have been able to send successfully
vpcola 0:a1734fe1ec4b 913 resetConnectionTimer();
vpcola 0:a1734fe1ec4b 914 } else {
vpcola 0:a1734fe1ec4b 915 // Disconnected?
vpcola 0:a1734fe1ec4b 916 goto reconnect;
vpcola 0:a1734fe1ec4b 917 }
vpcola 0:a1734fe1ec4b 918
vpcola 0:a1734fe1ec4b 919 // Free the message from mempool after using
vpcola 0:a1734fe1ec4b 920 mpool.free(message);
vpcola 0:a1734fe1ec4b 921 }
vpcola 0:a1734fe1ec4b 922
vpcola 0:a1734fe1ec4b 923 // Dispatch any queued events ...
vpcola 0:a1734fe1ec4b 924 queue.dispatch(100);
vpcola 0:a1734fe1ec4b 925 } // end while loop
vpcola 0:a1734fe1ec4b 926
vpcola 0:a1734fe1ec4b 927 reconnect:
vpcola 0:a1734fe1ec4b 928 // reconnect?
vpcola 0:a1734fe1ec4b 929 DBG("Client disconnected!! ... retrying ...\r\n");
vpcola 0:a1734fe1ec4b 930 disconnect();
vpcola 0:a1734fe1ec4b 931
vpcola 0:a1734fe1ec4b 932 };
vpcola 0:a1734fe1ec4b 933 }
vpcola 0:a1734fe1ec4b 934
vpcola 0:a1734fe1ec4b 935 void MQTTThreadedClient::stopListener()
vpcola 0:a1734fe1ec4b 936 {
vpcola 0:a1734fe1ec4b 937 // TODO: Set a signal/flag that the running thread
vpcola 0:a1734fe1ec4b 938 // will check if its ok to stop ...
vpcola 0:a1734fe1ec4b 939 }
vpcola 0:a1734fe1ec4b 940
vpcola 0:a1734fe1ec4b 941 }