A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.cpp@25:326f00faa092, 2017-03-27 (annotated)
- Committer:
- vpcola
- Date:
- Mon Mar 27 03:53:18 2017 +0000
- Revision:
- 25:326f00faa092
- Parent:
- 23:06fac173529e
- Child:
- 26:4b21de8043a5
Added SSL/TLS code
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
vpcola | 23:06fac173529e | 1 | #include "mbed.h" |
vpcola | 23:06fac173529e | 2 | #include "rtos.h" |
vpcola | 23:06fac173529e | 3 | #include "MQTTThreadedClient.h" |
vpcola | 25:326f00faa092 | 4 | #include "mbedtls/platform.h" |
vpcola | 25:326f00faa092 | 5 | #include "mbedtls/ssl.h" |
vpcola | 25:326f00faa092 | 6 | #include "mbedtls/entropy.h" |
vpcola | 25:326f00faa092 | 7 | #include "mbedtls/ctr_drbg.h" |
vpcola | 25:326f00faa092 | 8 | #include "mbedtls/error.h" |
vpcola | 23:06fac173529e | 9 | |
vpcola | 23:06fac173529e | 10 | static MemoryPool<PubMessage, 16> mpool; |
vpcola | 23:06fac173529e | 11 | static Queue<PubMessage, 16> mqueue; |
vpcola | 23:06fac173529e | 12 | |
vpcola | 25:326f00faa092 | 13 | // SSL/TLS variables |
vpcola | 25:326f00faa092 | 14 | mbedtls_entropy_context _entropy; |
vpcola | 25:326f00faa092 | 15 | mbedtls_ctr_drbg_context _ctr_drbg; |
vpcola | 25:326f00faa092 | 16 | mbedtls_x509_crt _cacert; |
vpcola | 25:326f00faa092 | 17 | mbedtls_ssl_context _ssl; |
vpcola | 25:326f00faa092 | 18 | mbedtls_ssl_config _ssl_conf; |
vpcola | 25:326f00faa092 | 19 | mbedtls_ssl_session saved_session; |
vpcola | 25:326f00faa092 | 20 | |
vpcola | 25:326f00faa092 | 21 | /** |
vpcola | 25:326f00faa092 | 22 | * Receive callback for mbed TLS |
vpcola | 25:326f00faa092 | 23 | */ |
vpcola | 25:326f00faa092 | 24 | static int ssl_recv(void *ctx, unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 25 | { |
vpcola | 25:326f00faa092 | 26 | int recv = -1; |
vpcola | 25:326f00faa092 | 27 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 28 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 29 | recv = socket->recv(buf, len); |
vpcola | 25:326f00faa092 | 30 | |
vpcola | 25:326f00faa092 | 31 | if (NSAPI_ERROR_WOULD_BLOCK == recv) { |
vpcola | 25:326f00faa092 | 32 | return MBEDTLS_ERR_SSL_WANT_READ; |
vpcola | 25:326f00faa092 | 33 | } else if (recv < 0) { |
vpcola | 25:326f00faa092 | 34 | return -1; |
vpcola | 25:326f00faa092 | 35 | } else { |
vpcola | 25:326f00faa092 | 36 | return recv; |
vpcola | 25:326f00faa092 | 37 | } |
vpcola | 25:326f00faa092 | 38 | } |
vpcola | 25:326f00faa092 | 39 | |
vpcola | 25:326f00faa092 | 40 | /** |
vpcola | 25:326f00faa092 | 41 | * Send callback for mbed TLS |
vpcola | 25:326f00faa092 | 42 | */ |
vpcola | 25:326f00faa092 | 43 | static int ssl_send(void *ctx, const unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 44 | { |
vpcola | 25:326f00faa092 | 45 | int sent = -1; |
vpcola | 25:326f00faa092 | 46 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 47 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 48 | sent = socket->send(buf, len); |
vpcola | 25:326f00faa092 | 49 | |
vpcola | 25:326f00faa092 | 50 | if(NSAPI_ERROR_WOULD_BLOCK == sent) { |
vpcola | 25:326f00faa092 | 51 | return MBEDTLS_ERR_SSL_WANT_WRITE; |
vpcola | 25:326f00faa092 | 52 | } else if (sent < 0) { |
vpcola | 25:326f00faa092 | 53 | return -1; |
vpcola | 25:326f00faa092 | 54 | } else { |
vpcola | 25:326f00faa092 | 55 | return sent; |
vpcola | 25:326f00faa092 | 56 | } |
vpcola | 25:326f00faa092 | 57 | } |
vpcola | 25:326f00faa092 | 58 | |
vpcola | 25:326f00faa092 | 59 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 60 | /** |
vpcola | 25:326f00faa092 | 61 | * Debug callback for mbed TLS |
vpcola | 25:326f00faa092 | 62 | * Just prints on the USB serial port |
vpcola | 25:326f00faa092 | 63 | */ |
vpcola | 25:326f00faa092 | 64 | static void my_debug(void *ctx, int level, const char *file, int line, |
vpcola | 25:326f00faa092 | 65 | const char *str) |
vpcola | 25:326f00faa092 | 66 | { |
vpcola | 25:326f00faa092 | 67 | const char *p, *basename; |
vpcola | 25:326f00faa092 | 68 | (void) ctx; |
vpcola | 25:326f00faa092 | 69 | |
vpcola | 25:326f00faa092 | 70 | /* Extract basename from file */ |
vpcola | 25:326f00faa092 | 71 | for(p = basename = file; *p != '\0'; p++) { |
vpcola | 25:326f00faa092 | 72 | if(*p == '/' || *p == '\\') { |
vpcola | 25:326f00faa092 | 73 | basename = p + 1; |
vpcola | 25:326f00faa092 | 74 | } |
vpcola | 25:326f00faa092 | 75 | } |
vpcola | 25:326f00faa092 | 76 | |
vpcola | 25:326f00faa092 | 77 | if (_debug) { |
vpcola | 25:326f00faa092 | 78 | mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); |
vpcola | 25:326f00faa092 | 79 | } |
vpcola | 25:326f00faa092 | 80 | } |
vpcola | 25:326f00faa092 | 81 | |
vpcola | 25:326f00faa092 | 82 | /** |
vpcola | 25:326f00faa092 | 83 | * Certificate verification callback for mbed TLS |
vpcola | 25:326f00faa092 | 84 | * Here we only use it to display information on each cert in the chain |
vpcola | 25:326f00faa092 | 85 | */ |
vpcola | 25:326f00faa092 | 86 | static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) |
vpcola | 25:326f00faa092 | 87 | { |
vpcola | 25:326f00faa092 | 88 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 89 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 90 | (void) data; |
vpcola | 25:326f00faa092 | 91 | |
vpcola | 25:326f00faa092 | 92 | if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\n", depth); |
vpcola | 25:326f00faa092 | 93 | mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); |
vpcola | 25:326f00faa092 | 94 | if (_debug) mbedtls_printf("%s", buf); |
vpcola | 25:326f00faa092 | 95 | |
vpcola | 25:326f00faa092 | 96 | if (*flags == 0) |
vpcola | 25:326f00faa092 | 97 | if (_debug) mbedtls_printf("No verification issue for this certificate\n"); |
vpcola | 25:326f00faa092 | 98 | else { |
vpcola | 25:326f00faa092 | 99 | mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); |
vpcola | 25:326f00faa092 | 100 | if (_debug) mbedtls_printf("%s\n", buf); |
vpcola | 25:326f00faa092 | 101 | } |
vpcola | 25:326f00faa092 | 102 | |
vpcola | 25:326f00faa092 | 103 | delete[] buf; |
vpcola | 25:326f00faa092 | 104 | return 0; |
vpcola | 25:326f00faa092 | 105 | } |
vpcola | 25:326f00faa092 | 106 | #endif |
vpcola | 25:326f00faa092 | 107 | |
vpcola | 25:326f00faa092 | 108 | |
vpcola | 25:326f00faa092 | 109 | void MQTTThreadedClient::setupTLS() |
vpcola | 25:326f00faa092 | 110 | { |
vpcola | 25:326f00faa092 | 111 | if (ssl_ca_pem != NULL) |
vpcola | 25:326f00faa092 | 112 | { |
vpcola | 25:326f00faa092 | 113 | mbedtls_entropy_init(&_entropy); |
vpcola | 25:326f00faa092 | 114 | mbedtls_ctr_drbg_init(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 115 | mbedtls_x509_crt_init(&_cacert); |
vpcola | 25:326f00faa092 | 116 | mbedtls_ssl_init(&_ssl); |
vpcola | 25:326f00faa092 | 117 | mbedtls_ssl_config_init(&_ssl_conf); |
vpcola | 25:326f00faa092 | 118 | memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); |
vpcola | 25:326f00faa092 | 119 | } |
vpcola | 25:326f00faa092 | 120 | } |
vpcola | 25:326f00faa092 | 121 | |
vpcola | 25:326f00faa092 | 122 | void MQTTThreadedClient::freeTLS() |
vpcola | 25:326f00faa092 | 123 | { |
vpcola | 25:326f00faa092 | 124 | if (ssl_ca_pem != NULL) |
vpcola | 25:326f00faa092 | 125 | { |
vpcola | 25:326f00faa092 | 126 | mbedtls_entropy_free(&_entropy); |
vpcola | 25:326f00faa092 | 127 | mbedtls_ctr_drbg_free(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 128 | mbedtls_x509_crt_free(&_cacert); |
vpcola | 25:326f00faa092 | 129 | mbedtls_ssl_free(&_ssl); |
vpcola | 25:326f00faa092 | 130 | mbedtls_ssl_config_free(&_ssl_conf); |
vpcola | 25:326f00faa092 | 131 | } |
vpcola | 25:326f00faa092 | 132 | } |
vpcola | 25:326f00faa092 | 133 | |
vpcola | 25:326f00faa092 | 134 | int MQTTThreadedClient::initTLS() |
vpcola | 25:326f00faa092 | 135 | { |
vpcola | 25:326f00faa092 | 136 | int ret; |
vpcola | 25:326f00faa092 | 137 | |
vpcola | 25:326f00faa092 | 138 | printf("Initializing TLS ...\r\n"); |
vpcola | 25:326f00faa092 | 139 | printf("mbedtls_ctr_drdbg_seed ...\r\n"); |
vpcola | 25:326f00faa092 | 140 | if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, |
vpcola | 25:326f00faa092 | 141 | (const unsigned char *) DRBG_PERS, |
vpcola | 25:326f00faa092 | 142 | sizeof (DRBG_PERS))) != 0) { |
vpcola | 25:326f00faa092 | 143 | printf("error [%d] mbedtls_crt_drbg_init", ret); |
vpcola | 25:326f00faa092 | 144 | _error = ret; |
vpcola | 25:326f00faa092 | 145 | return -1; |
vpcola | 25:326f00faa092 | 146 | } |
vpcola | 25:326f00faa092 | 147 | printf("mbedtls_x509_crt_parse ...\r\n"); |
vpcola | 25:326f00faa092 | 148 | if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, |
vpcola | 25:326f00faa092 | 149 | strlen(ssl_ca_pem) + 1)) != 0) { |
vpcola | 25:326f00faa092 | 150 | printf("error [%d] mbedtls_x509_crt_parse", ret); |
vpcola | 25:326f00faa092 | 151 | _error = ret; |
vpcola | 25:326f00faa092 | 152 | return -1; |
vpcola | 25:326f00faa092 | 153 | } |
vpcola | 25:326f00faa092 | 154 | |
vpcola | 25:326f00faa092 | 155 | printf("mbedtls_ssl_config_defaults ...\r\n"); |
vpcola | 25:326f00faa092 | 156 | if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, |
vpcola | 25:326f00faa092 | 157 | MBEDTLS_SSL_IS_CLIENT, |
vpcola | 25:326f00faa092 | 158 | MBEDTLS_SSL_TRANSPORT_STREAM, |
vpcola | 25:326f00faa092 | 159 | MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { |
vpcola | 25:326f00faa092 | 160 | printf("error [%d] mbedtls_ssl_config_defaults", ret); |
vpcola | 25:326f00faa092 | 161 | _error = ret; |
vpcola | 25:326f00faa092 | 162 | return -1; |
vpcola | 25:326f00faa092 | 163 | } |
vpcola | 25:326f00faa092 | 164 | |
vpcola | 25:326f00faa092 | 165 | printf("mbedtls_ssl_config_ca_chain ...\r\n"); |
vpcola | 25:326f00faa092 | 166 | mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); |
vpcola | 25:326f00faa092 | 167 | printf("mbedtls_ssl_conf_rng ...\r\n"); |
vpcola | 25:326f00faa092 | 168 | mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); |
vpcola | 25:326f00faa092 | 169 | |
vpcola | 25:326f00faa092 | 170 | /* It is possible to disable authentication by passing |
vpcola | 25:326f00faa092 | 171 | * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() |
vpcola | 25:326f00faa092 | 172 | */ |
vpcola | 25:326f00faa092 | 173 | printf("mbedtls_ssl_conf_authmode ...\r\n"); |
vpcola | 25:326f00faa092 | 174 | mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); |
vpcola | 25:326f00faa092 | 175 | |
vpcola | 25:326f00faa092 | 176 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 177 | mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); |
vpcola | 25:326f00faa092 | 178 | mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); |
vpcola | 25:326f00faa092 | 179 | mbedtls_debug_set_threshold(DEBUG_LEVEL); |
vpcola | 25:326f00faa092 | 180 | #endif |
vpcola | 25:326f00faa092 | 181 | |
vpcola | 25:326f00faa092 | 182 | printf("mbedtls_ssl_setup ...\r\n"); |
vpcola | 25:326f00faa092 | 183 | if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { |
vpcola | 25:326f00faa092 | 184 | printf("error [%d] mbedtls_ssl_setup", ret); |
vpcola | 25:326f00faa092 | 185 | _error = ret; |
vpcola | 25:326f00faa092 | 186 | return -1; |
vpcola | 25:326f00faa092 | 187 | } |
vpcola | 25:326f00faa092 | 188 | |
vpcola | 25:326f00faa092 | 189 | return 0; |
vpcola | 25:326f00faa092 | 190 | } |
vpcola | 25:326f00faa092 | 191 | |
vpcola | 25:326f00faa092 | 192 | int MQTTThreadedClient::doTLSHandshake() |
vpcola | 25:326f00faa092 | 193 | { |
vpcola | 25:326f00faa092 | 194 | int ret; |
vpcola | 25:326f00faa092 | 195 | |
vpcola | 25:326f00faa092 | 196 | /* Start the handshake, the rest will be done in onReceive() */ |
vpcola | 25:326f00faa092 | 197 | printf("Starting the TLS handshake...\r\n"); |
vpcola | 25:326f00faa092 | 198 | ret = mbedtls_ssl_handshake(&_ssl); |
vpcola | 25:326f00faa092 | 199 | if (ret < 0) |
vpcola | 25:326f00faa092 | 200 | { |
vpcola | 25:326f00faa092 | 201 | if (ret != MBEDTLS_ERR_SSL_WANT_READ && |
vpcola | 25:326f00faa092 | 202 | ret != MBEDTLS_ERR_SSL_WANT_WRITE) |
vpcola | 25:326f00faa092 | 203 | { |
vpcola | 25:326f00faa092 | 204 | printf("error [%d] mbedtls_ssl_handshake", ret); |
vpcola | 25:326f00faa092 | 205 | tcpSocket->close(); |
vpcola | 25:326f00faa092 | 206 | _error = -1; |
vpcola | 25:326f00faa092 | 207 | } |
vpcola | 25:326f00faa092 | 208 | else |
vpcola | 25:326f00faa092 | 209 | { |
vpcola | 25:326f00faa092 | 210 | // do not close the socket if timed out |
vpcola | 25:326f00faa092 | 211 | _error = ret; |
vpcola | 25:326f00faa092 | 212 | } |
vpcola | 25:326f00faa092 | 213 | return -1; |
vpcola | 25:326f00faa092 | 214 | } |
vpcola | 25:326f00faa092 | 215 | |
vpcola | 25:326f00faa092 | 216 | /* Handshake done, time to print info */ |
vpcola | 25:326f00faa092 | 217 | printf("TLS connection to %s:%d established\r\n", |
vpcola | 25:326f00faa092 | 218 | host.c_str(), port); |
vpcola | 25:326f00faa092 | 219 | |
vpcola | 25:326f00faa092 | 220 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 221 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 222 | mbedtls_x509_crt_info(buf, buf_size, "\r ", |
vpcola | 25:326f00faa092 | 223 | mbedtls_ssl_get_peer_cert(&_ssl)); |
vpcola | 25:326f00faa092 | 224 | |
vpcola | 25:326f00faa092 | 225 | printf("Server certificate:\r\n%s\r", buf); |
vpcola | 25:326f00faa092 | 226 | // Verify server cert ... |
vpcola | 25:326f00faa092 | 227 | uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); |
vpcola | 25:326f00faa092 | 228 | if( flags != 0 ) |
vpcola | 25:326f00faa092 | 229 | { |
vpcola | 25:326f00faa092 | 230 | mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); |
vpcola | 25:326f00faa092 | 231 | printf("Certificate verification failed:\r\n%s\r\r\n", buf); |
vpcola | 25:326f00faa092 | 232 | // free server cert ... before error return |
vpcola | 25:326f00faa092 | 233 | delete [] buf; |
vpcola | 25:326f00faa092 | 234 | return -1; |
vpcola | 25:326f00faa092 | 235 | } |
vpcola | 25:326f00faa092 | 236 | |
vpcola | 25:326f00faa092 | 237 | printf("Certificate verification passed\r\n\r\n"); |
vpcola | 25:326f00faa092 | 238 | // delete server cert after verification |
vpcola | 25:326f00faa092 | 239 | delete [] buf; |
vpcola | 25:326f00faa092 | 240 | |
vpcola | 25:326f00faa092 | 241 | // TODO: Save the session here for reconnect. |
vpcola | 25:326f00faa092 | 242 | if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) |
vpcola | 25:326f00faa092 | 243 | { |
vpcola | 25:326f00faa092 | 244 | printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); |
vpcola | 25:326f00faa092 | 245 | return -1; |
vpcola | 25:326f00faa092 | 246 | } |
vpcola | 25:326f00faa092 | 247 | |
vpcola | 25:326f00faa092 | 248 | printf("Session saved for reconnect ...\r\n"); |
vpcola | 25:326f00faa092 | 249 | return 0; |
vpcola | 25:326f00faa092 | 250 | } |
vpcola | 25:326f00faa092 | 251 | |
vpcola | 23:06fac173529e | 252 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 253 | { |
vpcola | 23:06fac173529e | 254 | int rc; |
vpcola | 25:326f00faa092 | 255 | |
vpcola | 23:06fac173529e | 256 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 257 | return -1; |
vpcola | 25:326f00faa092 | 258 | |
vpcola | 25:326f00faa092 | 259 | if (ssl_ca_pem != NULL) |
vpcola | 25:326f00faa092 | 260 | { |
vpcola | 25:326f00faa092 | 261 | // Do SSL/TLS read |
vpcola | 25:326f00faa092 | 262 | rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 263 | if (MBEDTLS_ERR_SSL_WANT_READ == rc) |
vpcola | 25:326f00faa092 | 264 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 265 | else |
vpcola | 25:326f00faa092 | 266 | return rc; |
vpcola | 25:326f00faa092 | 267 | } else { |
vpcola | 25:326f00faa092 | 268 | // non-blocking socket ... |
vpcola | 25:326f00faa092 | 269 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 270 | rc = tcpSocket->recv( (void *) buffer, size); |
vpcola | 25:326f00faa092 | 271 | |
vpcola | 25:326f00faa092 | 272 | // return 0 bytes if timeout ... |
vpcola | 25:326f00faa092 | 273 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 274 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 275 | else |
vpcola | 25:326f00faa092 | 276 | return rc; // return the number of bytes received or error |
vpcola | 25:326f00faa092 | 277 | } |
vpcola | 23:06fac173529e | 278 | } |
vpcola | 23:06fac173529e | 279 | |
vpcola | 23:06fac173529e | 280 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 281 | { |
vpcola | 23:06fac173529e | 282 | int rc; |
vpcola | 23:06fac173529e | 283 | |
vpcola | 23:06fac173529e | 284 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 285 | return -1; |
vpcola | 23:06fac173529e | 286 | |
vpcola | 25:326f00faa092 | 287 | if (ssl_ca_pem != NULL) { |
vpcola | 25:326f00faa092 | 288 | // Do SSL/TLS write |
vpcola | 25:326f00faa092 | 289 | rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 290 | if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) |
vpcola | 25:326f00faa092 | 291 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 292 | else |
vpcola | 25:326f00faa092 | 293 | return rc; |
vpcola | 25:326f00faa092 | 294 | } else { |
vpcola | 25:326f00faa092 | 295 | |
vpcola | 25:326f00faa092 | 296 | // set the write timeout |
vpcola | 25:326f00faa092 | 297 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 298 | rc = tcpSocket->send(buffer, size); |
vpcola | 25:326f00faa092 | 299 | |
vpcola | 25:326f00faa092 | 300 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 301 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 302 | else |
vpcola | 25:326f00faa092 | 303 | return rc; |
vpcola | 25:326f00faa092 | 304 | } |
vpcola | 23:06fac173529e | 305 | } |
vpcola | 23:06fac173529e | 306 | |
vpcola | 23:06fac173529e | 307 | int MQTTThreadedClient::readPacketLength(int* value) |
vpcola | 23:06fac173529e | 308 | { |
vpcola | 23:06fac173529e | 309 | int rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 310 | unsigned char c; |
vpcola | 23:06fac173529e | 311 | int multiplier = 1; |
vpcola | 23:06fac173529e | 312 | int len = 0; |
vpcola | 23:06fac173529e | 313 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
vpcola | 23:06fac173529e | 314 | |
vpcola | 23:06fac173529e | 315 | *value = 0; |
vpcola | 23:06fac173529e | 316 | do |
vpcola | 23:06fac173529e | 317 | { |
vpcola | 23:06fac173529e | 318 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
vpcola | 23:06fac173529e | 319 | { |
vpcola | 23:06fac173529e | 320 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
vpcola | 23:06fac173529e | 321 | goto exit; |
vpcola | 23:06fac173529e | 322 | } |
vpcola | 23:06fac173529e | 323 | |
vpcola | 23:06fac173529e | 324 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 325 | if (rc != 1) |
vpcola | 23:06fac173529e | 326 | { |
vpcola | 23:06fac173529e | 327 | rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 328 | goto exit; |
vpcola | 23:06fac173529e | 329 | } |
vpcola | 23:06fac173529e | 330 | |
vpcola | 23:06fac173529e | 331 | *value += (c & 127) * multiplier; |
vpcola | 23:06fac173529e | 332 | multiplier *= 128; |
vpcola | 23:06fac173529e | 333 | } while ((c & 128) != 0); |
vpcola | 23:06fac173529e | 334 | |
vpcola | 23:06fac173529e | 335 | rc = MQTTPACKET_READ_COMPLETE; |
vpcola | 23:06fac173529e | 336 | |
vpcola | 23:06fac173529e | 337 | exit: |
vpcola | 23:06fac173529e | 338 | if (rc == MQTTPACKET_READ_ERROR ) |
vpcola | 23:06fac173529e | 339 | len = -1; |
vpcola | 23:06fac173529e | 340 | |
vpcola | 23:06fac173529e | 341 | return len; |
vpcola | 23:06fac173529e | 342 | } |
vpcola | 23:06fac173529e | 343 | |
vpcola | 23:06fac173529e | 344 | int MQTTThreadedClient::sendPacket(size_t length) |
vpcola | 23:06fac173529e | 345 | { |
vpcola | 23:06fac173529e | 346 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 347 | int sent = 0; |
vpcola | 23:06fac173529e | 348 | |
vpcola | 23:06fac173529e | 349 | while (sent < length) |
vpcola | 23:06fac173529e | 350 | { |
vpcola | 23:06fac173529e | 351 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 352 | if (rc < 0) // there was an error writing the data |
vpcola | 23:06fac173529e | 353 | break; |
vpcola | 23:06fac173529e | 354 | sent += rc; |
vpcola | 23:06fac173529e | 355 | } |
vpcola | 23:06fac173529e | 356 | |
vpcola | 23:06fac173529e | 357 | if (sent == length) |
vpcola | 23:06fac173529e | 358 | rc = SUCCESS; |
vpcola | 23:06fac173529e | 359 | else |
vpcola | 23:06fac173529e | 360 | rc = FAILURE; |
vpcola | 23:06fac173529e | 361 | |
vpcola | 23:06fac173529e | 362 | return rc; |
vpcola | 23:06fac173529e | 363 | } |
vpcola | 23:06fac173529e | 364 | /** |
vpcola | 23:06fac173529e | 365 | * Reads the entire packet to readbuf and returns |
vpcola | 23:06fac173529e | 366 | * the type of packet when successful, otherwise |
vpcola | 23:06fac173529e | 367 | * a negative error code is returned. |
vpcola | 23:06fac173529e | 368 | **/ |
vpcola | 23:06fac173529e | 369 | int MQTTThreadedClient::readPacket() |
vpcola | 23:06fac173529e | 370 | { |
vpcola | 23:06fac173529e | 371 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 372 | MQTTHeader header = {0}; |
vpcola | 23:06fac173529e | 373 | int len = 0; |
vpcola | 23:06fac173529e | 374 | int rem_len = 0; |
vpcola | 23:06fac173529e | 375 | |
vpcola | 23:06fac173529e | 376 | /* 1. read the header byte. This has the packet type in it */ |
vpcola | 23:06fac173529e | 377 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
vpcola | 23:06fac173529e | 378 | goto exit; |
vpcola | 23:06fac173529e | 379 | |
vpcola | 23:06fac173529e | 380 | len = 1; |
vpcola | 23:06fac173529e | 381 | /* 2. read the remaining length. This is variable in itself */ |
vpcola | 23:06fac173529e | 382 | if ( readPacketLength(&rem_len) < 0 ) |
vpcola | 23:06fac173529e | 383 | goto exit; |
vpcola | 23:06fac173529e | 384 | |
vpcola | 23:06fac173529e | 385 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
vpcola | 23:06fac173529e | 386 | |
vpcola | 23:06fac173529e | 387 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
vpcola | 23:06fac173529e | 388 | { |
vpcola | 23:06fac173529e | 389 | rc = BUFFER_OVERFLOW; |
vpcola | 23:06fac173529e | 390 | goto exit; |
vpcola | 23:06fac173529e | 391 | } |
vpcola | 23:06fac173529e | 392 | |
vpcola | 23:06fac173529e | 393 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
vpcola | 23:06fac173529e | 394 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
vpcola | 23:06fac173529e | 395 | goto exit; |
vpcola | 23:06fac173529e | 396 | |
vpcola | 23:06fac173529e | 397 | // Convert the header to type |
vpcola | 23:06fac173529e | 398 | // and update rc |
vpcola | 23:06fac173529e | 399 | header.byte = readbuf[0]; |
vpcola | 23:06fac173529e | 400 | rc = header.bits.type; |
vpcola | 23:06fac173529e | 401 | |
vpcola | 23:06fac173529e | 402 | exit: |
vpcola | 23:06fac173529e | 403 | |
vpcola | 23:06fac173529e | 404 | return rc; |
vpcola | 23:06fac173529e | 405 | } |
vpcola | 23:06fac173529e | 406 | |
vpcola | 23:06fac173529e | 407 | /** |
vpcola | 23:06fac173529e | 408 | * Read until a specified packet type is received, or untill the specified |
vpcola | 23:06fac173529e | 409 | * timeout dropping packets along the way. |
vpcola | 23:06fac173529e | 410 | **/ |
vpcola | 23:06fac173529e | 411 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
vpcola | 23:06fac173529e | 412 | { |
vpcola | 23:06fac173529e | 413 | int pType = FAILURE; |
vpcola | 23:06fac173529e | 414 | Timer timer; |
vpcola | 23:06fac173529e | 415 | |
vpcola | 23:06fac173529e | 416 | timer.start(); |
vpcola | 23:06fac173529e | 417 | do { |
vpcola | 23:06fac173529e | 418 | pType = readPacket(); |
vpcola | 23:06fac173529e | 419 | if (pType < 0) |
vpcola | 23:06fac173529e | 420 | break; |
vpcola | 23:06fac173529e | 421 | |
vpcola | 23:06fac173529e | 422 | if (timer.read_ms() > timeout) |
vpcola | 23:06fac173529e | 423 | { |
vpcola | 23:06fac173529e | 424 | pType = FAILURE; |
vpcola | 23:06fac173529e | 425 | break; |
vpcola | 23:06fac173529e | 426 | } |
vpcola | 23:06fac173529e | 427 | }while(pType != packetType); |
vpcola | 23:06fac173529e | 428 | |
vpcola | 23:06fac173529e | 429 | return pType; |
vpcola | 23:06fac173529e | 430 | } |
vpcola | 23:06fac173529e | 431 | |
vpcola | 23:06fac173529e | 432 | |
vpcola | 23:06fac173529e | 433 | int MQTTThreadedClient::connect(MQTTPacket_connectData& options) |
vpcola | 23:06fac173529e | 434 | { |
vpcola | 23:06fac173529e | 435 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 436 | int len = 0; |
vpcola | 23:06fac173529e | 437 | |
vpcola | 23:06fac173529e | 438 | if (isConnected) |
vpcola | 23:06fac173529e | 439 | { |
vpcola | 23:06fac173529e | 440 | printf("Session already connected! \r\n"); |
vpcola | 23:06fac173529e | 441 | return rc; |
vpcola | 23:06fac173529e | 442 | } |
vpcola | 23:06fac173529e | 443 | |
vpcola | 23:06fac173529e | 444 | // Copy the keepAliveInterval value to local |
vpcola | 23:06fac173529e | 445 | // MQTT specifies in seconds, we have to multiply that |
vpcola | 23:06fac173529e | 446 | // amount for our 32 bit timers which accepts ms. |
vpcola | 23:06fac173529e | 447 | keepAliveInterval = (options.keepAliveInterval * 1000); |
vpcola | 23:06fac173529e | 448 | |
vpcola | 23:06fac173529e | 449 | printf("Connecting with: \r\n"); |
vpcola | 23:06fac173529e | 450 | printf("\tUsername: [%s]\r\n", options.username.cstring); |
vpcola | 23:06fac173529e | 451 | printf("\tPassword: [%s]\r\n", options.password.cstring); |
vpcola | 23:06fac173529e | 452 | |
vpcola | 23:06fac173529e | 453 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &options)) <= 0) |
vpcola | 23:06fac173529e | 454 | { |
vpcola | 23:06fac173529e | 455 | printf("Error serializing connect packet ...\r\n"); |
vpcola | 23:06fac173529e | 456 | return rc; |
vpcola | 23:06fac173529e | 457 | } |
vpcola | 23:06fac173529e | 458 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
vpcola | 23:06fac173529e | 459 | { |
vpcola | 23:06fac173529e | 460 | printf("Error sending the connect request packet ...\r\n"); |
vpcola | 23:06fac173529e | 461 | return rc; |
vpcola | 23:06fac173529e | 462 | } |
vpcola | 23:06fac173529e | 463 | |
vpcola | 23:06fac173529e | 464 | // Wait for the CONNACK |
vpcola | 23:06fac173529e | 465 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
vpcola | 23:06fac173529e | 466 | { |
vpcola | 23:06fac173529e | 467 | unsigned char connack_rc = 255; |
vpcola | 23:06fac173529e | 468 | bool sessionPresent = false; |
vpcola | 23:06fac173529e | 469 | printf("Connection acknowledgement received ... deserializing respones ...\r\n"); |
vpcola | 23:06fac173529e | 470 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 23:06fac173529e | 471 | rc = connack_rc; |
vpcola | 23:06fac173529e | 472 | else |
vpcola | 23:06fac173529e | 473 | rc = FAILURE; |
vpcola | 23:06fac173529e | 474 | } |
vpcola | 23:06fac173529e | 475 | else |
vpcola | 23:06fac173529e | 476 | rc = FAILURE; |
vpcola | 23:06fac173529e | 477 | |
vpcola | 23:06fac173529e | 478 | if (rc == SUCCESS) |
vpcola | 23:06fac173529e | 479 | { |
vpcola | 23:06fac173529e | 480 | printf("Connected!!! ... starting connection timers ...\r\n"); |
vpcola | 23:06fac173529e | 481 | isConnected = true; |
vpcola | 23:06fac173529e | 482 | resetConnectionTimer(); |
vpcola | 23:06fac173529e | 483 | }else |
vpcola | 23:06fac173529e | 484 | { |
vpcola | 23:06fac173529e | 485 | // TODO: Call socket->disconnect()? |
vpcola | 23:06fac173529e | 486 | } |
vpcola | 23:06fac173529e | 487 | |
vpcola | 23:06fac173529e | 488 | printf("Returning with rc = %d\r\n", rc); |
vpcola | 23:06fac173529e | 489 | |
vpcola | 23:06fac173529e | 490 | return rc; |
vpcola | 23:06fac173529e | 491 | } |
vpcola | 23:06fac173529e | 492 | |
vpcola | 25:326f00faa092 | 493 | |
vpcola | 25:326f00faa092 | 494 | void MQTTThreadedClient::disconnect() |
vpcola | 25:326f00faa092 | 495 | { |
vpcola | 25:326f00faa092 | 496 | if (isConnected) |
vpcola | 25:326f00faa092 | 497 | { |
vpcola | 25:326f00faa092 | 498 | // TODO: Send unsubscribe message ... |
vpcola | 25:326f00faa092 | 499 | |
vpcola | 25:326f00faa092 | 500 | isConnected = false; |
vpcola | 25:326f00faa092 | 501 | tcpSocket->close(); |
vpcola | 25:326f00faa092 | 502 | } |
vpcola | 25:326f00faa092 | 503 | } |
vpcola | 25:326f00faa092 | 504 | |
vpcola | 25:326f00faa092 | 505 | int MQTTThreadedClient::connect(const char * chost, uint16_t cport, MQTTPacket_connectData & options) |
vpcola | 23:06fac173529e | 506 | { |
vpcola | 25:326f00faa092 | 507 | int ret = FAILURE; |
vpcola | 25:326f00faa092 | 508 | |
vpcola | 25:326f00faa092 | 509 | // Copy the settings for reconnection |
vpcola | 25:326f00faa092 | 510 | host = chost; |
vpcola | 25:326f00faa092 | 511 | port = cport; |
vpcola | 25:326f00faa092 | 512 | connect_options = options; |
vpcola | 25:326f00faa092 | 513 | |
vpcola | 23:06fac173529e | 514 | tcpSocket->open(network); |
vpcola | 25:326f00faa092 | 515 | if (ssl_ca_pem != NULL) |
vpcola | 25:326f00faa092 | 516 | { |
vpcola | 25:326f00faa092 | 517 | printf("mbedtls_ssl_set_hostname ...\r\n"); |
vpcola | 25:326f00faa092 | 518 | mbedtls_ssl_set_hostname(&_ssl, host.c_str()); |
vpcola | 25:326f00faa092 | 519 | printf("mbedtls_ssl_set_bio ...\r\n"); |
vpcola | 25:326f00faa092 | 520 | mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), |
vpcola | 25:326f00faa092 | 521 | ssl_send, ssl_recv, NULL ); |
vpcola | 25:326f00faa092 | 522 | } |
vpcola | 25:326f00faa092 | 523 | |
vpcola | 25:326f00faa092 | 524 | if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) |
vpcola | 23:06fac173529e | 525 | { |
vpcola | 23:06fac173529e | 526 | |
vpcola | 25:326f00faa092 | 527 | printf("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); |
vpcola | 23:06fac173529e | 528 | return ret; |
vpcola | 23:06fac173529e | 529 | } |
vpcola | 25:326f00faa092 | 530 | |
vpcola | 25:326f00faa092 | 531 | if ((ssl_ca_pem != NULL) && (doTLSHandshake() < 0)) |
vpcola | 25:326f00faa092 | 532 | { |
vpcola | 25:326f00faa092 | 533 | printf("TLS Handshake failed! \r\n"); |
vpcola | 25:326f00faa092 | 534 | return FAILURE; |
vpcola | 25:326f00faa092 | 535 | } |
vpcola | 25:326f00faa092 | 536 | |
vpcola | 25:326f00faa092 | 537 | return connect(connect_options); |
vpcola | 23:06fac173529e | 538 | } |
vpcola | 23:06fac173529e | 539 | |
vpcola | 23:06fac173529e | 540 | int MQTTThreadedClient::publish(PubMessage& msg) |
vpcola | 23:06fac173529e | 541 | { |
vpcola | 23:06fac173529e | 542 | #if 0 |
vpcola | 23:06fac173529e | 543 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
vpcola | 23:06fac173529e | 544 | // TODO: handle id values when the function is called later |
vpcola | 23:06fac173529e | 545 | if (id == 0) |
vpcola | 23:06fac173529e | 546 | return FAILURE; |
vpcola | 23:06fac173529e | 547 | else |
vpcola | 23:06fac173529e | 548 | return SUCCESS; |
vpcola | 23:06fac173529e | 549 | #endif |
vpcola | 23:06fac173529e | 550 | PubMessage *message = mpool.alloc(); |
vpcola | 23:06fac173529e | 551 | // Simple copy |
vpcola | 23:06fac173529e | 552 | *message = msg; |
vpcola | 23:06fac173529e | 553 | |
vpcola | 23:06fac173529e | 554 | // Push the data to the thread |
vpcola | 25:326f00faa092 | 555 | printf("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 556 | mqueue.put(message); |
vpcola | 23:06fac173529e | 557 | |
vpcola | 23:06fac173529e | 558 | return SUCCESS; |
vpcola | 23:06fac173529e | 559 | } |
vpcola | 23:06fac173529e | 560 | |
vpcola | 23:06fac173529e | 561 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
vpcola | 23:06fac173529e | 562 | { |
vpcola | 23:06fac173529e | 563 | MQTTString topicString = MQTTString_initializer; |
vpcola | 23:06fac173529e | 564 | |
vpcola | 23:06fac173529e | 565 | if (!isConnected) |
vpcola | 23:06fac173529e | 566 | { |
vpcola | 25:326f00faa092 | 567 | printf("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 568 | return FAILURE; |
vpcola | 23:06fac173529e | 569 | } |
vpcola | 23:06fac173529e | 570 | |
vpcola | 23:06fac173529e | 571 | topicString.cstring = (char*) &message.topic[0]; |
vpcola | 23:06fac173529e | 572 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
vpcola | 23:06fac173529e | 573 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
vpcola | 23:06fac173529e | 574 | if (len <= 0) |
vpcola | 23:06fac173529e | 575 | { |
vpcola | 25:326f00faa092 | 576 | printf("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 577 | return FAILURE; |
vpcola | 23:06fac173529e | 578 | } |
vpcola | 23:06fac173529e | 579 | |
vpcola | 23:06fac173529e | 580 | if (sendPacket(len) == SUCCESS) |
vpcola | 23:06fac173529e | 581 | { |
vpcola | 25:326f00faa092 | 582 | printf("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 583 | return SUCCESS; |
vpcola | 23:06fac173529e | 584 | } |
vpcola | 23:06fac173529e | 585 | |
vpcola | 25:326f00faa092 | 586 | printf("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 587 | return FAILURE; |
vpcola | 23:06fac173529e | 588 | } |
vpcola | 25:326f00faa092 | 589 | |
vpcola | 25:326f00faa092 | 590 | void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) |
vpcola | 25:326f00faa092 | 591 | { |
vpcola | 25:326f00faa092 | 592 | // Push the subscription into the map ... |
vpcola | 25:326f00faa092 | 593 | FP<void,MessageData &> fp; |
vpcola | 25:326f00faa092 | 594 | fp.attach(function); |
vpcola | 23:06fac173529e | 595 | |
vpcola | 25:326f00faa092 | 596 | topicCBMap_t.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
vpcola | 25:326f00faa092 | 597 | } |
vpcola | 25:326f00faa092 | 598 | |
vpcola | 25:326f00faa092 | 599 | int MQTTThreadedClient::processSubscriptions() |
vpcola | 25:326f00faa092 | 600 | { |
vpcola | 25:326f00faa092 | 601 | int numsubscribed = 0; |
vpcola | 25:326f00faa092 | 602 | |
vpcola | 25:326f00faa092 | 603 | if (!isConnected) |
vpcola | 25:326f00faa092 | 604 | { |
vpcola | 25:326f00faa092 | 605 | printf("Session not connected!!\r\n"); |
vpcola | 25:326f00faa092 | 606 | return 0; |
vpcola | 25:326f00faa092 | 607 | } |
vpcola | 25:326f00faa092 | 608 | |
vpcola | 25:326f00faa092 | 609 | std::map<std::string, FP<void, MessageData &> >::iterator it; |
vpcola | 25:326f00faa092 | 610 | for(it = topicCBMap_t.begin(); it != topicCBMap_t.end(); it++) |
vpcola | 25:326f00faa092 | 611 | { |
vpcola | 25:326f00faa092 | 612 | int rc = FAILURE; |
vpcola | 25:326f00faa092 | 613 | int len = 0; |
vpcola | 25:326f00faa092 | 614 | //TODO: We only subscribe to QoS = 0 for now |
vpcola | 25:326f00faa092 | 615 | QoS qos = QOS0; |
vpcola | 25:326f00faa092 | 616 | |
vpcola | 25:326f00faa092 | 617 | MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; |
vpcola | 25:326f00faa092 | 618 | printf("Subscribing to topic [%s]\r\n", topic.cstring); |
vpcola | 25:326f00faa092 | 619 | |
vpcola | 25:326f00faa092 | 620 | |
vpcola | 25:326f00faa092 | 621 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
vpcola | 25:326f00faa092 | 622 | if (len <= 0) { |
vpcola | 25:326f00faa092 | 623 | printf("Error serializing subscribe packet ...\r\n"); |
vpcola | 25:326f00faa092 | 624 | continue; |
vpcola | 25:326f00faa092 | 625 | } |
vpcola | 25:326f00faa092 | 626 | |
vpcola | 25:326f00faa092 | 627 | if ((rc = sendPacket(len)) != SUCCESS) { |
vpcola | 25:326f00faa092 | 628 | printf("Error sending subscribe packet [%d]\r\n", rc); |
vpcola | 25:326f00faa092 | 629 | continue; |
vpcola | 25:326f00faa092 | 630 | } |
vpcola | 25:326f00faa092 | 631 | |
vpcola | 25:326f00faa092 | 632 | printf("Waiting for subscription ack ...\r\n"); |
vpcola | 25:326f00faa092 | 633 | // Wait for SUBACK, dropping packets read along the way ... |
vpcola | 25:326f00faa092 | 634 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
vpcola | 25:326f00faa092 | 635 | int count = 0, grantedQoS = -1; |
vpcola | 25:326f00faa092 | 636 | unsigned short mypacketid; |
vpcola | 25:326f00faa092 | 637 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 25:326f00faa092 | 638 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
vpcola | 25:326f00faa092 | 639 | // For as long as we do not get 0x80 .. |
vpcola | 25:326f00faa092 | 640 | if (rc != 0x80) |
vpcola | 25:326f00faa092 | 641 | { |
vpcola | 25:326f00faa092 | 642 | // Reset connection timers here ... |
vpcola | 25:326f00faa092 | 643 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 644 | printf("Successfully subscribed to %s ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 645 | numsubscribed++; |
vpcola | 25:326f00faa092 | 646 | } else { |
vpcola | 25:326f00faa092 | 647 | printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 648 | } |
vpcola | 25:326f00faa092 | 649 | } else |
vpcola | 25:326f00faa092 | 650 | printf("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 651 | } // end for loop |
vpcola | 25:326f00faa092 | 652 | |
vpcola | 25:326f00faa092 | 653 | return numsubscribed; |
vpcola | 25:326f00faa092 | 654 | } |
vpcola | 25:326f00faa092 | 655 | |
vpcola | 23:06fac173529e | 656 | int MQTTThreadedClient::subscribe(const char * topicstr, QoS qos, void (*function)(MessageData &)) |
vpcola | 23:06fac173529e | 657 | { |
vpcola | 23:06fac173529e | 658 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 659 | int len = 0; |
vpcola | 23:06fac173529e | 660 | |
vpcola | 23:06fac173529e | 661 | MQTTString topic = {(char*)topicstr, {0, 0}}; |
vpcola | 23:06fac173529e | 662 | printf("Subscribing to topic [%s]\r\n", topicstr); |
vpcola | 23:06fac173529e | 663 | |
vpcola | 23:06fac173529e | 664 | if (!isConnected) |
vpcola | 23:06fac173529e | 665 | { |
vpcola | 23:06fac173529e | 666 | printf("Session already connected!!\r\n"); |
vpcola | 23:06fac173529e | 667 | return rc; |
vpcola | 23:06fac173529e | 668 | } |
vpcola | 23:06fac173529e | 669 | |
vpcola | 23:06fac173529e | 670 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
vpcola | 23:06fac173529e | 671 | if (len <= 0) |
vpcola | 23:06fac173529e | 672 | { |
vpcola | 23:06fac173529e | 673 | printf("Error serializing subscribe packet ...\r\n"); |
vpcola | 23:06fac173529e | 674 | return rc; |
vpcola | 23:06fac173529e | 675 | } |
vpcola | 23:06fac173529e | 676 | |
vpcola | 23:06fac173529e | 677 | if ((rc = sendPacket(len)) != SUCCESS) |
vpcola | 23:06fac173529e | 678 | { |
vpcola | 23:06fac173529e | 679 | printf("Error sending subscribe packet [%d]\r\n", rc); |
vpcola | 23:06fac173529e | 680 | return rc; |
vpcola | 23:06fac173529e | 681 | } |
vpcola | 23:06fac173529e | 682 | |
vpcola | 23:06fac173529e | 683 | printf("Waiting for subscription ack ...\r\n"); |
vpcola | 23:06fac173529e | 684 | // Wait for SUBACK, dropping packets read along the way ... |
vpcola | 23:06fac173529e | 685 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) // wait for suback |
vpcola | 23:06fac173529e | 686 | { |
vpcola | 23:06fac173529e | 687 | int count = 0, grantedQoS = -1; |
vpcola | 23:06fac173529e | 688 | unsigned short mypacketid; |
vpcola | 23:06fac173529e | 689 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 23:06fac173529e | 690 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
vpcola | 23:06fac173529e | 691 | // For as long as we do not get 0x80 .. |
vpcola | 23:06fac173529e | 692 | if (rc != 0x80) |
vpcola | 23:06fac173529e | 693 | { |
vpcola | 23:06fac173529e | 694 | // Add message handlers to the map |
vpcola | 23:06fac173529e | 695 | FP<void,MessageData &> fp; |
vpcola | 23:06fac173529e | 696 | fp.attach(function); |
vpcola | 23:06fac173529e | 697 | |
vpcola | 23:06fac173529e | 698 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
vpcola | 23:06fac173529e | 699 | |
vpcola | 23:06fac173529e | 700 | // Reset connection timers here ... |
vpcola | 23:06fac173529e | 701 | resetConnectionTimer(); |
vpcola | 23:06fac173529e | 702 | |
vpcola | 23:06fac173529e | 703 | printf("Successfully subscribed to %s ...\r\n", topicstr); |
vpcola | 23:06fac173529e | 704 | rc = SUCCESS; |
vpcola | 23:06fac173529e | 705 | }else |
vpcola | 23:06fac173529e | 706 | { |
vpcola | 23:06fac173529e | 707 | printf("Failed to subscribe to topic %s ... (not authorized?)\r\n", topicstr); |
vpcola | 23:06fac173529e | 708 | } |
vpcola | 23:06fac173529e | 709 | } |
vpcola | 23:06fac173529e | 710 | else |
vpcola | 23:06fac173529e | 711 | { |
vpcola | 23:06fac173529e | 712 | printf("Failed to subscribe to topic %s (ack not received) ...\r\n", topicstr); |
vpcola | 23:06fac173529e | 713 | rc = FAILURE; |
vpcola | 23:06fac173529e | 714 | } |
vpcola | 23:06fac173529e | 715 | |
vpcola | 23:06fac173529e | 716 | return rc; |
vpcola | 23:06fac173529e | 717 | |
vpcola | 23:06fac173529e | 718 | } |
vpcola | 23:06fac173529e | 719 | |
vpcola | 23:06fac173529e | 720 | |
vpcola | 23:06fac173529e | 721 | bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) |
vpcola | 23:06fac173529e | 722 | { |
vpcola | 23:06fac173529e | 723 | char* curf = topicFilter; |
vpcola | 23:06fac173529e | 724 | char* curn = topicName.lenstring.data; |
vpcola | 23:06fac173529e | 725 | char* curn_end = curn + topicName.lenstring.len; |
vpcola | 23:06fac173529e | 726 | |
vpcola | 23:06fac173529e | 727 | while (*curf && curn < curn_end) |
vpcola | 23:06fac173529e | 728 | { |
vpcola | 23:06fac173529e | 729 | if (*curn == '/' && *curf != '/') |
vpcola | 23:06fac173529e | 730 | break; |
vpcola | 23:06fac173529e | 731 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
vpcola | 23:06fac173529e | 732 | break; |
vpcola | 23:06fac173529e | 733 | if (*curf == '+') |
vpcola | 23:06fac173529e | 734 | { // skip until we meet the next separator, or end of string |
vpcola | 23:06fac173529e | 735 | char* nextpos = curn + 1; |
vpcola | 23:06fac173529e | 736 | while (nextpos < curn_end && *nextpos != '/') |
vpcola | 23:06fac173529e | 737 | nextpos = ++curn + 1; |
vpcola | 23:06fac173529e | 738 | } |
vpcola | 23:06fac173529e | 739 | else if (*curf == '#') |
vpcola | 23:06fac173529e | 740 | curn = curn_end - 1; // skip until end of string |
vpcola | 23:06fac173529e | 741 | curf++; |
vpcola | 23:06fac173529e | 742 | curn++; |
vpcola | 23:06fac173529e | 743 | }; |
vpcola | 23:06fac173529e | 744 | |
vpcola | 23:06fac173529e | 745 | return (curn == curn_end) && (*curf == '\0'); |
vpcola | 23:06fac173529e | 746 | } |
vpcola | 23:06fac173529e | 747 | |
vpcola | 23:06fac173529e | 748 | int MQTTThreadedClient::handlePublishMsg() |
vpcola | 23:06fac173529e | 749 | { |
vpcola | 23:06fac173529e | 750 | MQTTString topicName = MQTTString_initializer; |
vpcola | 23:06fac173529e | 751 | Message msg; |
vpcola | 23:06fac173529e | 752 | int intQoS; |
vpcola | 25:326f00faa092 | 753 | printf("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 754 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
vpcola | 23:06fac173529e | 755 | &intQoS, |
vpcola | 23:06fac173529e | 756 | (unsigned char*)&msg.retained, |
vpcola | 23:06fac173529e | 757 | (unsigned short*)&msg.id, |
vpcola | 23:06fac173529e | 758 | &topicName, |
vpcola | 23:06fac173529e | 759 | (unsigned char**)&msg.payload, |
vpcola | 23:06fac173529e | 760 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
vpcola | 23:06fac173529e | 761 | { |
vpcola | 25:326f00faa092 | 762 | printf("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 763 | return -1; |
vpcola | 23:06fac173529e | 764 | } |
vpcola | 23:06fac173529e | 765 | |
vpcola | 23:06fac173529e | 766 | std::string topic; |
vpcola | 23:06fac173529e | 767 | if (topicName.lenstring.len > 0) |
vpcola | 23:06fac173529e | 768 | { |
vpcola | 23:06fac173529e | 769 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
vpcola | 23:06fac173529e | 770 | }else |
vpcola | 23:06fac173529e | 771 | topic = (const char *) topicName.cstring; |
vpcola | 23:06fac173529e | 772 | |
vpcola | 25:326f00faa092 | 773 | printf("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); |
vpcola | 23:06fac173529e | 774 | |
vpcola | 23:06fac173529e | 775 | msg.qos = (QoS) intQoS; |
vpcola | 23:06fac173529e | 776 | |
vpcola | 23:06fac173529e | 777 | |
vpcola | 23:06fac173529e | 778 | // Call the handlers for each topic |
vpcola | 23:06fac173529e | 779 | if (topicCBMap.find(topic) != topicCBMap.end()) |
vpcola | 23:06fac173529e | 780 | { |
vpcola | 23:06fac173529e | 781 | // Call the callback function |
vpcola | 23:06fac173529e | 782 | if (topicCBMap[topic].attached()) |
vpcola | 23:06fac173529e | 783 | { |
vpcola | 25:326f00faa092 | 784 | printf("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 785 | MessageData md(topicName, msg); |
vpcola | 23:06fac173529e | 786 | topicCBMap[topic](md); |
vpcola | 23:06fac173529e | 787 | |
vpcola | 23:06fac173529e | 788 | return 1; |
vpcola | 23:06fac173529e | 789 | } |
vpcola | 23:06fac173529e | 790 | } |
vpcola | 23:06fac173529e | 791 | |
vpcola | 23:06fac173529e | 792 | // TODO: depending on the QoS |
vpcola | 23:06fac173529e | 793 | // we send data to the server = PUBACK or PUBREC |
vpcola | 23:06fac173529e | 794 | switch(intQoS) |
vpcola | 23:06fac173529e | 795 | { |
vpcola | 23:06fac173529e | 796 | case QOS0: |
vpcola | 23:06fac173529e | 797 | // We send back nothing ... |
vpcola | 23:06fac173529e | 798 | break; |
vpcola | 23:06fac173529e | 799 | case QOS1: |
vpcola | 23:06fac173529e | 800 | // TODO: implement |
vpcola | 23:06fac173529e | 801 | break; |
vpcola | 23:06fac173529e | 802 | case QOS2: |
vpcola | 23:06fac173529e | 803 | // TODO: implement |
vpcola | 23:06fac173529e | 804 | break; |
vpcola | 23:06fac173529e | 805 | default: |
vpcola | 23:06fac173529e | 806 | break; |
vpcola | 23:06fac173529e | 807 | } |
vpcola | 23:06fac173529e | 808 | |
vpcola | 23:06fac173529e | 809 | return 0; |
vpcola | 23:06fac173529e | 810 | } |
vpcola | 23:06fac173529e | 811 | |
vpcola | 23:06fac173529e | 812 | void MQTTThreadedClient::resetConnectionTimer() |
vpcola | 23:06fac173529e | 813 | { |
vpcola | 23:06fac173529e | 814 | if (keepAliveInterval > 0) |
vpcola | 23:06fac173529e | 815 | { |
vpcola | 23:06fac173529e | 816 | comTimer.reset(); |
vpcola | 23:06fac173529e | 817 | comTimer.start(); |
vpcola | 23:06fac173529e | 818 | } |
vpcola | 23:06fac173529e | 819 | } |
vpcola | 23:06fac173529e | 820 | |
vpcola | 23:06fac173529e | 821 | bool MQTTThreadedClient::hasConnectionTimedOut() |
vpcola | 23:06fac173529e | 822 | { |
vpcola | 23:06fac173529e | 823 | if (keepAliveInterval > 0 ) { |
vpcola | 23:06fac173529e | 824 | // Check connection timer |
vpcola | 23:06fac173529e | 825 | if (comTimer.read_ms() > keepAliveInterval) |
vpcola | 23:06fac173529e | 826 | return true; |
vpcola | 23:06fac173529e | 827 | else |
vpcola | 23:06fac173529e | 828 | return false; |
vpcola | 23:06fac173529e | 829 | } |
vpcola | 23:06fac173529e | 830 | |
vpcola | 23:06fac173529e | 831 | return false; |
vpcola | 23:06fac173529e | 832 | } |
vpcola | 23:06fac173529e | 833 | |
vpcola | 23:06fac173529e | 834 | void MQTTThreadedClient::sendPingRequest() |
vpcola | 23:06fac173529e | 835 | { |
vpcola | 23:06fac173529e | 836 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
vpcola | 23:06fac173529e | 837 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
vpcola | 23:06fac173529e | 838 | { |
vpcola | 25:326f00faa092 | 839 | printf("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 840 | } |
vpcola | 23:06fac173529e | 841 | } |
vpcola | 23:06fac173529e | 842 | |
vpcola | 23:06fac173529e | 843 | void MQTTThreadedClient::startListener() |
vpcola | 23:06fac173529e | 844 | { |
vpcola | 23:06fac173529e | 845 | int pType; |
vpcola | 23:06fac173529e | 846 | // Continuesly listens for packets and dispatch |
vpcola | 23:06fac173529e | 847 | // message handlers ... |
vpcola | 25:326f00faa092 | 848 | do { |
vpcola | 25:326f00faa092 | 849 | // Connect to server ... |
vpcola | 25:326f00faa092 | 850 | if (!isConnected) |
vpcola | 23:06fac173529e | 851 | { |
vpcola | 25:326f00faa092 | 852 | // Attempt to reconnect ... |
vpcola | 23:06fac173529e | 853 | } |
vpcola | 23:06fac173529e | 854 | |
vpcola | 25:326f00faa092 | 855 | while(true) { |
vpcola | 25:326f00faa092 | 856 | pType = readPacket(); |
vpcola | 25:326f00faa092 | 857 | switch(pType) { |
vpcola | 25:326f00faa092 | 858 | case TIMEOUT: |
vpcola | 25:326f00faa092 | 859 | // No data available from the network ... |
vpcola | 25:326f00faa092 | 860 | break; |
vpcola | 25:326f00faa092 | 861 | case FAILURE: |
vpcola | 25:326f00faa092 | 862 | goto reconnect; |
vpcola | 25:326f00faa092 | 863 | case BUFFER_OVERFLOW: |
vpcola | 25:326f00faa092 | 864 | { |
vpcola | 25:326f00faa092 | 865 | // TODO: Network error, do we disconnect and reconnect? |
vpcola | 25:326f00faa092 | 866 | printf("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 867 | MBED_ASSERT(false); |
vpcola | 25:326f00faa092 | 868 | } |
vpcola | 25:326f00faa092 | 869 | break; |
vpcola | 25:326f00faa092 | 870 | /** |
vpcola | 25:326f00faa092 | 871 | * The rest of the return codes below (all positive) is about MQTT |
vpcola | 25:326f00faa092 | 872 | * response codes |
vpcola | 25:326f00faa092 | 873 | **/ |
vpcola | 25:326f00faa092 | 874 | case CONNACK: |
vpcola | 25:326f00faa092 | 875 | case PUBACK: |
vpcola | 25:326f00faa092 | 876 | case SUBACK: |
vpcola | 25:326f00faa092 | 877 | break; |
vpcola | 25:326f00faa092 | 878 | case PUBLISH: |
vpcola | 25:326f00faa092 | 879 | { |
vpcola | 25:326f00faa092 | 880 | printf("[Thread:%d]Publish received!....\r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 881 | // We receive data from the MQTT server .. |
vpcola | 25:326f00faa092 | 882 | if (handlePublishMsg() < 0) { |
vpcola | 25:326f00faa092 | 883 | printf("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 884 | break; |
vpcola | 25:326f00faa092 | 885 | } |
vpcola | 25:326f00faa092 | 886 | } |
vpcola | 25:326f00faa092 | 887 | break; |
vpcola | 25:326f00faa092 | 888 | case PINGRESP: |
vpcola | 25:326f00faa092 | 889 | { |
vpcola | 25:326f00faa092 | 890 | printf("[Thread:%d]Got ping response ...\r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 891 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 892 | } |
vpcola | 25:326f00faa092 | 893 | break; |
vpcola | 25:326f00faa092 | 894 | default: |
vpcola | 25:326f00faa092 | 895 | printf("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); |
vpcola | 25:326f00faa092 | 896 | } |
vpcola | 25:326f00faa092 | 897 | |
vpcola | 25:326f00faa092 | 898 | // Check if its time to send a keepAlive packet |
vpcola | 25:326f00faa092 | 899 | if (hasConnectionTimedOut()) { |
vpcola | 25:326f00faa092 | 900 | // Queue the ping request so that other |
vpcola | 25:326f00faa092 | 901 | // pending operations queued above will go first |
vpcola | 25:326f00faa092 | 902 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
vpcola | 25:326f00faa092 | 903 | } |
vpcola | 25:326f00faa092 | 904 | |
vpcola | 25:326f00faa092 | 905 | // Check if we have messages on the message queue |
vpcola | 25:326f00faa092 | 906 | osEvent evt = mqueue.get(10); |
vpcola | 25:326f00faa092 | 907 | if (evt.status == osEventMessage) { |
vpcola | 25:326f00faa092 | 908 | |
vpcola | 25:326f00faa092 | 909 | printf("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 910 | |
vpcola | 25:326f00faa092 | 911 | // Unpack the message |
vpcola | 25:326f00faa092 | 912 | PubMessage * message = (PubMessage *)evt.value.p; |
vpcola | 25:326f00faa092 | 913 | |
vpcola | 25:326f00faa092 | 914 | // Send the packet, do not queue the call |
vpcola | 25:326f00faa092 | 915 | // like the ping above .. |
vpcola | 25:326f00faa092 | 916 | if ( sendPublish(*message) == SUCCESS) { |
vpcola | 25:326f00faa092 | 917 | // Reset timers if we have been able to send successfully |
vpcola | 25:326f00faa092 | 918 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 919 | } else { |
vpcola | 25:326f00faa092 | 920 | // Disconnected? |
vpcola | 25:326f00faa092 | 921 | goto reconnect; |
vpcola | 25:326f00faa092 | 922 | } |
vpcola | 25:326f00faa092 | 923 | |
vpcola | 25:326f00faa092 | 924 | // Free the message from mempool after using |
vpcola | 25:326f00faa092 | 925 | mpool.free(message); |
vpcola | 25:326f00faa092 | 926 | } |
vpcola | 25:326f00faa092 | 927 | |
vpcola | 25:326f00faa092 | 928 | // Dispatch any queued events ... |
vpcola | 25:326f00faa092 | 929 | queue.dispatch(100); |
vpcola | 25:326f00faa092 | 930 | } // end while loop |
vpcola | 25:326f00faa092 | 931 | |
vpcola | 25:326f00faa092 | 932 | reconnect: |
vpcola | 25:326f00faa092 | 933 | disconnect(); |
vpcola | 25:326f00faa092 | 934 | // reconnect? |
vpcola | 25:326f00faa092 | 935 | } while(true); |
vpcola | 23:06fac173529e | 936 | } |
vpcola | 23:06fac173529e | 937 |