Vergil Cola
/
MQTTGateway2
Fork of my original MQTTGateway
MQTTSManager/MQTTThreadedClient.cpp@0:a1734fe1ec4b, 2017-04-08 (annotated)
- Committer:
- vpcola
- Date:
- Sat Apr 08 14:43:14 2017 +0000
- Revision:
- 0:a1734fe1ec4b
Initial commit
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
vpcola | 0:a1734fe1ec4b | 1 | #include "mbed.h" |
vpcola | 0:a1734fe1ec4b | 2 | #include "rtos.h" |
vpcola | 0:a1734fe1ec4b | 3 | #include "MQTTThreadedClient.h" |
vpcola | 0:a1734fe1ec4b | 4 | #include "mbedtls/platform.h" |
vpcola | 0:a1734fe1ec4b | 5 | #include "mbedtls/ssl.h" |
vpcola | 0:a1734fe1ec4b | 6 | #include "mbedtls/entropy.h" |
vpcola | 0:a1734fe1ec4b | 7 | #include "mbedtls/ctr_drbg.h" |
vpcola | 0:a1734fe1ec4b | 8 | #include "mbedtls/error.h" |
vpcola | 0:a1734fe1ec4b | 9 | |
vpcola | 0:a1734fe1ec4b | 10 | #ifdef DEBUG |
vpcola | 0:a1734fe1ec4b | 11 | #define DBG(fmt, args...) printf(fmt, ## args) |
vpcola | 0:a1734fe1ec4b | 12 | #else |
vpcola | 0:a1734fe1ec4b | 13 | #define DBG(fmt, args...) /* Don't do anything in release builds */ |
vpcola | 0:a1734fe1ec4b | 14 | #endif |
vpcola | 0:a1734fe1ec4b | 15 | |
vpcola | 0:a1734fe1ec4b | 16 | static MemoryPool<MQTT::PubMessage, 4> mpool; |
vpcola | 0:a1734fe1ec4b | 17 | static Queue<MQTT::PubMessage, 4> mqueue; |
vpcola | 0:a1734fe1ec4b | 18 | |
vpcola | 0:a1734fe1ec4b | 19 | // SSL/TLS variables: file-scope mbed TLS state (not per-object) used by the TLS helper methods of MQTTThreadedClient in this file |
vpcola | 0:a1734fe1ec4b | 20 | mbedtls_entropy_context _entropy; |
vpcola | 0:a1734fe1ec4b | 21 | mbedtls_ctr_drbg_context _ctr_drbg; |
vpcola | 0:a1734fe1ec4b | 22 | mbedtls_x509_crt _cacert; |
vpcola | 0:a1734fe1ec4b | 23 | mbedtls_ssl_context _ssl; |
vpcola | 0:a1734fe1ec4b | 24 | mbedtls_ssl_config _ssl_conf; |
vpcola | 0:a1734fe1ec4b | 25 | mbedtls_ssl_session saved_session; |
vpcola | 0:a1734fe1ec4b | 26 | |
vpcola | 0:a1734fe1ec4b | 27 | namespace MQTT { |
vpcola | 0:a1734fe1ec4b | 28 | |
vpcola | 0:a1734fe1ec4b | 29 | /** |
vpcola | 0:a1734fe1ec4b | 30 | * Receive callback for mbed TLS |
vpcola | 0:a1734fe1ec4b | 31 | */ |
vpcola | 0:a1734fe1ec4b | 32 | static int ssl_recv(void *ctx, unsigned char *buf, size_t len) |
vpcola | 0:a1734fe1ec4b | 33 | { |
vpcola | 0:a1734fe1ec4b | 34 | int recv = -1; |
vpcola | 0:a1734fe1ec4b | 35 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 0:a1734fe1ec4b | 36 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 0:a1734fe1ec4b | 37 | recv = socket->recv(buf, len); |
vpcola | 0:a1734fe1ec4b | 38 | |
vpcola | 0:a1734fe1ec4b | 39 | if (NSAPI_ERROR_WOULD_BLOCK == recv) { |
vpcola | 0:a1734fe1ec4b | 40 | return MBEDTLS_ERR_SSL_WANT_READ; |
vpcola | 0:a1734fe1ec4b | 41 | } else if (recv < 0) { |
vpcola | 0:a1734fe1ec4b | 42 | return -1; |
vpcola | 0:a1734fe1ec4b | 43 | } else { |
vpcola | 0:a1734fe1ec4b | 44 | return recv; |
vpcola | 0:a1734fe1ec4b | 45 | } |
vpcola | 0:a1734fe1ec4b | 46 | } |
vpcola | 0:a1734fe1ec4b | 47 | |
vpcola | 0:a1734fe1ec4b | 48 | /** |
vpcola | 0:a1734fe1ec4b | 49 | * Send callback for mbed TLS |
vpcola | 0:a1734fe1ec4b | 50 | */ |
vpcola | 0:a1734fe1ec4b | 51 | static int ssl_send(void *ctx, const unsigned char *buf, size_t len) |
vpcola | 0:a1734fe1ec4b | 52 | { |
vpcola | 0:a1734fe1ec4b | 53 | int sent = -1; |
vpcola | 0:a1734fe1ec4b | 54 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 0:a1734fe1ec4b | 55 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 0:a1734fe1ec4b | 56 | sent = socket->send(buf, len); |
vpcola | 0:a1734fe1ec4b | 57 | |
vpcola | 0:a1734fe1ec4b | 58 | if(NSAPI_ERROR_WOULD_BLOCK == sent) { |
vpcola | 0:a1734fe1ec4b | 59 | return MBEDTLS_ERR_SSL_WANT_WRITE; |
vpcola | 0:a1734fe1ec4b | 60 | } else if (sent < 0) { |
vpcola | 0:a1734fe1ec4b | 61 | return -1; |
vpcola | 0:a1734fe1ec4b | 62 | } else { |
vpcola | 0:a1734fe1ec4b | 63 | return sent; |
vpcola | 0:a1734fe1ec4b | 64 | } |
vpcola | 0:a1734fe1ec4b | 65 | } |
vpcola | 0:a1734fe1ec4b | 66 | |
vpcola | 0:a1734fe1ec4b | 67 | #if DEBUG_LEVEL > 0 |
vpcola | 0:a1734fe1ec4b | 68 | /** |
vpcola | 0:a1734fe1ec4b | 69 | * Debug callback for mbed TLS |
vpcola | 0:a1734fe1ec4b | 70 | * Just prints on the USB serial port |
vpcola | 0:a1734fe1ec4b | 71 | */ |
vpcola | 0:a1734fe1ec4b | 72 | static void my_debug(void *ctx, int level, const char *file, int line, |
vpcola | 0:a1734fe1ec4b | 73 | const char *str) |
vpcola | 0:a1734fe1ec4b | 74 | { |
vpcola | 0:a1734fe1ec4b | 75 | const char *p, *basename; |
vpcola | 0:a1734fe1ec4b | 76 | (void) ctx; |
vpcola | 0:a1734fe1ec4b | 77 | |
vpcola | 0:a1734fe1ec4b | 78 | /* Extract basename from file */ |
vpcola | 0:a1734fe1ec4b | 79 | for(p = basename = file; *p != '\0'; p++) { |
vpcola | 0:a1734fe1ec4b | 80 | if(*p == '/' || *p == '\\') { |
vpcola | 0:a1734fe1ec4b | 81 | basename = p + 1; |
vpcola | 0:a1734fe1ec4b | 82 | } |
vpcola | 0:a1734fe1ec4b | 83 | } |
vpcola | 0:a1734fe1ec4b | 84 | |
vpcola | 0:a1734fe1ec4b | 85 | if (_debug) { |
vpcola | 0:a1734fe1ec4b | 86 | mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); |
vpcola | 0:a1734fe1ec4b | 87 | } |
vpcola | 0:a1734fe1ec4b | 88 | } |
vpcola | 0:a1734fe1ec4b | 89 | |
vpcola | 0:a1734fe1ec4b | 90 | /** |
vpcola | 0:a1734fe1ec4b | 91 | * Certificate verification callback for mbed TLS |
vpcola | 0:a1734fe1ec4b | 92 | * Here we only use it to display information on each cert in the chain |
vpcola | 0:a1734fe1ec4b | 93 | */ |
vpcola | 0:a1734fe1ec4b | 94 | static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) |
vpcola | 0:a1734fe1ec4b | 95 | { |
vpcola | 0:a1734fe1ec4b | 96 | const uint32_t buf_size = 1024; |
vpcola | 0:a1734fe1ec4b | 97 | char *buf = new char[buf_size]; |
vpcola | 0:a1734fe1ec4b | 98 | (void) data; |
vpcola | 0:a1734fe1ec4b | 99 | |
vpcola | 0:a1734fe1ec4b | 100 | if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); |
vpcola | 0:a1734fe1ec4b | 101 | mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); |
vpcola | 0:a1734fe1ec4b | 102 | if (_debug) mbedtls_printf("%s", buf); |
vpcola | 0:a1734fe1ec4b | 103 | |
vpcola | 0:a1734fe1ec4b | 104 | if (*flags == 0) |
vpcola | 0:a1734fe1ec4b | 105 | if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); |
vpcola | 0:a1734fe1ec4b | 106 | else { |
vpcola | 0:a1734fe1ec4b | 107 | mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); |
vpcola | 0:a1734fe1ec4b | 108 | if (_debug) mbedtls_printf("%s\n", buf); |
vpcola | 0:a1734fe1ec4b | 109 | } |
vpcola | 0:a1734fe1ec4b | 110 | |
vpcola | 0:a1734fe1ec4b | 111 | delete[] buf; |
vpcola | 0:a1734fe1ec4b | 112 | return 0; |
vpcola | 0:a1734fe1ec4b | 113 | } |
vpcola | 0:a1734fe1ec4b | 114 | #endif |
vpcola | 0:a1734fe1ec4b | 115 | |
vpcola | 0:a1734fe1ec4b | 116 | |
vpcola | 0:a1734fe1ec4b | 117 | void MQTTThreadedClient::setupTLS() |
vpcola | 0:a1734fe1ec4b | 118 | { |
vpcola | 0:a1734fe1ec4b | 119 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 120 | { |
vpcola | 0:a1734fe1ec4b | 121 | mbedtls_entropy_init(&_entropy); |
vpcola | 0:a1734fe1ec4b | 122 | mbedtls_ctr_drbg_init(&_ctr_drbg); |
vpcola | 0:a1734fe1ec4b | 123 | mbedtls_x509_crt_init(&_cacert); |
vpcola | 0:a1734fe1ec4b | 124 | mbedtls_ssl_init(&_ssl); |
vpcola | 0:a1734fe1ec4b | 125 | mbedtls_ssl_config_init(&_ssl_conf); |
vpcola | 0:a1734fe1ec4b | 126 | memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); |
vpcola | 0:a1734fe1ec4b | 127 | } |
vpcola | 0:a1734fe1ec4b | 128 | } |
vpcola | 0:a1734fe1ec4b | 129 | |
vpcola | 0:a1734fe1ec4b | 130 | void MQTTThreadedClient::freeTLS() |
vpcola | 0:a1734fe1ec4b | 131 | { |
vpcola | 0:a1734fe1ec4b | 132 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 133 | { |
vpcola | 0:a1734fe1ec4b | 134 | mbedtls_entropy_free(&_entropy); |
vpcola | 0:a1734fe1ec4b | 135 | mbedtls_ctr_drbg_free(&_ctr_drbg); |
vpcola | 0:a1734fe1ec4b | 136 | mbedtls_x509_crt_free(&_cacert); |
vpcola | 0:a1734fe1ec4b | 137 | mbedtls_ssl_free(&_ssl); |
vpcola | 0:a1734fe1ec4b | 138 | mbedtls_ssl_config_free(&_ssl_conf); |
vpcola | 0:a1734fe1ec4b | 139 | } |
vpcola | 0:a1734fe1ec4b | 140 | } |
vpcola | 0:a1734fe1ec4b | 141 | |
vpcola | 0:a1734fe1ec4b | 142 | int MQTTThreadedClient::initTLS() |
vpcola | 0:a1734fe1ec4b | 143 | { |
vpcola | 0:a1734fe1ec4b | 144 | int ret; |
vpcola | 0:a1734fe1ec4b | 145 | |
vpcola | 0:a1734fe1ec4b | 146 | DBG("Initializing TLS ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 147 | DBG("mbedtls_ctr_drdbg_seed ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 148 | if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, |
vpcola | 0:a1734fe1ec4b | 149 | (const unsigned char *) DRBG_PERS, |
vpcola | 0:a1734fe1ec4b | 150 | sizeof (DRBG_PERS))) != 0) { |
vpcola | 0:a1734fe1ec4b | 151 | mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); |
vpcola | 0:a1734fe1ec4b | 152 | _error = ret; |
vpcola | 0:a1734fe1ec4b | 153 | return -1; |
vpcola | 0:a1734fe1ec4b | 154 | } |
vpcola | 0:a1734fe1ec4b | 155 | DBG("mbedtls_x509_crt_parse ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 156 | if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, |
vpcola | 0:a1734fe1ec4b | 157 | strlen(ssl_ca_pem) + 1)) != 0) { |
vpcola | 0:a1734fe1ec4b | 158 | mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); |
vpcola | 0:a1734fe1ec4b | 159 | _error = ret; |
vpcola | 0:a1734fe1ec4b | 160 | return -1; |
vpcola | 0:a1734fe1ec4b | 161 | } |
vpcola | 0:a1734fe1ec4b | 162 | |
vpcola | 0:a1734fe1ec4b | 163 | DBG("mbedtls_ssl_config_defaults ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 164 | if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, |
vpcola | 0:a1734fe1ec4b | 165 | MBEDTLS_SSL_IS_CLIENT, |
vpcola | 0:a1734fe1ec4b | 166 | MBEDTLS_SSL_TRANSPORT_STREAM, |
vpcola | 0:a1734fe1ec4b | 167 | MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { |
vpcola | 0:a1734fe1ec4b | 168 | mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); |
vpcola | 0:a1734fe1ec4b | 169 | _error = ret; |
vpcola | 0:a1734fe1ec4b | 170 | return -1; |
vpcola | 0:a1734fe1ec4b | 171 | } |
vpcola | 0:a1734fe1ec4b | 172 | |
vpcola | 0:a1734fe1ec4b | 173 | DBG("mbedtls_ssl_config_ca_chain ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 174 | mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); |
vpcola | 0:a1734fe1ec4b | 175 | DBG("mbedtls_ssl_conf_rng ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 176 | mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); |
vpcola | 0:a1734fe1ec4b | 177 | |
vpcola | 0:a1734fe1ec4b | 178 | /* It is possible to disable authentication by passing |
vpcola | 0:a1734fe1ec4b | 179 | * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() |
vpcola | 0:a1734fe1ec4b | 180 | */ |
vpcola | 0:a1734fe1ec4b | 181 | DBG("mbedtls_ssl_conf_authmode ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 182 | mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); |
vpcola | 0:a1734fe1ec4b | 183 | |
vpcola | 0:a1734fe1ec4b | 184 | #if DEBUG_LEVEL > 0 |
vpcola | 0:a1734fe1ec4b | 185 | mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); |
vpcola | 0:a1734fe1ec4b | 186 | mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); |
vpcola | 0:a1734fe1ec4b | 187 | mbedtls_debug_set_threshold(DEBUG_LEVEL); |
vpcola | 0:a1734fe1ec4b | 188 | #endif |
vpcola | 0:a1734fe1ec4b | 189 | |
vpcola | 0:a1734fe1ec4b | 190 | DBG("mbedtls_ssl_setup ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 191 | if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { |
vpcola | 0:a1734fe1ec4b | 192 | mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); |
vpcola | 0:a1734fe1ec4b | 193 | _error = ret; |
vpcola | 0:a1734fe1ec4b | 194 | return -1; |
vpcola | 0:a1734fe1ec4b | 195 | } |
vpcola | 0:a1734fe1ec4b | 196 | |
vpcola | 0:a1734fe1ec4b | 197 | return 0; |
vpcola | 0:a1734fe1ec4b | 198 | } |
vpcola | 0:a1734fe1ec4b | 199 | |
vpcola | 0:a1734fe1ec4b | 200 | int MQTTThreadedClient::doTLSHandshake() |
vpcola | 0:a1734fe1ec4b | 201 | { |
vpcola | 0:a1734fe1ec4b | 202 | int ret; |
vpcola | 0:a1734fe1ec4b | 203 | |
vpcola | 0:a1734fe1ec4b | 204 | /* Start the handshake, the rest will be done in onReceive() */ |
vpcola | 0:a1734fe1ec4b | 205 | DBG("Starting the TLS handshake...\r\n"); |
vpcola | 0:a1734fe1ec4b | 206 | ret = mbedtls_ssl_handshake(&_ssl); |
vpcola | 0:a1734fe1ec4b | 207 | if (ret < 0) |
vpcola | 0:a1734fe1ec4b | 208 | { |
vpcola | 0:a1734fe1ec4b | 209 | if (ret != MBEDTLS_ERR_SSL_WANT_READ && |
vpcola | 0:a1734fe1ec4b | 210 | ret != MBEDTLS_ERR_SSL_WANT_WRITE) |
vpcola | 0:a1734fe1ec4b | 211 | mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); |
vpcola | 0:a1734fe1ec4b | 212 | else |
vpcola | 0:a1734fe1ec4b | 213 | { |
vpcola | 0:a1734fe1ec4b | 214 | // do not close the socket if timed out |
vpcola | 0:a1734fe1ec4b | 215 | ret = TIMEOUT; |
vpcola | 0:a1734fe1ec4b | 216 | } |
vpcola | 0:a1734fe1ec4b | 217 | return ret; |
vpcola | 0:a1734fe1ec4b | 218 | } |
vpcola | 0:a1734fe1ec4b | 219 | |
vpcola | 0:a1734fe1ec4b | 220 | /* Handshake done, time to print info */ |
vpcola | 0:a1734fe1ec4b | 221 | DBG("TLS connection to %s:%d established\r\n", |
vpcola | 0:a1734fe1ec4b | 222 | host.c_str(), port); |
vpcola | 0:a1734fe1ec4b | 223 | |
vpcola | 0:a1734fe1ec4b | 224 | const uint32_t buf_size = 1024; |
vpcola | 0:a1734fe1ec4b | 225 | char *buf = new char[buf_size]; |
vpcola | 0:a1734fe1ec4b | 226 | mbedtls_x509_crt_info(buf, buf_size, "\r ", |
vpcola | 0:a1734fe1ec4b | 227 | mbedtls_ssl_get_peer_cert(&_ssl)); |
vpcola | 0:a1734fe1ec4b | 228 | |
vpcola | 0:a1734fe1ec4b | 229 | DBG("Server certificate:\r\n%s\r", buf); |
vpcola | 0:a1734fe1ec4b | 230 | // Verify server cert ... |
vpcola | 0:a1734fe1ec4b | 231 | uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); |
vpcola | 0:a1734fe1ec4b | 232 | if( flags != 0 ) |
vpcola | 0:a1734fe1ec4b | 233 | { |
vpcola | 0:a1734fe1ec4b | 234 | mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); |
vpcola | 0:a1734fe1ec4b | 235 | DBG("Certificate verification failed:\r\n%s\r\r\n", buf); |
vpcola | 0:a1734fe1ec4b | 236 | // free server cert ... before error return |
vpcola | 0:a1734fe1ec4b | 237 | delete [] buf; |
vpcola | 0:a1734fe1ec4b | 238 | return -1; |
vpcola | 0:a1734fe1ec4b | 239 | } |
vpcola | 0:a1734fe1ec4b | 240 | |
vpcola | 0:a1734fe1ec4b | 241 | DBG("Certificate verification passed\r\n\r\n"); |
vpcola | 0:a1734fe1ec4b | 242 | // delete server cert after verification |
vpcola | 0:a1734fe1ec4b | 243 | delete [] buf; |
vpcola | 0:a1734fe1ec4b | 244 | |
vpcola | 0:a1734fe1ec4b | 245 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola | 0:a1734fe1ec4b | 246 | // TODO: Save the session here for reconnect. |
vpcola | 0:a1734fe1ec4b | 247 | if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) |
vpcola | 0:a1734fe1ec4b | 248 | { |
vpcola | 0:a1734fe1ec4b | 249 | mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); |
vpcola | 0:a1734fe1ec4b | 250 | hasSavedSession = false; |
vpcola | 0:a1734fe1ec4b | 251 | return -1; |
vpcola | 0:a1734fe1ec4b | 252 | } |
vpcola | 0:a1734fe1ec4b | 253 | #endif |
vpcola | 0:a1734fe1ec4b | 254 | DBG("Session saved for reconnect ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 255 | hasSavedSession = true; |
vpcola | 0:a1734fe1ec4b | 256 | |
vpcola | 0:a1734fe1ec4b | 257 | return 0; |
vpcola | 0:a1734fe1ec4b | 258 | } |
vpcola | 0:a1734fe1ec4b | 259 | |
vpcola | 0:a1734fe1ec4b | 260 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
vpcola | 0:a1734fe1ec4b | 261 | { |
vpcola | 0:a1734fe1ec4b | 262 | int rc; |
vpcola | 0:a1734fe1ec4b | 263 | |
vpcola | 0:a1734fe1ec4b | 264 | if (tcpSocket == NULL) |
vpcola | 0:a1734fe1ec4b | 265 | return -1; |
vpcola | 0:a1734fe1ec4b | 266 | |
vpcola | 0:a1734fe1ec4b | 267 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 268 | { |
vpcola | 0:a1734fe1ec4b | 269 | // Do SSL/TLS read |
vpcola | 0:a1734fe1ec4b | 270 | rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); |
vpcola | 0:a1734fe1ec4b | 271 | if (MBEDTLS_ERR_SSL_WANT_READ == rc) |
vpcola | 0:a1734fe1ec4b | 272 | return TIMEOUT; |
vpcola | 0:a1734fe1ec4b | 273 | else |
vpcola | 0:a1734fe1ec4b | 274 | return rc; |
vpcola | 0:a1734fe1ec4b | 275 | } else { |
vpcola | 0:a1734fe1ec4b | 276 | // non-blocking socket ... |
vpcola | 0:a1734fe1ec4b | 277 | tcpSocket->set_timeout(timeout); |
vpcola | 0:a1734fe1ec4b | 278 | rc = tcpSocket->recv( (void *) buffer, size); |
vpcola | 0:a1734fe1ec4b | 279 | |
vpcola | 0:a1734fe1ec4b | 280 | // return 0 bytes if timeout ... |
vpcola | 0:a1734fe1ec4b | 281 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 0:a1734fe1ec4b | 282 | return TIMEOUT; |
vpcola | 0:a1734fe1ec4b | 283 | else |
vpcola | 0:a1734fe1ec4b | 284 | return rc; // return the number of bytes received or error |
vpcola | 0:a1734fe1ec4b | 285 | } |
vpcola | 0:a1734fe1ec4b | 286 | } |
vpcola | 0:a1734fe1ec4b | 287 | |
vpcola | 0:a1734fe1ec4b | 288 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
vpcola | 0:a1734fe1ec4b | 289 | { |
vpcola | 0:a1734fe1ec4b | 290 | int rc; |
vpcola | 0:a1734fe1ec4b | 291 | |
vpcola | 0:a1734fe1ec4b | 292 | if (tcpSocket == NULL) |
vpcola | 0:a1734fe1ec4b | 293 | return -1; |
vpcola | 0:a1734fe1ec4b | 294 | |
vpcola | 0:a1734fe1ec4b | 295 | if (useTLS) { |
vpcola | 0:a1734fe1ec4b | 296 | // Do SSL/TLS write |
vpcola | 0:a1734fe1ec4b | 297 | rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); |
vpcola | 0:a1734fe1ec4b | 298 | if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) |
vpcola | 0:a1734fe1ec4b | 299 | return TIMEOUT; |
vpcola | 0:a1734fe1ec4b | 300 | else |
vpcola | 0:a1734fe1ec4b | 301 | return rc; |
vpcola | 0:a1734fe1ec4b | 302 | } else { |
vpcola | 0:a1734fe1ec4b | 303 | |
vpcola | 0:a1734fe1ec4b | 304 | // set the write timeout |
vpcola | 0:a1734fe1ec4b | 305 | tcpSocket->set_timeout(timeout); |
vpcola | 0:a1734fe1ec4b | 306 | rc = tcpSocket->send(buffer, size); |
vpcola | 0:a1734fe1ec4b | 307 | |
vpcola | 0:a1734fe1ec4b | 308 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 0:a1734fe1ec4b | 309 | return TIMEOUT; |
vpcola | 0:a1734fe1ec4b | 310 | else |
vpcola | 0:a1734fe1ec4b | 311 | return rc; |
vpcola | 0:a1734fe1ec4b | 312 | } |
vpcola | 0:a1734fe1ec4b | 313 | } |
vpcola | 0:a1734fe1ec4b | 314 | |
vpcola | 0:a1734fe1ec4b | 315 | int MQTTThreadedClient::readPacketLength(int* value) |
vpcola | 0:a1734fe1ec4b | 316 | { |
vpcola | 0:a1734fe1ec4b | 317 | int rc = MQTTPACKET_READ_ERROR; |
vpcola | 0:a1734fe1ec4b | 318 | unsigned char c; |
vpcola | 0:a1734fe1ec4b | 319 | int multiplier = 1; |
vpcola | 0:a1734fe1ec4b | 320 | int len = 0; |
vpcola | 0:a1734fe1ec4b | 321 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
vpcola | 0:a1734fe1ec4b | 322 | |
vpcola | 0:a1734fe1ec4b | 323 | *value = 0; |
vpcola | 0:a1734fe1ec4b | 324 | do |
vpcola | 0:a1734fe1ec4b | 325 | { |
vpcola | 0:a1734fe1ec4b | 326 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
vpcola | 0:a1734fe1ec4b | 327 | { |
vpcola | 0:a1734fe1ec4b | 328 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
vpcola | 0:a1734fe1ec4b | 329 | goto exit; |
vpcola | 0:a1734fe1ec4b | 330 | } |
vpcola | 0:a1734fe1ec4b | 331 | |
vpcola | 0:a1734fe1ec4b | 332 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 0:a1734fe1ec4b | 333 | if (rc != 1) |
vpcola | 0:a1734fe1ec4b | 334 | { |
vpcola | 0:a1734fe1ec4b | 335 | rc = MQTTPACKET_READ_ERROR; |
vpcola | 0:a1734fe1ec4b | 336 | goto exit; |
vpcola | 0:a1734fe1ec4b | 337 | } |
vpcola | 0:a1734fe1ec4b | 338 | |
vpcola | 0:a1734fe1ec4b | 339 | *value += (c & 127) * multiplier; |
vpcola | 0:a1734fe1ec4b | 340 | multiplier *= 128; |
vpcola | 0:a1734fe1ec4b | 341 | } while ((c & 128) != 0); |
vpcola | 0:a1734fe1ec4b | 342 | |
vpcola | 0:a1734fe1ec4b | 343 | rc = MQTTPACKET_READ_COMPLETE; |
vpcola | 0:a1734fe1ec4b | 344 | |
vpcola | 0:a1734fe1ec4b | 345 | exit: |
vpcola | 0:a1734fe1ec4b | 346 | if (rc == MQTTPACKET_READ_ERROR ) |
vpcola | 0:a1734fe1ec4b | 347 | len = -1; |
vpcola | 0:a1734fe1ec4b | 348 | |
vpcola | 0:a1734fe1ec4b | 349 | return len; |
vpcola | 0:a1734fe1ec4b | 350 | } |
vpcola | 0:a1734fe1ec4b | 351 | |
vpcola | 0:a1734fe1ec4b | 352 | int MQTTThreadedClient::sendPacket(size_t length) |
vpcola | 0:a1734fe1ec4b | 353 | { |
vpcola | 0:a1734fe1ec4b | 354 | int rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 355 | int sent = 0; |
vpcola | 0:a1734fe1ec4b | 356 | |
vpcola | 0:a1734fe1ec4b | 357 | while (sent < length) |
vpcola | 0:a1734fe1ec4b | 358 | { |
vpcola | 0:a1734fe1ec4b | 359 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 0:a1734fe1ec4b | 360 | if (rc < 0) // there was an error writing the data |
vpcola | 0:a1734fe1ec4b | 361 | break; |
vpcola | 0:a1734fe1ec4b | 362 | sent += rc; |
vpcola | 0:a1734fe1ec4b | 363 | } |
vpcola | 0:a1734fe1ec4b | 364 | |
vpcola | 0:a1734fe1ec4b | 365 | if (sent == length) |
vpcola | 0:a1734fe1ec4b | 366 | rc = SUCCESS; |
vpcola | 0:a1734fe1ec4b | 367 | else |
vpcola | 0:a1734fe1ec4b | 368 | rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 369 | |
vpcola | 0:a1734fe1ec4b | 370 | return rc; |
vpcola | 0:a1734fe1ec4b | 371 | } |
vpcola | 0:a1734fe1ec4b | 372 | /** |
vpcola | 0:a1734fe1ec4b | 373 | * Reads the entire packet to readbuf and returns |
vpcola | 0:a1734fe1ec4b | 374 | * the type of packet when successful, otherwise |
vpcola | 0:a1734fe1ec4b | 375 | * a negative error code is returned. |
vpcola | 0:a1734fe1ec4b | 376 | **/ |
vpcola | 0:a1734fe1ec4b | 377 | int MQTTThreadedClient::readPacket() |
vpcola | 0:a1734fe1ec4b | 378 | { |
vpcola | 0:a1734fe1ec4b | 379 | int rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 380 | MQTTHeader header = {0}; |
vpcola | 0:a1734fe1ec4b | 381 | int len = 0; |
vpcola | 0:a1734fe1ec4b | 382 | int rem_len = 0; |
vpcola | 0:a1734fe1ec4b | 383 | |
vpcola | 0:a1734fe1ec4b | 384 | /* 1. read the header byte. This has the packet type in it */ |
vpcola | 0:a1734fe1ec4b | 385 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
vpcola | 0:a1734fe1ec4b | 386 | goto exit; |
vpcola | 0:a1734fe1ec4b | 387 | |
vpcola | 0:a1734fe1ec4b | 388 | len = 1; |
vpcola | 0:a1734fe1ec4b | 389 | /* 2. read the remaining length. This is variable in itself */ |
vpcola | 0:a1734fe1ec4b | 390 | if ( readPacketLength(&rem_len) < 0 ) |
vpcola | 0:a1734fe1ec4b | 391 | goto exit; |
vpcola | 0:a1734fe1ec4b | 392 | |
vpcola | 0:a1734fe1ec4b | 393 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
vpcola | 0:a1734fe1ec4b | 394 | |
vpcola | 0:a1734fe1ec4b | 395 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
vpcola | 0:a1734fe1ec4b | 396 | { |
vpcola | 0:a1734fe1ec4b | 397 | rc = BUFFER_OVERFLOW; |
vpcola | 0:a1734fe1ec4b | 398 | goto exit; |
vpcola | 0:a1734fe1ec4b | 399 | } |
vpcola | 0:a1734fe1ec4b | 400 | |
vpcola | 0:a1734fe1ec4b | 401 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
vpcola | 0:a1734fe1ec4b | 402 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
vpcola | 0:a1734fe1ec4b | 403 | goto exit; |
vpcola | 0:a1734fe1ec4b | 404 | |
vpcola | 0:a1734fe1ec4b | 405 | // Convert the header to type |
vpcola | 0:a1734fe1ec4b | 406 | // and update rc |
vpcola | 0:a1734fe1ec4b | 407 | header.byte = readbuf[0]; |
vpcola | 0:a1734fe1ec4b | 408 | rc = header.bits.type; |
vpcola | 0:a1734fe1ec4b | 409 | |
vpcola | 0:a1734fe1ec4b | 410 | exit: |
vpcola | 0:a1734fe1ec4b | 411 | |
vpcola | 0:a1734fe1ec4b | 412 | return rc; |
vpcola | 0:a1734fe1ec4b | 413 | } |
vpcola | 0:a1734fe1ec4b | 414 | |
vpcola | 0:a1734fe1ec4b | 415 | /** |
vpcola | 0:a1734fe1ec4b | 416 | * Read until a specified packet type is received, or untill the specified |
vpcola | 0:a1734fe1ec4b | 417 | * timeout dropping packets along the way. |
vpcola | 0:a1734fe1ec4b | 418 | **/ |
vpcola | 0:a1734fe1ec4b | 419 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
vpcola | 0:a1734fe1ec4b | 420 | { |
vpcola | 0:a1734fe1ec4b | 421 | int pType = FAILURE; |
vpcola | 0:a1734fe1ec4b | 422 | Timer timer; |
vpcola | 0:a1734fe1ec4b | 423 | |
vpcola | 0:a1734fe1ec4b | 424 | timer.start(); |
vpcola | 0:a1734fe1ec4b | 425 | do { |
vpcola | 0:a1734fe1ec4b | 426 | pType = readPacket(); |
vpcola | 0:a1734fe1ec4b | 427 | if (pType < 0) |
vpcola | 0:a1734fe1ec4b | 428 | break; |
vpcola | 0:a1734fe1ec4b | 429 | |
vpcola | 0:a1734fe1ec4b | 430 | if (timer.read_ms() > timeout) |
vpcola | 0:a1734fe1ec4b | 431 | { |
vpcola | 0:a1734fe1ec4b | 432 | pType = FAILURE; |
vpcola | 0:a1734fe1ec4b | 433 | break; |
vpcola | 0:a1734fe1ec4b | 434 | } |
vpcola | 0:a1734fe1ec4b | 435 | }while(pType != packetType); |
vpcola | 0:a1734fe1ec4b | 436 | |
vpcola | 0:a1734fe1ec4b | 437 | return pType; |
vpcola | 0:a1734fe1ec4b | 438 | } |
vpcola | 0:a1734fe1ec4b | 439 | |
vpcola | 0:a1734fe1ec4b | 440 | |
vpcola | 0:a1734fe1ec4b | 441 | int MQTTThreadedClient::login() |
vpcola | 0:a1734fe1ec4b | 442 | { |
vpcola | 0:a1734fe1ec4b | 443 | int rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 444 | int len = 0; |
vpcola | 0:a1734fe1ec4b | 445 | |
vpcola | 0:a1734fe1ec4b | 446 | if (!isConnected) |
vpcola | 0:a1734fe1ec4b | 447 | { |
vpcola | 0:a1734fe1ec4b | 448 | DBG("Session not connected! \r\n"); |
vpcola | 0:a1734fe1ec4b | 449 | return rc; |
vpcola | 0:a1734fe1ec4b | 450 | } |
vpcola | 0:a1734fe1ec4b | 451 | |
vpcola | 0:a1734fe1ec4b | 452 | // Copy the keepAliveInterval value to local |
vpcola | 0:a1734fe1ec4b | 453 | // MQTT specifies in seconds, we have to multiply that |
vpcola | 0:a1734fe1ec4b | 454 | // amount for our 32 bit timers which accepts ms. |
vpcola | 0:a1734fe1ec4b | 455 | keepAliveInterval = (connect_options.keepAliveInterval * 1000); |
vpcola | 0:a1734fe1ec4b | 456 | |
vpcola | 0:a1734fe1ec4b | 457 | DBG("Login with: \r\n"); |
vpcola | 0:a1734fe1ec4b | 458 | DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); |
vpcola | 0:a1734fe1ec4b | 459 | DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); |
vpcola | 0:a1734fe1ec4b | 460 | |
vpcola | 0:a1734fe1ec4b | 461 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) |
vpcola | 0:a1734fe1ec4b | 462 | { |
vpcola | 0:a1734fe1ec4b | 463 | DBG("Error serializing connect packet ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 464 | return rc; |
vpcola | 0:a1734fe1ec4b | 465 | } |
vpcola | 0:a1734fe1ec4b | 466 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
vpcola | 0:a1734fe1ec4b | 467 | { |
vpcola | 0:a1734fe1ec4b | 468 | DBG("Error sending the connect request packet ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 469 | return rc; |
vpcola | 0:a1734fe1ec4b | 470 | } |
vpcola | 0:a1734fe1ec4b | 471 | |
vpcola | 0:a1734fe1ec4b | 472 | // Wait for the CONNACK |
vpcola | 0:a1734fe1ec4b | 473 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
vpcola | 0:a1734fe1ec4b | 474 | { |
vpcola | 0:a1734fe1ec4b | 475 | unsigned char connack_rc = 255; |
vpcola | 0:a1734fe1ec4b | 476 | bool sessionPresent = false; |
vpcola | 0:a1734fe1ec4b | 477 | DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 478 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 0:a1734fe1ec4b | 479 | rc = connack_rc; |
vpcola | 0:a1734fe1ec4b | 480 | else |
vpcola | 0:a1734fe1ec4b | 481 | rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 482 | } |
vpcola | 0:a1734fe1ec4b | 483 | else |
vpcola | 0:a1734fe1ec4b | 484 | rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 485 | |
vpcola | 0:a1734fe1ec4b | 486 | if (rc == SUCCESS) |
vpcola | 0:a1734fe1ec4b | 487 | { |
vpcola | 0:a1734fe1ec4b | 488 | DBG("Connected!!! ... starting connection timers ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 489 | resetConnectionTimer(); |
vpcola | 0:a1734fe1ec4b | 490 | } |
vpcola | 0:a1734fe1ec4b | 491 | |
vpcola | 0:a1734fe1ec4b | 492 | DBG("Returning with rc = %d\r\n", rc); |
vpcola | 0:a1734fe1ec4b | 493 | |
vpcola | 0:a1734fe1ec4b | 494 | return rc; |
vpcola | 0:a1734fe1ec4b | 495 | } |
vpcola | 0:a1734fe1ec4b | 496 | |
vpcola | 0:a1734fe1ec4b | 497 | |
vpcola | 0:a1734fe1ec4b | 498 | void MQTTThreadedClient::disconnect() |
vpcola | 0:a1734fe1ec4b | 499 | { |
vpcola | 0:a1734fe1ec4b | 500 | if (isConnected) |
vpcola | 0:a1734fe1ec4b | 501 | { |
vpcola | 0:a1734fe1ec4b | 502 | if( useTLS |
vpcola | 0:a1734fe1ec4b | 503 | && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) |
vpcola | 0:a1734fe1ec4b | 504 | ) |
vpcola | 0:a1734fe1ec4b | 505 | { |
vpcola | 0:a1734fe1ec4b | 506 | DBG( "Session reset returned an error \r\n"); |
vpcola | 0:a1734fe1ec4b | 507 | } |
vpcola | 0:a1734fe1ec4b | 508 | |
vpcola | 0:a1734fe1ec4b | 509 | isConnected = false; |
vpcola | 0:a1734fe1ec4b | 510 | tcpSocket->close(); |
vpcola | 0:a1734fe1ec4b | 511 | } |
vpcola | 0:a1734fe1ec4b | 512 | } |
vpcola | 0:a1734fe1ec4b | 513 | |
vpcola | 0:a1734fe1ec4b | 514 | int MQTTThreadedClient::connect() |
vpcola | 0:a1734fe1ec4b | 515 | { |
vpcola | 0:a1734fe1ec4b | 516 | int ret = FAILURE; |
vpcola | 0:a1734fe1ec4b | 517 | |
vpcola | 0:a1734fe1ec4b | 518 | if ((network == NULL) || (tcpSocket == NULL) |
vpcola | 0:a1734fe1ec4b | 519 | || host.empty()) |
vpcola | 0:a1734fe1ec4b | 520 | { |
vpcola | 0:a1734fe1ec4b | 521 | DBG("Network settings not set! \r\n"); |
vpcola | 0:a1734fe1ec4b | 522 | return ret; |
vpcola | 0:a1734fe1ec4b | 523 | } |
vpcola | 0:a1734fe1ec4b | 524 | |
vpcola | 0:a1734fe1ec4b | 525 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 526 | { |
vpcola | 0:a1734fe1ec4b | 527 | if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { |
vpcola | 0:a1734fe1ec4b | 528 | mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); |
vpcola | 0:a1734fe1ec4b | 529 | return ret; |
vpcola | 0:a1734fe1ec4b | 530 | } |
vpcola | 0:a1734fe1ec4b | 531 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola | 0:a1734fe1ec4b | 532 | if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { |
vpcola | 0:a1734fe1ec4b | 533 | mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); |
vpcola | 0:a1734fe1ec4b | 534 | return ret; |
vpcola | 0:a1734fe1ec4b | 535 | } |
vpcola | 0:a1734fe1ec4b | 536 | #endif |
vpcola | 0:a1734fe1ec4b | 537 | } |
vpcola | 0:a1734fe1ec4b | 538 | |
vpcola | 0:a1734fe1ec4b | 539 | tcpSocket->open(network); |
vpcola | 0:a1734fe1ec4b | 540 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 541 | { |
vpcola | 0:a1734fe1ec4b | 542 | DBG("mbedtls_ssl_set_hostname ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 543 | mbedtls_ssl_set_hostname(&_ssl, host.c_str()); |
vpcola | 0:a1734fe1ec4b | 544 | DBG("mbedtls_ssl_set_bio ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 545 | mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), |
vpcola | 0:a1734fe1ec4b | 546 | ssl_send, ssl_recv, NULL ); |
vpcola | 0:a1734fe1ec4b | 547 | } |
vpcola | 0:a1734fe1ec4b | 548 | |
vpcola | 0:a1734fe1ec4b | 549 | if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) |
vpcola | 0:a1734fe1ec4b | 550 | { |
vpcola | 0:a1734fe1ec4b | 551 | DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); |
vpcola | 0:a1734fe1ec4b | 552 | return ret; |
vpcola | 0:a1734fe1ec4b | 553 | }else |
vpcola | 0:a1734fe1ec4b | 554 | isConnected = true; |
vpcola | 0:a1734fe1ec4b | 555 | |
vpcola | 0:a1734fe1ec4b | 556 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 557 | { |
vpcola | 0:a1734fe1ec4b | 558 | |
vpcola | 0:a1734fe1ec4b | 559 | if (doTLSHandshake() < 0) |
vpcola | 0:a1734fe1ec4b | 560 | { |
vpcola | 0:a1734fe1ec4b | 561 | DBG("TLS Handshake failed! \r\n"); |
vpcola | 0:a1734fe1ec4b | 562 | return FAILURE; |
vpcola | 0:a1734fe1ec4b | 563 | }else |
vpcola | 0:a1734fe1ec4b | 564 | DBG("TLS Handshake complete!! \r\n"); |
vpcola | 0:a1734fe1ec4b | 565 | } |
vpcola | 0:a1734fe1ec4b | 566 | |
vpcola | 0:a1734fe1ec4b | 567 | return login(); |
vpcola | 0:a1734fe1ec4b | 568 | } |
vpcola | 0:a1734fe1ec4b | 569 | |
vpcola | 0:a1734fe1ec4b | 570 | void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) |
vpcola | 0:a1734fe1ec4b | 571 | { |
vpcola | 0:a1734fe1ec4b | 572 | // Copy the settings for reconnection |
vpcola | 0:a1734fe1ec4b | 573 | host = chost; |
vpcola | 0:a1734fe1ec4b | 574 | port = cport; |
vpcola | 0:a1734fe1ec4b | 575 | connect_options = options; |
vpcola | 0:a1734fe1ec4b | 576 | } |
vpcola | 0:a1734fe1ec4b | 577 | |
vpcola | 0:a1734fe1ec4b | 578 | int MQTTThreadedClient::publish(PubMessage& msg) |
vpcola | 0:a1734fe1ec4b | 579 | { |
vpcola | 0:a1734fe1ec4b | 580 | #if 0 |
vpcola | 0:a1734fe1ec4b | 581 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
vpcola | 0:a1734fe1ec4b | 582 | // TODO: handle id values when the function is called later |
vpcola | 0:a1734fe1ec4b | 583 | if (id == 0) |
vpcola | 0:a1734fe1ec4b | 584 | return FAILURE; |
vpcola | 0:a1734fe1ec4b | 585 | else |
vpcola | 0:a1734fe1ec4b | 586 | return SUCCESS; |
vpcola | 0:a1734fe1ec4b | 587 | #endif |
vpcola | 0:a1734fe1ec4b | 588 | PubMessage *message = mpool.alloc(); |
vpcola | 0:a1734fe1ec4b | 589 | // Simple copy |
vpcola | 0:a1734fe1ec4b | 590 | *message = msg; |
vpcola | 0:a1734fe1ec4b | 591 | |
vpcola | 0:a1734fe1ec4b | 592 | // Push the data to the thread |
vpcola | 0:a1734fe1ec4b | 593 | DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 594 | mqueue.put(message); |
vpcola | 0:a1734fe1ec4b | 595 | |
vpcola | 0:a1734fe1ec4b | 596 | return SUCCESS; |
vpcola | 0:a1734fe1ec4b | 597 | } |
vpcola | 0:a1734fe1ec4b | 598 | |
vpcola | 0:a1734fe1ec4b | 599 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
vpcola | 0:a1734fe1ec4b | 600 | { |
vpcola | 0:a1734fe1ec4b | 601 | MQTTString topicString = MQTTString_initializer; |
vpcola | 0:a1734fe1ec4b | 602 | |
vpcola | 0:a1734fe1ec4b | 603 | if (!isConnected) |
vpcola | 0:a1734fe1ec4b | 604 | { |
vpcola | 0:a1734fe1ec4b | 605 | DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 606 | return FAILURE; |
vpcola | 0:a1734fe1ec4b | 607 | } |
vpcola | 0:a1734fe1ec4b | 608 | |
vpcola | 0:a1734fe1ec4b | 609 | topicString.cstring = (char*) &message.topic[0]; |
vpcola | 0:a1734fe1ec4b | 610 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
vpcola | 0:a1734fe1ec4b | 611 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
vpcola | 0:a1734fe1ec4b | 612 | if (len <= 0) |
vpcola | 0:a1734fe1ec4b | 613 | { |
vpcola | 0:a1734fe1ec4b | 614 | DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 615 | return FAILURE; |
vpcola | 0:a1734fe1ec4b | 616 | } |
vpcola | 0:a1734fe1ec4b | 617 | |
vpcola | 0:a1734fe1ec4b | 618 | if (sendPacket(len) == SUCCESS) |
vpcola | 0:a1734fe1ec4b | 619 | { |
vpcola | 0:a1734fe1ec4b | 620 | DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 621 | return SUCCESS; |
vpcola | 0:a1734fe1ec4b | 622 | } |
vpcola | 0:a1734fe1ec4b | 623 | |
vpcola | 0:a1734fe1ec4b | 624 | DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 625 | return FAILURE; |
vpcola | 0:a1734fe1ec4b | 626 | } |
vpcola | 0:a1734fe1ec4b | 627 | |
vpcola | 0:a1734fe1ec4b | 628 | void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) |
vpcola | 0:a1734fe1ec4b | 629 | { |
vpcola | 0:a1734fe1ec4b | 630 | // Push the subscription into the map ... |
vpcola | 0:a1734fe1ec4b | 631 | FP<void,MessageData &> fp; |
vpcola | 0:a1734fe1ec4b | 632 | fp.attach(function); |
vpcola | 0:a1734fe1ec4b | 633 | |
vpcola | 0:a1734fe1ec4b | 634 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
vpcola | 0:a1734fe1ec4b | 635 | } |
vpcola | 0:a1734fe1ec4b | 636 | |
vpcola | 0:a1734fe1ec4b | 637 | int MQTTThreadedClient::processSubscriptions() |
vpcola | 0:a1734fe1ec4b | 638 | { |
vpcola | 0:a1734fe1ec4b | 639 | int numsubscribed = 0; |
vpcola | 0:a1734fe1ec4b | 640 | |
vpcola | 0:a1734fe1ec4b | 641 | if (!isConnected) |
vpcola | 0:a1734fe1ec4b | 642 | { |
vpcola | 0:a1734fe1ec4b | 643 | DBG("Session not connected!!\r\n"); |
vpcola | 0:a1734fe1ec4b | 644 | return 0; |
vpcola | 0:a1734fe1ec4b | 645 | } |
vpcola | 0:a1734fe1ec4b | 646 | |
vpcola | 0:a1734fe1ec4b | 647 | DBG("Processing subscribed topics ....\r\n"); |
vpcola | 0:a1734fe1ec4b | 648 | |
vpcola | 0:a1734fe1ec4b | 649 | std::map<std::string, FP<void, MessageData &> >::iterator it; |
vpcola | 0:a1734fe1ec4b | 650 | for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) |
vpcola | 0:a1734fe1ec4b | 651 | { |
vpcola | 0:a1734fe1ec4b | 652 | int rc = FAILURE; |
vpcola | 0:a1734fe1ec4b | 653 | int len = 0; |
vpcola | 0:a1734fe1ec4b | 654 | //TODO: We only subscribe to QoS = 0 for now |
vpcola | 0:a1734fe1ec4b | 655 | QoS qos = QOS0; |
vpcola | 0:a1734fe1ec4b | 656 | |
vpcola | 0:a1734fe1ec4b | 657 | MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; |
vpcola | 0:a1734fe1ec4b | 658 | DBG("Subscribing to topic [%s]\r\n", topic.cstring); |
vpcola | 0:a1734fe1ec4b | 659 | |
vpcola | 0:a1734fe1ec4b | 660 | |
vpcola | 0:a1734fe1ec4b | 661 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
vpcola | 0:a1734fe1ec4b | 662 | if (len <= 0) { |
vpcola | 0:a1734fe1ec4b | 663 | DBG("Error serializing subscribe packet ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 664 | continue; |
vpcola | 0:a1734fe1ec4b | 665 | } |
vpcola | 0:a1734fe1ec4b | 666 | |
vpcola | 0:a1734fe1ec4b | 667 | if ((rc = sendPacket(len)) != SUCCESS) { |
vpcola | 0:a1734fe1ec4b | 668 | DBG("Error sending subscribe packet [%d]\r\n", rc); |
vpcola | 0:a1734fe1ec4b | 669 | continue; |
vpcola | 0:a1734fe1ec4b | 670 | } |
vpcola | 0:a1734fe1ec4b | 671 | |
vpcola | 0:a1734fe1ec4b | 672 | DBG("Waiting for subscription ack ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 673 | // Wait for SUBACK, dropping packets read along the way ... |
vpcola | 0:a1734fe1ec4b | 674 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
vpcola | 0:a1734fe1ec4b | 675 | int count = 0, grantedQoS = -1; |
vpcola | 0:a1734fe1ec4b | 676 | unsigned short mypacketid; |
vpcola | 0:a1734fe1ec4b | 677 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 0:a1734fe1ec4b | 678 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
vpcola | 0:a1734fe1ec4b | 679 | // For as long as we do not get 0x80 .. |
vpcola | 0:a1734fe1ec4b | 680 | if (rc != 0x80) |
vpcola | 0:a1734fe1ec4b | 681 | { |
vpcola | 0:a1734fe1ec4b | 682 | // Reset connection timers here ... |
vpcola | 0:a1734fe1ec4b | 683 | resetConnectionTimer(); |
vpcola | 0:a1734fe1ec4b | 684 | DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); |
vpcola | 0:a1734fe1ec4b | 685 | numsubscribed++; |
vpcola | 0:a1734fe1ec4b | 686 | } else { |
vpcola | 0:a1734fe1ec4b | 687 | DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); |
vpcola | 0:a1734fe1ec4b | 688 | } |
vpcola | 0:a1734fe1ec4b | 689 | } else |
vpcola | 0:a1734fe1ec4b | 690 | DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); |
vpcola | 0:a1734fe1ec4b | 691 | } // end for loop |
vpcola | 0:a1734fe1ec4b | 692 | |
vpcola | 0:a1734fe1ec4b | 693 | return numsubscribed; |
vpcola | 0:a1734fe1ec4b | 694 | } |
vpcola | 0:a1734fe1ec4b | 695 | |
vpcola | 0:a1734fe1ec4b | 696 | bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) |
vpcola | 0:a1734fe1ec4b | 697 | { |
vpcola | 0:a1734fe1ec4b | 698 | char* curf = topicFilter; |
vpcola | 0:a1734fe1ec4b | 699 | char* curn = topicName.lenstring.data; |
vpcola | 0:a1734fe1ec4b | 700 | char* curn_end = curn + topicName.lenstring.len; |
vpcola | 0:a1734fe1ec4b | 701 | |
vpcola | 0:a1734fe1ec4b | 702 | while (*curf && curn < curn_end) |
vpcola | 0:a1734fe1ec4b | 703 | { |
vpcola | 0:a1734fe1ec4b | 704 | if (*curn == '/' && *curf != '/') |
vpcola | 0:a1734fe1ec4b | 705 | break; |
vpcola | 0:a1734fe1ec4b | 706 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
vpcola | 0:a1734fe1ec4b | 707 | break; |
vpcola | 0:a1734fe1ec4b | 708 | if (*curf == '+') |
vpcola | 0:a1734fe1ec4b | 709 | { // skip until we meet the next separator, or end of string |
vpcola | 0:a1734fe1ec4b | 710 | char* nextpos = curn + 1; |
vpcola | 0:a1734fe1ec4b | 711 | while (nextpos < curn_end && *nextpos != '/') |
vpcola | 0:a1734fe1ec4b | 712 | nextpos = ++curn + 1; |
vpcola | 0:a1734fe1ec4b | 713 | } |
vpcola | 0:a1734fe1ec4b | 714 | else if (*curf == '#') |
vpcola | 0:a1734fe1ec4b | 715 | curn = curn_end - 1; // skip until end of string |
vpcola | 0:a1734fe1ec4b | 716 | curf++; |
vpcola | 0:a1734fe1ec4b | 717 | curn++; |
vpcola | 0:a1734fe1ec4b | 718 | }; |
vpcola | 0:a1734fe1ec4b | 719 | |
vpcola | 0:a1734fe1ec4b | 720 | return (curn == curn_end) && (*curf == '\0'); |
vpcola | 0:a1734fe1ec4b | 721 | } |
vpcola | 0:a1734fe1ec4b | 722 | |
vpcola | 0:a1734fe1ec4b | 723 | int MQTTThreadedClient::handlePublishMsg() |
vpcola | 0:a1734fe1ec4b | 724 | { |
vpcola | 0:a1734fe1ec4b | 725 | MQTTString topicName = MQTTString_initializer; |
vpcola | 0:a1734fe1ec4b | 726 | Message msg; |
vpcola | 0:a1734fe1ec4b | 727 | int intQoS; |
vpcola | 0:a1734fe1ec4b | 728 | DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 729 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
vpcola | 0:a1734fe1ec4b | 730 | &intQoS, |
vpcola | 0:a1734fe1ec4b | 731 | (unsigned char*)&msg.retained, |
vpcola | 0:a1734fe1ec4b | 732 | (unsigned short*)&msg.id, |
vpcola | 0:a1734fe1ec4b | 733 | &topicName, |
vpcola | 0:a1734fe1ec4b | 734 | (unsigned char**)&msg.payload, |
vpcola | 0:a1734fe1ec4b | 735 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
vpcola | 0:a1734fe1ec4b | 736 | { |
vpcola | 0:a1734fe1ec4b | 737 | DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 738 | return -1; |
vpcola | 0:a1734fe1ec4b | 739 | } |
vpcola | 0:a1734fe1ec4b | 740 | |
vpcola | 0:a1734fe1ec4b | 741 | std::string topic; |
vpcola | 0:a1734fe1ec4b | 742 | if (topicName.lenstring.len > 0) |
vpcola | 0:a1734fe1ec4b | 743 | { |
vpcola | 0:a1734fe1ec4b | 744 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
vpcola | 0:a1734fe1ec4b | 745 | }else |
vpcola | 0:a1734fe1ec4b | 746 | topic = (const char *) topicName.cstring; |
vpcola | 0:a1734fe1ec4b | 747 | |
vpcola | 0:a1734fe1ec4b | 748 | DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); |
vpcola | 0:a1734fe1ec4b | 749 | |
vpcola | 0:a1734fe1ec4b | 750 | msg.qos = (QoS) intQoS; |
vpcola | 0:a1734fe1ec4b | 751 | |
vpcola | 0:a1734fe1ec4b | 752 | |
vpcola | 0:a1734fe1ec4b | 753 | // Call the handlers for each topic |
vpcola | 0:a1734fe1ec4b | 754 | if (topicCBMap.find(topic) != topicCBMap.end()) |
vpcola | 0:a1734fe1ec4b | 755 | { |
vpcola | 0:a1734fe1ec4b | 756 | // Call the callback function |
vpcola | 0:a1734fe1ec4b | 757 | if (topicCBMap[topic].attached()) |
vpcola | 0:a1734fe1ec4b | 758 | { |
vpcola | 0:a1734fe1ec4b | 759 | DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 760 | MessageData md(topicName, msg); |
vpcola | 0:a1734fe1ec4b | 761 | topicCBMap[topic](md); |
vpcola | 0:a1734fe1ec4b | 762 | |
vpcola | 0:a1734fe1ec4b | 763 | return 1; |
vpcola | 0:a1734fe1ec4b | 764 | } |
vpcola | 0:a1734fe1ec4b | 765 | } |
vpcola | 0:a1734fe1ec4b | 766 | |
vpcola | 0:a1734fe1ec4b | 767 | // TODO: depending on the QoS |
vpcola | 0:a1734fe1ec4b | 768 | // we send data to the server = PUBACK or PUBREC |
vpcola | 0:a1734fe1ec4b | 769 | switch(intQoS) |
vpcola | 0:a1734fe1ec4b | 770 | { |
vpcola | 0:a1734fe1ec4b | 771 | case QOS0: |
vpcola | 0:a1734fe1ec4b | 772 | // We send back nothing ... |
vpcola | 0:a1734fe1ec4b | 773 | break; |
vpcola | 0:a1734fe1ec4b | 774 | case QOS1: |
vpcola | 0:a1734fe1ec4b | 775 | // TODO: implement |
vpcola | 0:a1734fe1ec4b | 776 | break; |
vpcola | 0:a1734fe1ec4b | 777 | case QOS2: |
vpcola | 0:a1734fe1ec4b | 778 | // TODO: implement |
vpcola | 0:a1734fe1ec4b | 779 | break; |
vpcola | 0:a1734fe1ec4b | 780 | default: |
vpcola | 0:a1734fe1ec4b | 781 | break; |
vpcola | 0:a1734fe1ec4b | 782 | } |
vpcola | 0:a1734fe1ec4b | 783 | |
vpcola | 0:a1734fe1ec4b | 784 | return 0; |
vpcola | 0:a1734fe1ec4b | 785 | } |
vpcola | 0:a1734fe1ec4b | 786 | |
vpcola | 0:a1734fe1ec4b | 787 | void MQTTThreadedClient::resetConnectionTimer() |
vpcola | 0:a1734fe1ec4b | 788 | { |
vpcola | 0:a1734fe1ec4b | 789 | if (keepAliveInterval > 0) |
vpcola | 0:a1734fe1ec4b | 790 | { |
vpcola | 0:a1734fe1ec4b | 791 | comTimer.reset(); |
vpcola | 0:a1734fe1ec4b | 792 | comTimer.start(); |
vpcola | 0:a1734fe1ec4b | 793 | } |
vpcola | 0:a1734fe1ec4b | 794 | } |
vpcola | 0:a1734fe1ec4b | 795 | |
vpcola | 0:a1734fe1ec4b | 796 | bool MQTTThreadedClient::hasConnectionTimedOut() |
vpcola | 0:a1734fe1ec4b | 797 | { |
vpcola | 0:a1734fe1ec4b | 798 | if (keepAliveInterval > 0 ) { |
vpcola | 0:a1734fe1ec4b | 799 | // Check connection timer |
vpcola | 0:a1734fe1ec4b | 800 | if (comTimer.read_ms() > keepAliveInterval) |
vpcola | 0:a1734fe1ec4b | 801 | return true; |
vpcola | 0:a1734fe1ec4b | 802 | else |
vpcola | 0:a1734fe1ec4b | 803 | return false; |
vpcola | 0:a1734fe1ec4b | 804 | } |
vpcola | 0:a1734fe1ec4b | 805 | |
vpcola | 0:a1734fe1ec4b | 806 | return false; |
vpcola | 0:a1734fe1ec4b | 807 | } |
vpcola | 0:a1734fe1ec4b | 808 | |
vpcola | 0:a1734fe1ec4b | 809 | void MQTTThreadedClient::sendPingRequest() |
vpcola | 0:a1734fe1ec4b | 810 | { |
vpcola | 0:a1734fe1ec4b | 811 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
vpcola | 0:a1734fe1ec4b | 812 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
vpcola | 0:a1734fe1ec4b | 813 | { |
vpcola | 0:a1734fe1ec4b | 814 | printf("MQTT Ping request sent successfully ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 815 | } |
vpcola | 0:a1734fe1ec4b | 816 | } |
vpcola | 0:a1734fe1ec4b | 817 | |
vpcola | 0:a1734fe1ec4b | 818 | void MQTTThreadedClient::startListener() |
vpcola | 0:a1734fe1ec4b | 819 | { |
vpcola | 0:a1734fe1ec4b | 820 | int pType; |
vpcola | 0:a1734fe1ec4b | 821 | int numsubs; |
vpcola | 0:a1734fe1ec4b | 822 | // Continuesly listens for packets and dispatch |
vpcola | 0:a1734fe1ec4b | 823 | // message handlers ... |
vpcola | 0:a1734fe1ec4b | 824 | if (useTLS) |
vpcola | 0:a1734fe1ec4b | 825 | { |
vpcola | 0:a1734fe1ec4b | 826 | initTLS(); |
vpcola | 0:a1734fe1ec4b | 827 | } |
vpcola | 0:a1734fe1ec4b | 828 | |
vpcola | 0:a1734fe1ec4b | 829 | while(true) |
vpcola | 0:a1734fe1ec4b | 830 | { |
vpcola | 0:a1734fe1ec4b | 831 | |
vpcola | 0:a1734fe1ec4b | 832 | // Attempt to reconnect and login |
vpcola | 0:a1734fe1ec4b | 833 | if ( connect() < 0 ) |
vpcola | 0:a1734fe1ec4b | 834 | { |
vpcola | 0:a1734fe1ec4b | 835 | disconnect(); |
vpcola | 0:a1734fe1ec4b | 836 | // Wait for a few secs and reconnect ... |
vpcola | 0:a1734fe1ec4b | 837 | Thread::wait(6000); |
vpcola | 0:a1734fe1ec4b | 838 | continue; |
vpcola | 0:a1734fe1ec4b | 839 | } |
vpcola | 0:a1734fe1ec4b | 840 | |
vpcola | 0:a1734fe1ec4b | 841 | numsubs = processSubscriptions(); |
vpcola | 0:a1734fe1ec4b | 842 | DBG("Subscribed %d topics ...\r\n", numsubs); |
vpcola | 0:a1734fe1ec4b | 843 | |
vpcola | 0:a1734fe1ec4b | 844 | // loop read |
vpcola | 0:a1734fe1ec4b | 845 | while(true) |
vpcola | 0:a1734fe1ec4b | 846 | { |
vpcola | 0:a1734fe1ec4b | 847 | pType = readPacket(); |
vpcola | 0:a1734fe1ec4b | 848 | switch(pType) |
vpcola | 0:a1734fe1ec4b | 849 | { |
vpcola | 0:a1734fe1ec4b | 850 | case TIMEOUT: |
vpcola | 0:a1734fe1ec4b | 851 | // No data available from the network ... |
vpcola | 0:a1734fe1ec4b | 852 | break; |
vpcola | 0:a1734fe1ec4b | 853 | case FAILURE: |
vpcola | 0:a1734fe1ec4b | 854 | { |
vpcola | 0:a1734fe1ec4b | 855 | DBG("readPacket returned failure \r\n"); |
vpcola | 0:a1734fe1ec4b | 856 | goto reconnect; |
vpcola | 0:a1734fe1ec4b | 857 | } |
vpcola | 0:a1734fe1ec4b | 858 | case BUFFER_OVERFLOW: |
vpcola | 0:a1734fe1ec4b | 859 | { |
vpcola | 0:a1734fe1ec4b | 860 | // TODO: Network error, do we disconnect and reconnect? |
vpcola | 0:a1734fe1ec4b | 861 | DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 862 | MBED_ASSERT(false); |
vpcola | 0:a1734fe1ec4b | 863 | } |
vpcola | 0:a1734fe1ec4b | 864 | break; |
vpcola | 0:a1734fe1ec4b | 865 | /** |
vpcola | 0:a1734fe1ec4b | 866 | * The rest of the return codes below (all positive) is about MQTT |
vpcola | 0:a1734fe1ec4b | 867 | * response codes |
vpcola | 0:a1734fe1ec4b | 868 | **/ |
vpcola | 0:a1734fe1ec4b | 869 | case CONNACK: |
vpcola | 0:a1734fe1ec4b | 870 | case PUBACK: |
vpcola | 0:a1734fe1ec4b | 871 | case SUBACK: |
vpcola | 0:a1734fe1ec4b | 872 | break; |
vpcola | 0:a1734fe1ec4b | 873 | case PUBLISH: |
vpcola | 0:a1734fe1ec4b | 874 | { |
vpcola | 0:a1734fe1ec4b | 875 | DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 876 | // We receive data from the MQTT server .. |
vpcola | 0:a1734fe1ec4b | 877 | if (handlePublishMsg() < 0) { |
vpcola | 0:a1734fe1ec4b | 878 | DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 879 | break; |
vpcola | 0:a1734fe1ec4b | 880 | } |
vpcola | 0:a1734fe1ec4b | 881 | } |
vpcola | 0:a1734fe1ec4b | 882 | break; |
vpcola | 0:a1734fe1ec4b | 883 | case PINGRESP: |
vpcola | 0:a1734fe1ec4b | 884 | { |
vpcola | 0:a1734fe1ec4b | 885 | printf("MQTT Got ping response ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 886 | resetConnectionTimer(); |
vpcola | 0:a1734fe1ec4b | 887 | } |
vpcola | 0:a1734fe1ec4b | 888 | break; |
vpcola | 0:a1734fe1ec4b | 889 | default: |
vpcola | 0:a1734fe1ec4b | 890 | DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); |
vpcola | 0:a1734fe1ec4b | 891 | } |
vpcola | 0:a1734fe1ec4b | 892 | |
vpcola | 0:a1734fe1ec4b | 893 | // Check if its time to send a keepAlive packet |
vpcola | 0:a1734fe1ec4b | 894 | if (hasConnectionTimedOut()) { |
vpcola | 0:a1734fe1ec4b | 895 | // Queue the ping request so that other |
vpcola | 0:a1734fe1ec4b | 896 | // pending operations queued above will go first |
vpcola | 0:a1734fe1ec4b | 897 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
vpcola | 0:a1734fe1ec4b | 898 | } |
vpcola | 0:a1734fe1ec4b | 899 | |
vpcola | 0:a1734fe1ec4b | 900 | // Check if we have messages on the message queue |
vpcola | 0:a1734fe1ec4b | 901 | osEvent evt = mqueue.get(10); |
vpcola | 0:a1734fe1ec4b | 902 | if (evt.status == osEventMessage) { |
vpcola | 0:a1734fe1ec4b | 903 | |
vpcola | 0:a1734fe1ec4b | 904 | DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); |
vpcola | 0:a1734fe1ec4b | 905 | |
vpcola | 0:a1734fe1ec4b | 906 | // Unpack the message |
vpcola | 0:a1734fe1ec4b | 907 | PubMessage * message = (PubMessage *)evt.value.p; |
vpcola | 0:a1734fe1ec4b | 908 | |
vpcola | 0:a1734fe1ec4b | 909 | // Send the packet, do not queue the call |
vpcola | 0:a1734fe1ec4b | 910 | // like the ping above .. |
vpcola | 0:a1734fe1ec4b | 911 | if ( sendPublish(*message) == SUCCESS) { |
vpcola | 0:a1734fe1ec4b | 912 | // Reset timers if we have been able to send successfully |
vpcola | 0:a1734fe1ec4b | 913 | resetConnectionTimer(); |
vpcola | 0:a1734fe1ec4b | 914 | } else { |
vpcola | 0:a1734fe1ec4b | 915 | // Disconnected? |
vpcola | 0:a1734fe1ec4b | 916 | goto reconnect; |
vpcola | 0:a1734fe1ec4b | 917 | } |
vpcola | 0:a1734fe1ec4b | 918 | |
vpcola | 0:a1734fe1ec4b | 919 | // Free the message from mempool after using |
vpcola | 0:a1734fe1ec4b | 920 | mpool.free(message); |
vpcola | 0:a1734fe1ec4b | 921 | } |
vpcola | 0:a1734fe1ec4b | 922 | |
vpcola | 0:a1734fe1ec4b | 923 | // Dispatch any queued events ... |
vpcola | 0:a1734fe1ec4b | 924 | queue.dispatch(100); |
vpcola | 0:a1734fe1ec4b | 925 | } // end while loop |
vpcola | 0:a1734fe1ec4b | 926 | |
vpcola | 0:a1734fe1ec4b | 927 | reconnect: |
vpcola | 0:a1734fe1ec4b | 928 | // reconnect? |
vpcola | 0:a1734fe1ec4b | 929 | DBG("Client disconnected!! ... retrying ...\r\n"); |
vpcola | 0:a1734fe1ec4b | 930 | disconnect(); |
vpcola | 0:a1734fe1ec4b | 931 | |
vpcola | 0:a1734fe1ec4b | 932 | }; |
vpcola | 0:a1734fe1ec4b | 933 | } |
vpcola | 0:a1734fe1ec4b | 934 | |
vpcola | 0:a1734fe1ec4b | 935 | void MQTTThreadedClient::stopListener() |
vpcola | 0:a1734fe1ec4b | 936 | { |
vpcola | 0:a1734fe1ec4b | 937 | // TODO: Set a signal/flag that the running thread |
vpcola | 0:a1734fe1ec4b | 938 | // will check if its ok to stop ... |
vpcola | 0:a1734fe1ec4b | 939 | } |
vpcola | 0:a1734fe1ec4b | 940 | |
vpcola | 0:a1734fe1ec4b | 941 | } |