A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.cpp@33:38e2e7bf91eb, 2017-04-02 (annotated)
- Committer:
- vpcola
- Date:
- Sun Apr 02 09:11:53 2017 +0800
- Revision:
- 33:38e2e7bf91eb
- Parent:
- 30:b2aed80037db
Updated easy-connect library
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
vpcola | 23:06fac173529e | 1 | #include "mbed.h" |
vpcola | 23:06fac173529e | 2 | #include "rtos.h" |
vpcola | 23:06fac173529e | 3 | #include "MQTTThreadedClient.h" |
vpcola | 25:326f00faa092 | 4 | #include "mbedtls/platform.h" |
vpcola | 25:326f00faa092 | 5 | #include "mbedtls/ssl.h" |
vpcola | 25:326f00faa092 | 6 | #include "mbedtls/entropy.h" |
vpcola | 25:326f00faa092 | 7 | #include "mbedtls/ctr_drbg.h" |
vpcola | 25:326f00faa092 | 8 | #include "mbedtls/error.h" |
vpcola | 23:06fac173529e | 9 | |
vpcola | 30:b2aed80037db | 10 | static MemoryPool<MQTT::PubMessage, 8> mpool; |
vpcola | 30:b2aed80037db | 11 | static Queue<MQTT::PubMessage, 8> mqueue; |
vpcola | 23:06fac173529e | 12 | |
vpcola | 25:326f00faa092 | 13 | // SSL/TLS variables |
vpcola | 25:326f00faa092 | 14 | mbedtls_entropy_context _entropy; |
vpcola | 25:326f00faa092 | 15 | mbedtls_ctr_drbg_context _ctr_drbg; |
vpcola | 25:326f00faa092 | 16 | mbedtls_x509_crt _cacert; |
vpcola | 25:326f00faa092 | 17 | mbedtls_ssl_context _ssl; |
vpcola | 25:326f00faa092 | 18 | mbedtls_ssl_config _ssl_conf; |
vpcola | 25:326f00faa092 | 19 | mbedtls_ssl_session saved_session; |
vpcola | 25:326f00faa092 | 20 | |
vpcola | 30:b2aed80037db | 21 | namespace MQTT { |
vpcola | 30:b2aed80037db | 22 | |
vpcola | 25:326f00faa092 | 23 | /** |
vpcola | 25:326f00faa092 | 24 | * Receive callback for mbed TLS |
vpcola | 25:326f00faa092 | 25 | */ |
vpcola | 25:326f00faa092 | 26 | static int ssl_recv(void *ctx, unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 27 | { |
vpcola | 25:326f00faa092 | 28 | int recv = -1; |
vpcola | 25:326f00faa092 | 29 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 30 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 31 | recv = socket->recv(buf, len); |
vpcola | 25:326f00faa092 | 32 | |
vpcola | 25:326f00faa092 | 33 | if (NSAPI_ERROR_WOULD_BLOCK == recv) { |
vpcola | 25:326f00faa092 | 34 | return MBEDTLS_ERR_SSL_WANT_READ; |
vpcola | 25:326f00faa092 | 35 | } else if (recv < 0) { |
vpcola | 25:326f00faa092 | 36 | return -1; |
vpcola | 25:326f00faa092 | 37 | } else { |
vpcola | 25:326f00faa092 | 38 | return recv; |
vpcola | 25:326f00faa092 | 39 | } |
vpcola | 25:326f00faa092 | 40 | } |
vpcola | 25:326f00faa092 | 41 | |
vpcola | 25:326f00faa092 | 42 | /** |
vpcola | 25:326f00faa092 | 43 | * Send callback for mbed TLS |
vpcola | 25:326f00faa092 | 44 | */ |
vpcola | 25:326f00faa092 | 45 | static int ssl_send(void *ctx, const unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 46 | { |
vpcola | 25:326f00faa092 | 47 | int sent = -1; |
vpcola | 25:326f00faa092 | 48 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 49 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 50 | sent = socket->send(buf, len); |
vpcola | 25:326f00faa092 | 51 | |
vpcola | 25:326f00faa092 | 52 | if(NSAPI_ERROR_WOULD_BLOCK == sent) { |
vpcola | 25:326f00faa092 | 53 | return MBEDTLS_ERR_SSL_WANT_WRITE; |
vpcola | 25:326f00faa092 | 54 | } else if (sent < 0) { |
vpcola | 25:326f00faa092 | 55 | return -1; |
vpcola | 25:326f00faa092 | 56 | } else { |
vpcola | 25:326f00faa092 | 57 | return sent; |
vpcola | 25:326f00faa092 | 58 | } |
vpcola | 25:326f00faa092 | 59 | } |
vpcola | 25:326f00faa092 | 60 | |
vpcola | 25:326f00faa092 | 61 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 62 | /** |
vpcola | 25:326f00faa092 | 63 | * Debug callback for mbed TLS |
vpcola | 25:326f00faa092 | 64 | * Just prints on the USB serial port |
vpcola | 25:326f00faa092 | 65 | */ |
vpcola | 25:326f00faa092 | 66 | static void my_debug(void *ctx, int level, const char *file, int line, |
vpcola | 25:326f00faa092 | 67 | const char *str) |
vpcola | 25:326f00faa092 | 68 | { |
vpcola | 25:326f00faa092 | 69 | const char *p, *basename; |
vpcola | 25:326f00faa092 | 70 | (void) ctx; |
vpcola | 25:326f00faa092 | 71 | |
vpcola | 25:326f00faa092 | 72 | /* Extract basename from file */ |
vpcola | 25:326f00faa092 | 73 | for(p = basename = file; *p != '\0'; p++) { |
vpcola | 25:326f00faa092 | 74 | if(*p == '/' || *p == '\\') { |
vpcola | 25:326f00faa092 | 75 | basename = p + 1; |
vpcola | 25:326f00faa092 | 76 | } |
vpcola | 25:326f00faa092 | 77 | } |
vpcola | 25:326f00faa092 | 78 | |
vpcola | 25:326f00faa092 | 79 | if (_debug) { |
vpcola | 25:326f00faa092 | 80 | mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); |
vpcola | 25:326f00faa092 | 81 | } |
vpcola | 25:326f00faa092 | 82 | } |
vpcola | 25:326f00faa092 | 83 | |
vpcola | 25:326f00faa092 | 84 | /** |
vpcola | 25:326f00faa092 | 85 | * Certificate verification callback for mbed TLS |
vpcola | 25:326f00faa092 | 86 | * Here we only use it to display information on each cert in the chain |
vpcola | 25:326f00faa092 | 87 | */ |
vpcola | 25:326f00faa092 | 88 | static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) |
vpcola | 25:326f00faa092 | 89 | { |
vpcola | 25:326f00faa092 | 90 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 91 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 92 | (void) data; |
vpcola | 25:326f00faa092 | 93 | |
vpcola | 26:4b21de8043a5 | 94 | if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); |
vpcola | 25:326f00faa092 | 95 | mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); |
vpcola | 25:326f00faa092 | 96 | if (_debug) mbedtls_printf("%s", buf); |
vpcola | 25:326f00faa092 | 97 | |
vpcola | 25:326f00faa092 | 98 | if (*flags == 0) |
vpcola | 26:4b21de8043a5 | 99 | if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); |
vpcola | 25:326f00faa092 | 100 | else { |
vpcola | 25:326f00faa092 | 101 | mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); |
vpcola | 25:326f00faa092 | 102 | if (_debug) mbedtls_printf("%s\n", buf); |
vpcola | 25:326f00faa092 | 103 | } |
vpcola | 25:326f00faa092 | 104 | |
vpcola | 25:326f00faa092 | 105 | delete[] buf; |
vpcola | 25:326f00faa092 | 106 | return 0; |
vpcola | 25:326f00faa092 | 107 | } |
vpcola | 25:326f00faa092 | 108 | #endif |
vpcola | 25:326f00faa092 | 109 | |
vpcola | 25:326f00faa092 | 110 | |
vpcola | 25:326f00faa092 | 111 | void MQTTThreadedClient::setupTLS() |
vpcola | 25:326f00faa092 | 112 | { |
vpcola | 26:4b21de8043a5 | 113 | if (useTLS) |
vpcola | 25:326f00faa092 | 114 | { |
vpcola | 25:326f00faa092 | 115 | mbedtls_entropy_init(&_entropy); |
vpcola | 25:326f00faa092 | 116 | mbedtls_ctr_drbg_init(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 117 | mbedtls_x509_crt_init(&_cacert); |
vpcola | 25:326f00faa092 | 118 | mbedtls_ssl_init(&_ssl); |
vpcola | 25:326f00faa092 | 119 | mbedtls_ssl_config_init(&_ssl_conf); |
vpcola | 25:326f00faa092 | 120 | memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); |
vpcola | 25:326f00faa092 | 121 | } |
vpcola | 25:326f00faa092 | 122 | } |
vpcola | 25:326f00faa092 | 123 | |
vpcola | 25:326f00faa092 | 124 | void MQTTThreadedClient::freeTLS() |
vpcola | 25:326f00faa092 | 125 | { |
vpcola | 26:4b21de8043a5 | 126 | if (useTLS) |
vpcola | 25:326f00faa092 | 127 | { |
vpcola | 25:326f00faa092 | 128 | mbedtls_entropy_free(&_entropy); |
vpcola | 25:326f00faa092 | 129 | mbedtls_ctr_drbg_free(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 130 | mbedtls_x509_crt_free(&_cacert); |
vpcola | 25:326f00faa092 | 131 | mbedtls_ssl_free(&_ssl); |
vpcola | 25:326f00faa092 | 132 | mbedtls_ssl_config_free(&_ssl_conf); |
vpcola | 25:326f00faa092 | 133 | } |
vpcola | 25:326f00faa092 | 134 | } |
vpcola | 25:326f00faa092 | 135 | |
vpcola | 25:326f00faa092 | 136 | int MQTTThreadedClient::initTLS() |
vpcola | 25:326f00faa092 | 137 | { |
vpcola | 25:326f00faa092 | 138 | int ret; |
vpcola | 25:326f00faa092 | 139 | |
vpcola | 30:b2aed80037db | 140 | DBG("Initializing TLS ...\r\n"); |
vpcola | 30:b2aed80037db | 141 | DBG("mbedtls_ctr_drdbg_seed ...\r\n"); |
vpcola | 25:326f00faa092 | 142 | if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, |
vpcola | 25:326f00faa092 | 143 | (const unsigned char *) DRBG_PERS, |
vpcola | 25:326f00faa092 | 144 | sizeof (DRBG_PERS))) != 0) { |
vpcola | 26:4b21de8043a5 | 145 | mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 146 | _error = ret; |
vpcola | 25:326f00faa092 | 147 | return -1; |
vpcola | 25:326f00faa092 | 148 | } |
vpcola | 30:b2aed80037db | 149 | DBG("mbedtls_x509_crt_parse ...\r\n"); |
vpcola | 25:326f00faa092 | 150 | if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, |
vpcola | 25:326f00faa092 | 151 | strlen(ssl_ca_pem) + 1)) != 0) { |
vpcola | 26:4b21de8043a5 | 152 | mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 153 | _error = ret; |
vpcola | 25:326f00faa092 | 154 | return -1; |
vpcola | 25:326f00faa092 | 155 | } |
vpcola | 25:326f00faa092 | 156 | |
vpcola | 30:b2aed80037db | 157 | DBG("mbedtls_ssl_config_defaults ...\r\n"); |
vpcola | 25:326f00faa092 | 158 | if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, |
vpcola | 25:326f00faa092 | 159 | MBEDTLS_SSL_IS_CLIENT, |
vpcola | 25:326f00faa092 | 160 | MBEDTLS_SSL_TRANSPORT_STREAM, |
vpcola | 25:326f00faa092 | 161 | MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { |
vpcola | 26:4b21de8043a5 | 162 | mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 163 | _error = ret; |
vpcola | 25:326f00faa092 | 164 | return -1; |
vpcola | 25:326f00faa092 | 165 | } |
vpcola | 25:326f00faa092 | 166 | |
vpcola | 30:b2aed80037db | 167 | DBG("mbedtls_ssl_config_ca_chain ...\r\n"); |
vpcola | 25:326f00faa092 | 168 | mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); |
vpcola | 30:b2aed80037db | 169 | DBG("mbedtls_ssl_conf_rng ...\r\n"); |
vpcola | 25:326f00faa092 | 170 | mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); |
vpcola | 25:326f00faa092 | 171 | |
vpcola | 25:326f00faa092 | 172 | /* It is possible to disable authentication by passing |
vpcola | 25:326f00faa092 | 173 | * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() |
vpcola | 25:326f00faa092 | 174 | */ |
vpcola | 30:b2aed80037db | 175 | DBG("mbedtls_ssl_conf_authmode ...\r\n"); |
vpcola | 25:326f00faa092 | 176 | mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); |
vpcola | 25:326f00faa092 | 177 | |
vpcola | 25:326f00faa092 | 178 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 179 | mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); |
vpcola | 25:326f00faa092 | 180 | mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); |
vpcola | 25:326f00faa092 | 181 | mbedtls_debug_set_threshold(DEBUG_LEVEL); |
vpcola | 25:326f00faa092 | 182 | #endif |
vpcola | 25:326f00faa092 | 183 | |
vpcola | 30:b2aed80037db | 184 | DBG("mbedtls_ssl_setup ...\r\n"); |
vpcola | 25:326f00faa092 | 185 | if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { |
vpcola | 26:4b21de8043a5 | 186 | mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 187 | _error = ret; |
vpcola | 25:326f00faa092 | 188 | return -1; |
vpcola | 25:326f00faa092 | 189 | } |
vpcola | 25:326f00faa092 | 190 | |
vpcola | 25:326f00faa092 | 191 | return 0; |
vpcola | 25:326f00faa092 | 192 | } |
vpcola | 25:326f00faa092 | 193 | |
vpcola | 25:326f00faa092 | 194 | int MQTTThreadedClient::doTLSHandshake() |
vpcola | 25:326f00faa092 | 195 | { |
vpcola | 25:326f00faa092 | 196 | int ret; |
vpcola | 25:326f00faa092 | 197 | |
vpcola | 25:326f00faa092 | 198 | /* Start the handshake, the rest will be done in onReceive() */ |
vpcola |
33:38e2e7bf91eb | 199 | printf("Starting the TLS handshake...\r\n"); |
vpcola | 25:326f00faa092 | 200 | ret = mbedtls_ssl_handshake(&_ssl); |
vpcola | 25:326f00faa092 | 201 | if (ret < 0) |
vpcola | 25:326f00faa092 | 202 | { |
vpcola | 25:326f00faa092 | 203 | if (ret != MBEDTLS_ERR_SSL_WANT_READ && |
vpcola | 25:326f00faa092 | 204 | ret != MBEDTLS_ERR_SSL_WANT_WRITE) |
vpcola | 26:4b21de8043a5 | 205 | mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 206 | else |
vpcola | 25:326f00faa092 | 207 | { |
vpcola | 25:326f00faa092 | 208 | // do not close the socket if timed out |
vpcola | 26:4b21de8043a5 | 209 | ret = TIMEOUT; |
vpcola | 25:326f00faa092 | 210 | } |
vpcola | 26:4b21de8043a5 | 211 | return ret; |
vpcola | 25:326f00faa092 | 212 | } |
vpcola | 25:326f00faa092 | 213 | |
vpcola | 25:326f00faa092 | 214 | /* Handshake done, time to print info */ |
vpcola |
33:38e2e7bf91eb | 215 | printf("TLS connection to %s:%d established\r\n", |
vpcola | 25:326f00faa092 | 216 | host.c_str(), port); |
vpcola | 25:326f00faa092 | 217 | |
vpcola | 25:326f00faa092 | 218 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 219 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 220 | mbedtls_x509_crt_info(buf, buf_size, "\r ", |
vpcola | 25:326f00faa092 | 221 | mbedtls_ssl_get_peer_cert(&_ssl)); |
vpcola | 25:326f00faa092 | 222 | |
vpcola |
33:38e2e7bf91eb | 223 | printf("Server certificate:\r\n%s\r", buf); |
vpcola | 25:326f00faa092 | 224 | // Verify server cert ... |
vpcola | 25:326f00faa092 | 225 | uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); |
vpcola | 25:326f00faa092 | 226 | if( flags != 0 ) |
vpcola | 25:326f00faa092 | 227 | { |
vpcola | 25:326f00faa092 | 228 | mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); |
vpcola |
33:38e2e7bf91eb | 229 | printf("Certificate verification failed:\r\n%s\r\r\n", buf); |
vpcola | 25:326f00faa092 | 230 | // free server cert ... before error return |
vpcola | 25:326f00faa092 | 231 | delete [] buf; |
vpcola | 25:326f00faa092 | 232 | return -1; |
vpcola | 25:326f00faa092 | 233 | } |
vpcola | 25:326f00faa092 | 234 | |
vpcola |
33:38e2e7bf91eb | 235 | printf("Certificate verification passed\r\n\r\n"); |
vpcola | 25:326f00faa092 | 236 | // delete server cert after verification |
vpcola | 25:326f00faa092 | 237 | delete [] buf; |
vpcola | 25:326f00faa092 | 238 | |
vpcola | 30:b2aed80037db | 239 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola |
33:38e2e7bf91eb | 240 | printf("Saving SSL/TLS session ...\r\n"); |
vpcola | 25:326f00faa092 | 241 | // TODO: Save the session here for reconnect. |
vpcola | 25:326f00faa092 | 242 | if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) |
vpcola | 25:326f00faa092 | 243 | { |
vpcola | 26:4b21de8043a5 | 244 | mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); |
vpcola | 26:4b21de8043a5 | 245 | hasSavedSession = false; |
vpcola | 25:326f00faa092 | 246 | return -1; |
vpcola | 25:326f00faa092 | 247 | } |
vpcola |
33:38e2e7bf91eb | 248 | printf("Session saved for reconnect ...\r\n"); |
vpcola | 30:b2aed80037db | 249 | #endif |
vpcola |
33:38e2e7bf91eb | 250 | |
vpcola | 26:4b21de8043a5 | 251 | hasSavedSession = true; |
vpcola | 26:4b21de8043a5 | 252 | |
vpcola | 25:326f00faa092 | 253 | return 0; |
vpcola | 25:326f00faa092 | 254 | } |
vpcola | 25:326f00faa092 | 255 | |
vpcola | 23:06fac173529e | 256 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 257 | { |
vpcola | 23:06fac173529e | 258 | int rc; |
vpcola | 25:326f00faa092 | 259 | |
vpcola | 23:06fac173529e | 260 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 261 | return -1; |
vpcola | 25:326f00faa092 | 262 | |
vpcola | 26:4b21de8043a5 | 263 | if (useTLS) |
vpcola | 25:326f00faa092 | 264 | { |
vpcola | 25:326f00faa092 | 265 | // Do SSL/TLS read |
vpcola | 25:326f00faa092 | 266 | rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 267 | if (MBEDTLS_ERR_SSL_WANT_READ == rc) |
vpcola | 25:326f00faa092 | 268 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 269 | else |
vpcola | 25:326f00faa092 | 270 | return rc; |
vpcola | 25:326f00faa092 | 271 | } else { |
vpcola | 25:326f00faa092 | 272 | // non-blocking socket ... |
vpcola | 25:326f00faa092 | 273 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 274 | rc = tcpSocket->recv( (void *) buffer, size); |
vpcola | 25:326f00faa092 | 275 | |
vpcola | 25:326f00faa092 | 276 | // return 0 bytes if timeout ... |
vpcola | 25:326f00faa092 | 277 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 278 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 279 | else |
vpcola | 25:326f00faa092 | 280 | return rc; // return the number of bytes received or error |
vpcola | 25:326f00faa092 | 281 | } |
vpcola | 23:06fac173529e | 282 | } |
vpcola | 23:06fac173529e | 283 | |
vpcola | 23:06fac173529e | 284 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 285 | { |
vpcola | 23:06fac173529e | 286 | int rc; |
vpcola | 23:06fac173529e | 287 | |
vpcola | 23:06fac173529e | 288 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 289 | return -1; |
vpcola | 23:06fac173529e | 290 | |
vpcola | 26:4b21de8043a5 | 291 | if (useTLS) { |
vpcola | 25:326f00faa092 | 292 | // Do SSL/TLS write |
vpcola | 25:326f00faa092 | 293 | rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 294 | if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) |
vpcola | 25:326f00faa092 | 295 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 296 | else |
vpcola | 25:326f00faa092 | 297 | return rc; |
vpcola | 25:326f00faa092 | 298 | } else { |
vpcola | 25:326f00faa092 | 299 | |
vpcola | 25:326f00faa092 | 300 | // set the write timeout |
vpcola | 25:326f00faa092 | 301 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 302 | rc = tcpSocket->send(buffer, size); |
vpcola | 25:326f00faa092 | 303 | |
vpcola | 25:326f00faa092 | 304 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 305 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 306 | else |
vpcola | 25:326f00faa092 | 307 | return rc; |
vpcola | 25:326f00faa092 | 308 | } |
vpcola | 23:06fac173529e | 309 | } |
vpcola | 23:06fac173529e | 310 | |
vpcola | 23:06fac173529e | 311 | int MQTTThreadedClient::readPacketLength(int* value) |
vpcola | 23:06fac173529e | 312 | { |
vpcola | 23:06fac173529e | 313 | int rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 314 | unsigned char c; |
vpcola | 23:06fac173529e | 315 | int multiplier = 1; |
vpcola | 23:06fac173529e | 316 | int len = 0; |
vpcola | 23:06fac173529e | 317 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
vpcola | 23:06fac173529e | 318 | |
vpcola | 23:06fac173529e | 319 | *value = 0; |
vpcola | 23:06fac173529e | 320 | do |
vpcola | 23:06fac173529e | 321 | { |
vpcola | 23:06fac173529e | 322 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
vpcola | 23:06fac173529e | 323 | { |
vpcola | 23:06fac173529e | 324 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
vpcola | 23:06fac173529e | 325 | goto exit; |
vpcola | 23:06fac173529e | 326 | } |
vpcola | 23:06fac173529e | 327 | |
vpcola | 23:06fac173529e | 328 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 329 | if (rc != 1) |
vpcola | 23:06fac173529e | 330 | { |
vpcola | 23:06fac173529e | 331 | rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 332 | goto exit; |
vpcola | 23:06fac173529e | 333 | } |
vpcola | 23:06fac173529e | 334 | |
vpcola | 23:06fac173529e | 335 | *value += (c & 127) * multiplier; |
vpcola | 23:06fac173529e | 336 | multiplier *= 128; |
vpcola | 23:06fac173529e | 337 | } while ((c & 128) != 0); |
vpcola | 23:06fac173529e | 338 | |
vpcola | 23:06fac173529e | 339 | rc = MQTTPACKET_READ_COMPLETE; |
vpcola | 23:06fac173529e | 340 | |
vpcola | 23:06fac173529e | 341 | exit: |
vpcola | 23:06fac173529e | 342 | if (rc == MQTTPACKET_READ_ERROR ) |
vpcola | 23:06fac173529e | 343 | len = -1; |
vpcola | 23:06fac173529e | 344 | |
vpcola | 23:06fac173529e | 345 | return len; |
vpcola | 23:06fac173529e | 346 | } |
vpcola | 23:06fac173529e | 347 | |
vpcola | 23:06fac173529e | 348 | int MQTTThreadedClient::sendPacket(size_t length) |
vpcola | 23:06fac173529e | 349 | { |
vpcola | 23:06fac173529e | 350 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 351 | int sent = 0; |
vpcola | 23:06fac173529e | 352 | |
vpcola | 23:06fac173529e | 353 | while (sent < length) |
vpcola | 23:06fac173529e | 354 | { |
vpcola | 23:06fac173529e | 355 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 356 | if (rc < 0) // there was an error writing the data |
vpcola | 23:06fac173529e | 357 | break; |
vpcola | 23:06fac173529e | 358 | sent += rc; |
vpcola | 23:06fac173529e | 359 | } |
vpcola | 23:06fac173529e | 360 | |
vpcola | 23:06fac173529e | 361 | if (sent == length) |
vpcola | 23:06fac173529e | 362 | rc = SUCCESS; |
vpcola | 23:06fac173529e | 363 | else |
vpcola | 23:06fac173529e | 364 | rc = FAILURE; |
vpcola | 23:06fac173529e | 365 | |
vpcola | 23:06fac173529e | 366 | return rc; |
vpcola | 23:06fac173529e | 367 | } |
vpcola | 23:06fac173529e | 368 | /** |
vpcola | 23:06fac173529e | 369 | * Reads the entire packet to readbuf and returns |
vpcola | 23:06fac173529e | 370 | * the type of packet when successful, otherwise |
vpcola | 23:06fac173529e | 371 | * a negative error code is returned. |
vpcola | 23:06fac173529e | 372 | **/ |
vpcola | 23:06fac173529e | 373 | int MQTTThreadedClient::readPacket() |
vpcola | 23:06fac173529e | 374 | { |
vpcola | 23:06fac173529e | 375 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 376 | MQTTHeader header = {0}; |
vpcola | 23:06fac173529e | 377 | int len = 0; |
vpcola | 23:06fac173529e | 378 | int rem_len = 0; |
vpcola | 23:06fac173529e | 379 | |
vpcola | 23:06fac173529e | 380 | /* 1. read the header byte. This has the packet type in it */ |
vpcola | 23:06fac173529e | 381 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
vpcola | 23:06fac173529e | 382 | goto exit; |
vpcola | 23:06fac173529e | 383 | |
vpcola | 23:06fac173529e | 384 | len = 1; |
vpcola | 23:06fac173529e | 385 | /* 2. read the remaining length. This is variable in itself */ |
vpcola | 23:06fac173529e | 386 | if ( readPacketLength(&rem_len) < 0 ) |
vpcola | 23:06fac173529e | 387 | goto exit; |
vpcola | 23:06fac173529e | 388 | |
vpcola | 23:06fac173529e | 389 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
vpcola | 23:06fac173529e | 390 | |
vpcola | 23:06fac173529e | 391 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
vpcola | 23:06fac173529e | 392 | { |
vpcola | 23:06fac173529e | 393 | rc = BUFFER_OVERFLOW; |
vpcola | 23:06fac173529e | 394 | goto exit; |
vpcola | 23:06fac173529e | 395 | } |
vpcola | 23:06fac173529e | 396 | |
vpcola | 23:06fac173529e | 397 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
vpcola | 23:06fac173529e | 398 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
vpcola | 23:06fac173529e | 399 | goto exit; |
vpcola | 23:06fac173529e | 400 | |
vpcola | 23:06fac173529e | 401 | // Convert the header to type |
vpcola | 23:06fac173529e | 402 | // and update rc |
vpcola | 23:06fac173529e | 403 | header.byte = readbuf[0]; |
vpcola | 23:06fac173529e | 404 | rc = header.bits.type; |
vpcola | 23:06fac173529e | 405 | |
vpcola | 23:06fac173529e | 406 | exit: |
vpcola | 23:06fac173529e | 407 | |
vpcola | 23:06fac173529e | 408 | return rc; |
vpcola | 23:06fac173529e | 409 | } |
vpcola | 23:06fac173529e | 410 | |
vpcola | 23:06fac173529e | 411 | /** |
vpcola | 23:06fac173529e | 412 | * Read until a specified packet type is received, or untill the specified |
vpcola | 23:06fac173529e | 413 | * timeout dropping packets along the way. |
vpcola | 23:06fac173529e | 414 | **/ |
vpcola | 23:06fac173529e | 415 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
vpcola | 23:06fac173529e | 416 | { |
vpcola | 23:06fac173529e | 417 | int pType = FAILURE; |
vpcola | 23:06fac173529e | 418 | Timer timer; |
vpcola | 23:06fac173529e | 419 | |
vpcola | 23:06fac173529e | 420 | timer.start(); |
vpcola | 23:06fac173529e | 421 | do { |
vpcola | 23:06fac173529e | 422 | pType = readPacket(); |
vpcola | 23:06fac173529e | 423 | if (pType < 0) |
vpcola | 23:06fac173529e | 424 | break; |
vpcola | 23:06fac173529e | 425 | |
vpcola | 23:06fac173529e | 426 | if (timer.read_ms() > timeout) |
vpcola | 23:06fac173529e | 427 | { |
vpcola | 23:06fac173529e | 428 | pType = FAILURE; |
vpcola | 23:06fac173529e | 429 | break; |
vpcola | 23:06fac173529e | 430 | } |
vpcola | 23:06fac173529e | 431 | }while(pType != packetType); |
vpcola | 23:06fac173529e | 432 | |
vpcola | 23:06fac173529e | 433 | return pType; |
vpcola | 23:06fac173529e | 434 | } |
vpcola | 23:06fac173529e | 435 | |
vpcola | 23:06fac173529e | 436 | |
vpcola | 26:4b21de8043a5 | 437 | int MQTTThreadedClient::login() |
vpcola | 23:06fac173529e | 438 | { |
vpcola | 23:06fac173529e | 439 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 440 | int len = 0; |
vpcola | 23:06fac173529e | 441 | |
vpcola | 26:4b21de8043a5 | 442 | if (!isConnected) |
vpcola | 23:06fac173529e | 443 | { |
vpcola | 30:b2aed80037db | 444 | DBG("Session not connected! \r\n"); |
vpcola | 23:06fac173529e | 445 | return rc; |
vpcola | 23:06fac173529e | 446 | } |
vpcola | 23:06fac173529e | 447 | |
vpcola | 23:06fac173529e | 448 | // Copy the keepAliveInterval value to local |
vpcola | 23:06fac173529e | 449 | // MQTT specifies in seconds, we have to multiply that |
vpcola | 23:06fac173529e | 450 | // amount for our 32 bit timers which accepts ms. |
vpcola | 26:4b21de8043a5 | 451 | keepAliveInterval = (connect_options.keepAliveInterval * 1000); |
vpcola | 23:06fac173529e | 452 | |
vpcola | 30:b2aed80037db | 453 | DBG("Login with: \r\n"); |
vpcola | 30:b2aed80037db | 454 | DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); |
vpcola | 30:b2aed80037db | 455 | DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); |
vpcola | 23:06fac173529e | 456 | |
vpcola | 26:4b21de8043a5 | 457 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) |
vpcola | 23:06fac173529e | 458 | { |
vpcola | 30:b2aed80037db | 459 | DBG("Error serializing connect packet ...\r\n"); |
vpcola | 23:06fac173529e | 460 | return rc; |
vpcola | 23:06fac173529e | 461 | } |
vpcola | 23:06fac173529e | 462 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
vpcola | 23:06fac173529e | 463 | { |
vpcola | 30:b2aed80037db | 464 | DBG("Error sending the connect request packet ...\r\n"); |
vpcola | 23:06fac173529e | 465 | return rc; |
vpcola | 23:06fac173529e | 466 | } |
vpcola | 23:06fac173529e | 467 | |
vpcola | 23:06fac173529e | 468 | // Wait for the CONNACK |
vpcola | 23:06fac173529e | 469 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
vpcola | 23:06fac173529e | 470 | { |
vpcola | 23:06fac173529e | 471 | unsigned char connack_rc = 255; |
vpcola | 23:06fac173529e | 472 | bool sessionPresent = false; |
vpcola | 30:b2aed80037db | 473 | DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); |
vpcola | 23:06fac173529e | 474 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 23:06fac173529e | 475 | rc = connack_rc; |
vpcola | 23:06fac173529e | 476 | else |
vpcola | 23:06fac173529e | 477 | rc = FAILURE; |
vpcola | 23:06fac173529e | 478 | } |
vpcola | 23:06fac173529e | 479 | else |
vpcola | 23:06fac173529e | 480 | rc = FAILURE; |
vpcola | 23:06fac173529e | 481 | |
vpcola | 23:06fac173529e | 482 | if (rc == SUCCESS) |
vpcola | 23:06fac173529e | 483 | { |
vpcola | 30:b2aed80037db | 484 | DBG("Connected!!! ... starting connection timers ...\r\n"); |
vpcola | 23:06fac173529e | 485 | resetConnectionTimer(); |
vpcola | 23:06fac173529e | 486 | } |
vpcola | 23:06fac173529e | 487 | |
vpcola | 30:b2aed80037db | 488 | DBG("Returning with rc = %d\r\n", rc); |
vpcola | 23:06fac173529e | 489 | |
vpcola | 23:06fac173529e | 490 | return rc; |
vpcola | 23:06fac173529e | 491 | } |
vpcola | 23:06fac173529e | 492 | |
vpcola | 25:326f00faa092 | 493 | |
vpcola | 25:326f00faa092 | 494 | void MQTTThreadedClient::disconnect() |
vpcola | 25:326f00faa092 | 495 | { |
vpcola | 25:326f00faa092 | 496 | if (isConnected) |
vpcola | 25:326f00faa092 | 497 | { |
vpcola | 26:4b21de8043a5 | 498 | if( useTLS |
vpcola | 26:4b21de8043a5 | 499 | && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) |
vpcola | 26:4b21de8043a5 | 500 | ) |
vpcola | 26:4b21de8043a5 | 501 | { |
vpcola | 30:b2aed80037db | 502 | DBG( "Session reset returned an error \r\n"); |
vpcola | 26:4b21de8043a5 | 503 | } |
vpcola | 25:326f00faa092 | 504 | |
vpcola | 25:326f00faa092 | 505 | isConnected = false; |
vpcola | 25:326f00faa092 | 506 | tcpSocket->close(); |
vpcola | 25:326f00faa092 | 507 | } |
vpcola | 25:326f00faa092 | 508 | } |
vpcola | 25:326f00faa092 | 509 | |
vpcola | 26:4b21de8043a5 | 510 | int MQTTThreadedClient::connect() |
vpcola | 23:06fac173529e | 511 | { |
vpcola | 25:326f00faa092 | 512 | int ret = FAILURE; |
vpcola | 26:4b21de8043a5 | 513 | |
vpcola | 26:4b21de8043a5 | 514 | if ((network == NULL) || (tcpSocket == NULL) |
vpcola | 26:4b21de8043a5 | 515 | || host.empty()) |
vpcola | 26:4b21de8043a5 | 516 | { |
vpcola | 30:b2aed80037db | 517 | DBG("Network settings not set! \r\n"); |
vpcola | 26:4b21de8043a5 | 518 | return ret; |
vpcola | 26:4b21de8043a5 | 519 | } |
vpcola | 25:326f00faa092 | 520 | |
vpcola | 26:4b21de8043a5 | 521 | if (useTLS) |
vpcola | 26:4b21de8043a5 | 522 | { |
vpcola | 26:4b21de8043a5 | 523 | if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { |
vpcola | 26:4b21de8043a5 | 524 | mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); |
vpcola | 26:4b21de8043a5 | 525 | return ret; |
vpcola | 26:4b21de8043a5 | 526 | } |
vpcola | 30:b2aed80037db | 527 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola | 26:4b21de8043a5 | 528 | if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { |
vpcola | 26:4b21de8043a5 | 529 | mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); |
vpcola | 26:4b21de8043a5 | 530 | return ret; |
vpcola | 26:4b21de8043a5 | 531 | } |
vpcola | 30:b2aed80037db | 532 | #endif |
vpcola | 26:4b21de8043a5 | 533 | } |
vpcola | 26:4b21de8043a5 | 534 | |
vpcola | 23:06fac173529e | 535 | tcpSocket->open(network); |
vpcola | 26:4b21de8043a5 | 536 | if (useTLS) |
vpcola | 25:326f00faa092 | 537 | { |
vpcola | 30:b2aed80037db | 538 | DBG("mbedtls_ssl_set_hostname ...\r\n"); |
vpcola | 25:326f00faa092 | 539 | mbedtls_ssl_set_hostname(&_ssl, host.c_str()); |
vpcola | 30:b2aed80037db | 540 | DBG("mbedtls_ssl_set_bio ...\r\n"); |
vpcola | 25:326f00faa092 | 541 | mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), |
vpcola | 25:326f00faa092 | 542 | ssl_send, ssl_recv, NULL ); |
vpcola | 25:326f00faa092 | 543 | } |
vpcola | 25:326f00faa092 | 544 | |
vpcola | 25:326f00faa092 | 545 | if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) |
vpcola | 23:06fac173529e | 546 | { |
vpcola | 30:b2aed80037db | 547 | DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); |
vpcola | 23:06fac173529e | 548 | return ret; |
vpcola | 26:4b21de8043a5 | 549 | }else |
vpcola | 26:4b21de8043a5 | 550 | isConnected = true; |
vpcola | 25:326f00faa092 | 551 | |
vpcola | 26:4b21de8043a5 | 552 | if (useTLS) |
vpcola | 25:326f00faa092 | 553 | { |
vpcola | 26:4b21de8043a5 | 554 | |
vpcola | 26:4b21de8043a5 | 555 | if (doTLSHandshake() < 0) |
vpcola | 26:4b21de8043a5 | 556 | { |
vpcola | 30:b2aed80037db | 557 | DBG("TLS Handshake failed! \r\n"); |
vpcola | 26:4b21de8043a5 | 558 | return FAILURE; |
vpcola | 26:4b21de8043a5 | 559 | }else |
vpcola | 30:b2aed80037db | 560 | DBG("TLS Handshake complete!! \r\n"); |
vpcola | 25:326f00faa092 | 561 | } |
vpcola | 25:326f00faa092 | 562 | |
vpcola | 26:4b21de8043a5 | 563 | return login(); |
vpcola | 26:4b21de8043a5 | 564 | } |
vpcola | 26:4b21de8043a5 | 565 | |
vpcola | 26:4b21de8043a5 | 566 | void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) |
vpcola | 26:4b21de8043a5 | 567 | { |
vpcola | 26:4b21de8043a5 | 568 | // Copy the settings for reconnection |
vpcola | 26:4b21de8043a5 | 569 | host = chost; |
vpcola | 26:4b21de8043a5 | 570 | port = cport; |
vpcola | 26:4b21de8043a5 | 571 | connect_options = options; |
vpcola | 23:06fac173529e | 572 | } |
vpcola | 23:06fac173529e | 573 | |
vpcola | 23:06fac173529e | 574 | int MQTTThreadedClient::publish(PubMessage& msg) |
vpcola | 23:06fac173529e | 575 | { |
vpcola | 23:06fac173529e | 576 | #if 0 |
vpcola | 23:06fac173529e | 577 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
vpcola | 23:06fac173529e | 578 | // TODO: handle id values when the function is called later |
vpcola | 23:06fac173529e | 579 | if (id == 0) |
vpcola | 23:06fac173529e | 580 | return FAILURE; |
vpcola | 23:06fac173529e | 581 | else |
vpcola | 23:06fac173529e | 582 | return SUCCESS; |
vpcola | 23:06fac173529e | 583 | #endif |
vpcola | 23:06fac173529e | 584 | PubMessage *message = mpool.alloc(); |
vpcola | 23:06fac173529e | 585 | // Simple copy |
vpcola | 23:06fac173529e | 586 | *message = msg; |
vpcola | 23:06fac173529e | 587 | |
vpcola | 23:06fac173529e | 588 | // Push the data to the thread |
vpcola |
33:38e2e7bf91eb | 589 | DBG("Pushing data to consumer thread ...\r\n"); |
vpcola | 23:06fac173529e | 590 | mqueue.put(message); |
vpcola | 23:06fac173529e | 591 | |
vpcola | 23:06fac173529e | 592 | return SUCCESS; |
vpcola | 23:06fac173529e | 593 | } |
vpcola | 23:06fac173529e | 594 | |
vpcola | 23:06fac173529e | 595 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
vpcola | 23:06fac173529e | 596 | { |
vpcola | 23:06fac173529e | 597 | MQTTString topicString = MQTTString_initializer; |
vpcola | 23:06fac173529e | 598 | |
vpcola | 23:06fac173529e | 599 | if (!isConnected) |
vpcola | 23:06fac173529e | 600 | { |
vpcola |
33:38e2e7bf91eb | 601 | DBG("Not connected!!! ...\r\n"); |
vpcola | 23:06fac173529e | 602 | return FAILURE; |
vpcola | 23:06fac173529e | 603 | } |
vpcola | 23:06fac173529e | 604 | |
vpcola | 23:06fac173529e | 605 | topicString.cstring = (char*) &message.topic[0]; |
vpcola | 23:06fac173529e | 606 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
vpcola | 23:06fac173529e | 607 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
vpcola | 23:06fac173529e | 608 | if (len <= 0) |
vpcola | 23:06fac173529e | 609 | { |
vpcola |
33:38e2e7bf91eb | 610 | DBG("Failed serializing message ...\r\n"); |
vpcola | 23:06fac173529e | 611 | return FAILURE; |
vpcola | 23:06fac173529e | 612 | } |
vpcola | 23:06fac173529e | 613 | |
vpcola | 23:06fac173529e | 614 | if (sendPacket(len) == SUCCESS) |
vpcola | 23:06fac173529e | 615 | { |
vpcola |
33:38e2e7bf91eb | 616 | DBG("Successfully sent publish packet to server ...\r\n"); |
vpcola | 23:06fac173529e | 617 | return SUCCESS; |
vpcola | 23:06fac173529e | 618 | } |
vpcola | 23:06fac173529e | 619 | |
vpcola |
33:38e2e7bf91eb | 620 | DBG("Failed to send publish packet to server ...\r\n"); |
vpcola | 23:06fac173529e | 621 | return FAILURE; |
vpcola | 23:06fac173529e | 622 | } |
vpcola | 25:326f00faa092 | 623 | |
vpcola | 25:326f00faa092 | 624 | void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) |
vpcola | 25:326f00faa092 | 625 | { |
vpcola | 25:326f00faa092 | 626 | // Push the subscription into the map ... |
vpcola | 25:326f00faa092 | 627 | FP<void,MessageData &> fp; |
vpcola | 25:326f00faa092 | 628 | fp.attach(function); |
vpcola | 23:06fac173529e | 629 | |
vpcola | 26:4b21de8043a5 | 630 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
vpcola | 25:326f00faa092 | 631 | } |
vpcola | 25:326f00faa092 | 632 | |
vpcola | 25:326f00faa092 | 633 | int MQTTThreadedClient::processSubscriptions() |
vpcola | 25:326f00faa092 | 634 | { |
vpcola | 25:326f00faa092 | 635 | int numsubscribed = 0; |
vpcola | 25:326f00faa092 | 636 | |
vpcola | 25:326f00faa092 | 637 | if (!isConnected) |
vpcola | 25:326f00faa092 | 638 | { |
vpcola | 30:b2aed80037db | 639 | DBG("Session not connected!!\r\n"); |
vpcola | 25:326f00faa092 | 640 | return 0; |
vpcola | 25:326f00faa092 | 641 | } |
vpcola | 25:326f00faa092 | 642 | |
vpcola | 30:b2aed80037db | 643 | DBG("Processing subscribed topics ....\r\n"); |
vpcola | 26:4b21de8043a5 | 644 | |
vpcola | 25:326f00faa092 | 645 | std::map<std::string, FP<void, MessageData &> >::iterator it; |
vpcola | 26:4b21de8043a5 | 646 | for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) |
vpcola | 25:326f00faa092 | 647 | { |
vpcola | 25:326f00faa092 | 648 | int rc = FAILURE; |
vpcola | 25:326f00faa092 | 649 | int len = 0; |
vpcola | 25:326f00faa092 | 650 | //TODO: We only subscribe to QoS = 0 for now |
vpcola | 25:326f00faa092 | 651 | QoS qos = QOS0; |
vpcola | 25:326f00faa092 | 652 | |
vpcola | 25:326f00faa092 | 653 | MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; |
vpcola | 30:b2aed80037db | 654 | DBG("Subscribing to topic [%s]\r\n", topic.cstring); |
vpcola | 25:326f00faa092 | 655 | |
vpcola | 25:326f00faa092 | 656 | |
vpcola | 25:326f00faa092 | 657 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
vpcola | 25:326f00faa092 | 658 | if (len <= 0) { |
vpcola | 30:b2aed80037db | 659 | DBG("Error serializing subscribe packet ...\r\n"); |
vpcola | 25:326f00faa092 | 660 | continue; |
vpcola | 25:326f00faa092 | 661 | } |
vpcola | 25:326f00faa092 | 662 | |
vpcola | 25:326f00faa092 | 663 | if ((rc = sendPacket(len)) != SUCCESS) { |
vpcola | 30:b2aed80037db | 664 | DBG("Error sending subscribe packet [%d]\r\n", rc); |
vpcola | 25:326f00faa092 | 665 | continue; |
vpcola | 25:326f00faa092 | 666 | } |
vpcola | 25:326f00faa092 | 667 | |
vpcola | 30:b2aed80037db | 668 | DBG("Waiting for subscription ack ...\r\n"); |
vpcola | 25:326f00faa092 | 669 | // Wait for SUBACK, dropping packets read along the way ... |
vpcola | 25:326f00faa092 | 670 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
vpcola | 25:326f00faa092 | 671 | int count = 0, grantedQoS = -1; |
vpcola | 25:326f00faa092 | 672 | unsigned short mypacketid; |
vpcola | 25:326f00faa092 | 673 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 25:326f00faa092 | 674 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
vpcola | 25:326f00faa092 | 675 | // For as long as we do not get 0x80 .. |
vpcola | 25:326f00faa092 | 676 | if (rc != 0x80) |
vpcola | 25:326f00faa092 | 677 | { |
vpcola | 25:326f00faa092 | 678 | // Reset connection timers here ... |
vpcola | 25:326f00faa092 | 679 | resetConnectionTimer(); |
vpcola | 30:b2aed80037db | 680 | DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 681 | numsubscribed++; |
vpcola | 25:326f00faa092 | 682 | } else { |
vpcola | 30:b2aed80037db | 683 | DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 684 | } |
vpcola | 25:326f00faa092 | 685 | } else |
vpcola | 30:b2aed80037db | 686 | DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 687 | } // end for loop |
vpcola | 25:326f00faa092 | 688 | |
vpcola | 25:326f00faa092 | 689 | return numsubscribed; |
vpcola | 25:326f00faa092 | 690 | } |
vpcola | 25:326f00faa092 | 691 | |
vpcola | 23:06fac173529e | 692 | bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) |
vpcola | 23:06fac173529e | 693 | { |
vpcola | 23:06fac173529e | 694 | char* curf = topicFilter; |
vpcola | 23:06fac173529e | 695 | char* curn = topicName.lenstring.data; |
vpcola | 23:06fac173529e | 696 | char* curn_end = curn + topicName.lenstring.len; |
vpcola | 23:06fac173529e | 697 | |
vpcola | 23:06fac173529e | 698 | while (*curf && curn < curn_end) |
vpcola | 23:06fac173529e | 699 | { |
vpcola | 23:06fac173529e | 700 | if (*curn == '/' && *curf != '/') |
vpcola | 23:06fac173529e | 701 | break; |
vpcola | 23:06fac173529e | 702 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
vpcola | 23:06fac173529e | 703 | break; |
vpcola | 23:06fac173529e | 704 | if (*curf == '+') |
vpcola | 23:06fac173529e | 705 | { // skip until we meet the next separator, or end of string |
vpcola | 23:06fac173529e | 706 | char* nextpos = curn + 1; |
vpcola | 23:06fac173529e | 707 | while (nextpos < curn_end && *nextpos != '/') |
vpcola | 23:06fac173529e | 708 | nextpos = ++curn + 1; |
vpcola | 23:06fac173529e | 709 | } |
vpcola | 23:06fac173529e | 710 | else if (*curf == '#') |
vpcola | 23:06fac173529e | 711 | curn = curn_end - 1; // skip until end of string |
vpcola | 23:06fac173529e | 712 | curf++; |
vpcola | 23:06fac173529e | 713 | curn++; |
vpcola | 23:06fac173529e | 714 | }; |
vpcola | 23:06fac173529e | 715 | |
vpcola | 23:06fac173529e | 716 | return (curn == curn_end) && (*curf == '\0'); |
vpcola | 23:06fac173529e | 717 | } |
vpcola | 23:06fac173529e | 718 | |
vpcola | 23:06fac173529e | 719 | int MQTTThreadedClient::handlePublishMsg() |
vpcola | 23:06fac173529e | 720 | { |
vpcola | 23:06fac173529e | 721 | MQTTString topicName = MQTTString_initializer; |
vpcola | 23:06fac173529e | 722 | Message msg; |
vpcola | 23:06fac173529e | 723 | int intQoS; |
vpcola |
33:38e2e7bf91eb | 724 | DBG("Deserializing publish message ...\r\n"); |
vpcola | 23:06fac173529e | 725 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
vpcola | 23:06fac173529e | 726 | &intQoS, |
vpcola | 23:06fac173529e | 727 | (unsigned char*)&msg.retained, |
vpcola | 23:06fac173529e | 728 | (unsigned short*)&msg.id, |
vpcola | 23:06fac173529e | 729 | &topicName, |
vpcola | 23:06fac173529e | 730 | (unsigned char**)&msg.payload, |
vpcola | 23:06fac173529e | 731 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
vpcola | 23:06fac173529e | 732 | { |
vpcola |
33:38e2e7bf91eb | 733 | DBG("Error deserializing published message ...\r\n"); |
vpcola | 23:06fac173529e | 734 | return -1; |
vpcola | 23:06fac173529e | 735 | } |
vpcola | 23:06fac173529e | 736 | |
vpcola | 23:06fac173529e | 737 | std::string topic; |
vpcola | 23:06fac173529e | 738 | if (topicName.lenstring.len > 0) |
vpcola | 23:06fac173529e | 739 | { |
vpcola | 23:06fac173529e | 740 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
vpcola | 23:06fac173529e | 741 | }else |
vpcola | 23:06fac173529e | 742 | topic = (const char *) topicName.cstring; |
vpcola | 23:06fac173529e | 743 | |
vpcola |
33:38e2e7bf91eb | 744 | DBG("Got message for topic [%s], QoS [%d] ...\r\n", topic.c_str(), intQoS); |
vpcola | 23:06fac173529e | 745 | |
vpcola | 23:06fac173529e | 746 | msg.qos = (QoS) intQoS; |
vpcola | 23:06fac173529e | 747 | |
vpcola | 23:06fac173529e | 748 | |
vpcola | 23:06fac173529e | 749 | // Call the handlers for each topic |
vpcola | 23:06fac173529e | 750 | if (topicCBMap.find(topic) != topicCBMap.end()) |
vpcola | 23:06fac173529e | 751 | { |
vpcola | 23:06fac173529e | 752 | // Call the callback function |
vpcola | 23:06fac173529e | 753 | if (topicCBMap[topic].attached()) |
vpcola | 23:06fac173529e | 754 | { |
vpcola |
33:38e2e7bf91eb | 755 | DBG("Invoking function handler for topic ...\r\n"); |
vpcola | 23:06fac173529e | 756 | MessageData md(topicName, msg); |
vpcola | 23:06fac173529e | 757 | topicCBMap[topic](md); |
vpcola | 23:06fac173529e | 758 | |
vpcola | 23:06fac173529e | 759 | return 1; |
vpcola | 23:06fac173529e | 760 | } |
vpcola | 23:06fac173529e | 761 | } |
vpcola | 23:06fac173529e | 762 | |
vpcola | 23:06fac173529e | 763 | // TODO: depending on the QoS |
vpcola | 23:06fac173529e | 764 | // we send data to the server = PUBACK or PUBREC |
vpcola | 23:06fac173529e | 765 | switch(intQoS) |
vpcola | 23:06fac173529e | 766 | { |
vpcola | 23:06fac173529e | 767 | case QOS0: |
vpcola | 23:06fac173529e | 768 | // We send back nothing ... |
vpcola | 23:06fac173529e | 769 | break; |
vpcola | 23:06fac173529e | 770 | case QOS1: |
vpcola | 23:06fac173529e | 771 | // TODO: implement |
vpcola | 23:06fac173529e | 772 | break; |
vpcola | 23:06fac173529e | 773 | case QOS2: |
vpcola | 23:06fac173529e | 774 | // TODO: implement |
vpcola | 23:06fac173529e | 775 | break; |
vpcola | 23:06fac173529e | 776 | default: |
vpcola | 23:06fac173529e | 777 | break; |
vpcola | 23:06fac173529e | 778 | } |
vpcola | 23:06fac173529e | 779 | |
vpcola | 23:06fac173529e | 780 | return 0; |
vpcola | 23:06fac173529e | 781 | } |
vpcola | 23:06fac173529e | 782 | |
vpcola | 23:06fac173529e | 783 | void MQTTThreadedClient::resetConnectionTimer() |
vpcola | 23:06fac173529e | 784 | { |
vpcola | 23:06fac173529e | 785 | if (keepAliveInterval > 0) |
vpcola | 23:06fac173529e | 786 | { |
vpcola | 23:06fac173529e | 787 | comTimer.reset(); |
vpcola | 23:06fac173529e | 788 | comTimer.start(); |
vpcola | 23:06fac173529e | 789 | } |
vpcola | 23:06fac173529e | 790 | } |
vpcola | 23:06fac173529e | 791 | |
vpcola | 23:06fac173529e | 792 | bool MQTTThreadedClient::hasConnectionTimedOut() |
vpcola | 23:06fac173529e | 793 | { |
vpcola | 23:06fac173529e | 794 | if (keepAliveInterval > 0 ) { |
vpcola | 23:06fac173529e | 795 | // Check connection timer |
vpcola | 23:06fac173529e | 796 | if (comTimer.read_ms() > keepAliveInterval) |
vpcola | 23:06fac173529e | 797 | return true; |
vpcola | 23:06fac173529e | 798 | else |
vpcola | 23:06fac173529e | 799 | return false; |
vpcola | 23:06fac173529e | 800 | } |
vpcola | 23:06fac173529e | 801 | |
vpcola | 23:06fac173529e | 802 | return false; |
vpcola | 23:06fac173529e | 803 | } |
vpcola | 23:06fac173529e | 804 | |
vpcola | 23:06fac173529e | 805 | void MQTTThreadedClient::sendPingRequest() |
vpcola | 23:06fac173529e | 806 | { |
vpcola | 23:06fac173529e | 807 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
vpcola | 23:06fac173529e | 808 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
vpcola | 23:06fac173529e | 809 | { |
vpcola |
33:38e2e7bf91eb | 810 | DBG("Ping request sent successfully ...\r\n"); |
vpcola | 23:06fac173529e | 811 | } |
vpcola | 23:06fac173529e | 812 | } |
vpcola | 23:06fac173529e | 813 | |
vpcola | 23:06fac173529e | 814 | void MQTTThreadedClient::startListener() |
vpcola | 23:06fac173529e | 815 | { |
vpcola | 23:06fac173529e | 816 | int pType; |
vpcola | 26:4b21de8043a5 | 817 | int numsubs; |
vpcola | 23:06fac173529e | 818 | // Continuesly listens for packets and dispatch |
vpcola | 23:06fac173529e | 819 | // message handlers ... |
vpcola | 26:4b21de8043a5 | 820 | if (useTLS) |
vpcola | 26:4b21de8043a5 | 821 | { |
vpcola | 26:4b21de8043a5 | 822 | initTLS(); |
vpcola | 26:4b21de8043a5 | 823 | } |
vpcola | 26:4b21de8043a5 | 824 | |
vpcola | 26:4b21de8043a5 | 825 | while(true) |
vpcola | 26:4b21de8043a5 | 826 | { |
vpcola | 26:4b21de8043a5 | 827 | |
vpcola | 26:4b21de8043a5 | 828 | // Attempt to reconnect and login |
vpcola | 26:4b21de8043a5 | 829 | if ( connect() < 0 ) |
vpcola | 23:06fac173529e | 830 | { |
vpcola | 26:4b21de8043a5 | 831 | disconnect(); |
vpcola | 26:4b21de8043a5 | 832 | // Wait for a few secs and reconnect ... |
vpcola | 26:4b21de8043a5 | 833 | Thread::wait(6000); |
vpcola | 26:4b21de8043a5 | 834 | continue; |
vpcola | 23:06fac173529e | 835 | } |
vpcola | 23:06fac173529e | 836 | |
vpcola | 26:4b21de8043a5 | 837 | numsubs = processSubscriptions(); |
vpcola | 30:b2aed80037db | 838 | DBG("Subscribed %d topics ...\r\n", numsubs); |
vpcola | 26:4b21de8043a5 | 839 | |
vpcola | 26:4b21de8043a5 | 840 | // loop read |
vpcola | 26:4b21de8043a5 | 841 | while(true) |
vpcola | 26:4b21de8043a5 | 842 | { |
vpcola | 25:326f00faa092 | 843 | pType = readPacket(); |
vpcola | 26:4b21de8043a5 | 844 | switch(pType) |
vpcola | 26:4b21de8043a5 | 845 | { |
vpcola | 25:326f00faa092 | 846 | case TIMEOUT: |
vpcola | 25:326f00faa092 | 847 | // No data available from the network ... |
vpcola | 25:326f00faa092 | 848 | break; |
vpcola | 25:326f00faa092 | 849 | case FAILURE: |
vpcola | 26:4b21de8043a5 | 850 | { |
vpcola | 30:b2aed80037db | 851 | DBG("readPacket returned failure \r\n"); |
vpcola | 26:4b21de8043a5 | 852 | goto reconnect; |
vpcola | 26:4b21de8043a5 | 853 | } |
vpcola | 25:326f00faa092 | 854 | case BUFFER_OVERFLOW: |
vpcola | 25:326f00faa092 | 855 | { |
vpcola | 25:326f00faa092 | 856 | // TODO: Network error, do we disconnect and reconnect? |
vpcola |
33:38e2e7bf91eb | 857 | DBG("Failure or buffer overflow problem ... \r\n"); |
vpcola | 25:326f00faa092 | 858 | MBED_ASSERT(false); |
vpcola | 25:326f00faa092 | 859 | } |
vpcola | 25:326f00faa092 | 860 | break; |
vpcola | 25:326f00faa092 | 861 | /** |
vpcola | 25:326f00faa092 | 862 | * The rest of the return codes below (all positive) is about MQTT |
vpcola | 25:326f00faa092 | 863 | * response codes |
vpcola | 25:326f00faa092 | 864 | **/ |
vpcola | 25:326f00faa092 | 865 | case CONNACK: |
vpcola | 25:326f00faa092 | 866 | case PUBACK: |
vpcola | 25:326f00faa092 | 867 | case SUBACK: |
vpcola | 25:326f00faa092 | 868 | break; |
vpcola | 25:326f00faa092 | 869 | case PUBLISH: |
vpcola | 25:326f00faa092 | 870 | { |
vpcola |
33:38e2e7bf91eb | 871 | DBG("Publish received!....\r\n"); |
vpcola | 25:326f00faa092 | 872 | // We receive data from the MQTT server .. |
vpcola | 25:326f00faa092 | 873 | if (handlePublishMsg() < 0) { |
vpcola |
33:38e2e7bf91eb | 874 | DBG("Error handling PUBLISH message ... \r\n"); |
vpcola | 25:326f00faa092 | 875 | break; |
vpcola | 25:326f00faa092 | 876 | } |
vpcola | 25:326f00faa092 | 877 | } |
vpcola | 25:326f00faa092 | 878 | break; |
vpcola | 25:326f00faa092 | 879 | case PINGRESP: |
vpcola | 25:326f00faa092 | 880 | { |
vpcola |
33:38e2e7bf91eb | 881 | DBG("Got ping response ...\r\n"); |
vpcola | 25:326f00faa092 | 882 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 883 | } |
vpcola | 25:326f00faa092 | 884 | break; |
vpcola | 25:326f00faa092 | 885 | default: |
vpcola |
33:38e2e7bf91eb | 886 | DBG("Unknown/Not handled message from server pType[%d]\r\n", pType); |
vpcola | 25:326f00faa092 | 887 | } |
vpcola | 25:326f00faa092 | 888 | |
vpcola | 25:326f00faa092 | 889 | // Check if its time to send a keepAlive packet |
vpcola | 25:326f00faa092 | 890 | if (hasConnectionTimedOut()) { |
vpcola | 25:326f00faa092 | 891 | // Queue the ping request so that other |
vpcola | 25:326f00faa092 | 892 | // pending operations queued above will go first |
vpcola | 25:326f00faa092 | 893 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
vpcola | 25:326f00faa092 | 894 | } |
vpcola | 25:326f00faa092 | 895 | |
vpcola | 25:326f00faa092 | 896 | // Check if we have messages on the message queue |
vpcola | 25:326f00faa092 | 897 | osEvent evt = mqueue.get(10); |
vpcola | 25:326f00faa092 | 898 | if (evt.status == osEventMessage) { |
vpcola | 25:326f00faa092 | 899 | |
vpcola |
33:38e2e7bf91eb | 900 | DBG("Got message to publish! ... \r\n"); |
vpcola | 25:326f00faa092 | 901 | |
vpcola | 25:326f00faa092 | 902 | // Unpack the message |
vpcola | 25:326f00faa092 | 903 | PubMessage * message = (PubMessage *)evt.value.p; |
vpcola | 25:326f00faa092 | 904 | |
vpcola | 25:326f00faa092 | 905 | // Send the packet, do not queue the call |
vpcola | 25:326f00faa092 | 906 | // like the ping above .. |
vpcola | 25:326f00faa092 | 907 | if ( sendPublish(*message) == SUCCESS) { |
vpcola | 25:326f00faa092 | 908 | // Reset timers if we have been able to send successfully |
vpcola | 25:326f00faa092 | 909 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 910 | } else { |
vpcola | 25:326f00faa092 | 911 | // Disconnected? |
vpcola | 25:326f00faa092 | 912 | goto reconnect; |
vpcola | 25:326f00faa092 | 913 | } |
vpcola | 25:326f00faa092 | 914 | |
vpcola | 25:326f00faa092 | 915 | // Free the message from mempool after using |
vpcola | 25:326f00faa092 | 916 | mpool.free(message); |
vpcola | 25:326f00faa092 | 917 | } |
vpcola | 25:326f00faa092 | 918 | |
vpcola | 25:326f00faa092 | 919 | // Dispatch any queued events ... |
vpcola | 25:326f00faa092 | 920 | queue.dispatch(100); |
vpcola | 25:326f00faa092 | 921 | } // end while loop |
vpcola | 25:326f00faa092 | 922 | |
vpcola | 25:326f00faa092 | 923 | reconnect: |
vpcola | 26:4b21de8043a5 | 924 | // reconnect? |
vpcola | 30:b2aed80037db | 925 | DBG("Client disconnected!! ... retrying ...\r\n"); |
vpcola | 26:4b21de8043a5 | 926 | disconnect(); |
vpcola | 26:4b21de8043a5 | 927 | |
vpcola | 26:4b21de8043a5 | 928 | }; |
vpcola | 23:06fac173529e | 929 | } |
vpcola | 26:4b21de8043a5 | 930 | |
vpcola | 26:4b21de8043a5 | 931 | void MQTTThreadedClient::stopListener() |
vpcola | 26:4b21de8043a5 | 932 | { |
vpcola | 26:4b21de8043a5 | 933 | // TODO: Set a signal/flag that the running thread |
vpcola | 26:4b21de8043a5 | 934 | // will check if its ok to stop ... |
vpcola | 30:b2aed80037db | 935 | } |
vpcola | 30:b2aed80037db | 936 | |
vpcola | 26:4b21de8043a5 | 937 | } |