A Threaded Secure MQTT Client example. Uses MBED TLS for SSL/TLS connection. QoS0 only for now. Example has been tested with K64F connected via Ethernet.
Fork of HelloMQTT by
MQTTThreadedClient.cpp@30:b2aed80037db, 2017-03-28 (annotated)
- Committer:
- vpcola
- Date:
- Tue Mar 28 09:18:49 2017 +0000
- Revision:
- 30:b2aed80037db
- Parent:
- 27:c90092f35d79
- Child:
- 33:38e2e7bf91eb
Added Nucleo L476 to mbed_app.json
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
vpcola | 23:06fac173529e | 1 | #include "mbed.h" |
vpcola | 23:06fac173529e | 2 | #include "rtos.h" |
vpcola | 23:06fac173529e | 3 | #include "MQTTThreadedClient.h" |
vpcola | 25:326f00faa092 | 4 | #include "mbedtls/platform.h" |
vpcola | 25:326f00faa092 | 5 | #include "mbedtls/ssl.h" |
vpcola | 25:326f00faa092 | 6 | #include "mbedtls/entropy.h" |
vpcola | 25:326f00faa092 | 7 | #include "mbedtls/ctr_drbg.h" |
vpcola | 25:326f00faa092 | 8 | #include "mbedtls/error.h" |
vpcola | 23:06fac173529e | 9 | |
vpcola | 30:b2aed80037db | 10 | static MemoryPool<MQTT::PubMessage, 8> mpool; |
vpcola | 30:b2aed80037db | 11 | static Queue<MQTT::PubMessage, 8> mqueue; |
vpcola | 23:06fac173529e | 12 | |
vpcola | 25:326f00faa092 | 13 | // SSL/TLS variables |
vpcola | 25:326f00faa092 | 14 | mbedtls_entropy_context _entropy; |
vpcola | 25:326f00faa092 | 15 | mbedtls_ctr_drbg_context _ctr_drbg; |
vpcola | 25:326f00faa092 | 16 | mbedtls_x509_crt _cacert; |
vpcola | 25:326f00faa092 | 17 | mbedtls_ssl_context _ssl; |
vpcola | 25:326f00faa092 | 18 | mbedtls_ssl_config _ssl_conf; |
vpcola | 25:326f00faa092 | 19 | mbedtls_ssl_session saved_session; |
vpcola | 25:326f00faa092 | 20 | |
vpcola | 30:b2aed80037db | 21 | namespace MQTT { |
vpcola | 30:b2aed80037db | 22 | |
vpcola | 25:326f00faa092 | 23 | /** |
vpcola | 25:326f00faa092 | 24 | * Receive callback for mbed TLS |
vpcola | 25:326f00faa092 | 25 | */ |
vpcola | 25:326f00faa092 | 26 | static int ssl_recv(void *ctx, unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 27 | { |
vpcola | 25:326f00faa092 | 28 | int recv = -1; |
vpcola | 25:326f00faa092 | 29 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 30 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 31 | recv = socket->recv(buf, len); |
vpcola | 25:326f00faa092 | 32 | |
vpcola | 25:326f00faa092 | 33 | if (NSAPI_ERROR_WOULD_BLOCK == recv) { |
vpcola | 25:326f00faa092 | 34 | return MBEDTLS_ERR_SSL_WANT_READ; |
vpcola | 25:326f00faa092 | 35 | } else if (recv < 0) { |
vpcola | 25:326f00faa092 | 36 | return -1; |
vpcola | 25:326f00faa092 | 37 | } else { |
vpcola | 25:326f00faa092 | 38 | return recv; |
vpcola | 25:326f00faa092 | 39 | } |
vpcola | 25:326f00faa092 | 40 | } |
vpcola | 25:326f00faa092 | 41 | |
vpcola | 25:326f00faa092 | 42 | /** |
vpcola | 25:326f00faa092 | 43 | * Send callback for mbed TLS |
vpcola | 25:326f00faa092 | 44 | */ |
vpcola | 25:326f00faa092 | 45 | static int ssl_send(void *ctx, const unsigned char *buf, size_t len) |
vpcola | 25:326f00faa092 | 46 | { |
vpcola | 25:326f00faa092 | 47 | int sent = -1; |
vpcola | 25:326f00faa092 | 48 | TCPSocket *socket = static_cast<TCPSocket *>(ctx); |
vpcola | 25:326f00faa092 | 49 | socket->set_timeout(DEFAULT_SOCKET_TIMEOUT); |
vpcola | 25:326f00faa092 | 50 | sent = socket->send(buf, len); |
vpcola | 25:326f00faa092 | 51 | |
vpcola | 25:326f00faa092 | 52 | if(NSAPI_ERROR_WOULD_BLOCK == sent) { |
vpcola | 25:326f00faa092 | 53 | return MBEDTLS_ERR_SSL_WANT_WRITE; |
vpcola | 25:326f00faa092 | 54 | } else if (sent < 0) { |
vpcola | 25:326f00faa092 | 55 | return -1; |
vpcola | 25:326f00faa092 | 56 | } else { |
vpcola | 25:326f00faa092 | 57 | return sent; |
vpcola | 25:326f00faa092 | 58 | } |
vpcola | 25:326f00faa092 | 59 | } |
vpcola | 25:326f00faa092 | 60 | |
vpcola | 25:326f00faa092 | 61 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 62 | /** |
vpcola | 25:326f00faa092 | 63 | * Debug callback for mbed TLS |
vpcola | 25:326f00faa092 | 64 | * Just prints on the USB serial port |
vpcola | 25:326f00faa092 | 65 | */ |
vpcola | 25:326f00faa092 | 66 | static void my_debug(void *ctx, int level, const char *file, int line, |
vpcola | 25:326f00faa092 | 67 | const char *str) |
vpcola | 25:326f00faa092 | 68 | { |
vpcola | 25:326f00faa092 | 69 | const char *p, *basename; |
vpcola | 25:326f00faa092 | 70 | (void) ctx; |
vpcola | 25:326f00faa092 | 71 | |
vpcola | 25:326f00faa092 | 72 | /* Extract basename from file */ |
vpcola | 25:326f00faa092 | 73 | for(p = basename = file; *p != '\0'; p++) { |
vpcola | 25:326f00faa092 | 74 | if(*p == '/' || *p == '\\') { |
vpcola | 25:326f00faa092 | 75 | basename = p + 1; |
vpcola | 25:326f00faa092 | 76 | } |
vpcola | 25:326f00faa092 | 77 | } |
vpcola | 25:326f00faa092 | 78 | |
vpcola | 25:326f00faa092 | 79 | if (_debug) { |
vpcola | 25:326f00faa092 | 80 | mbedtls_printf("%s:%04d: |%d| %s", basename, line, level, str); |
vpcola | 25:326f00faa092 | 81 | } |
vpcola | 25:326f00faa092 | 82 | } |
vpcola | 25:326f00faa092 | 83 | |
vpcola | 25:326f00faa092 | 84 | /** |
vpcola | 25:326f00faa092 | 85 | * Certificate verification callback for mbed TLS |
vpcola | 25:326f00faa092 | 86 | * Here we only use it to display information on each cert in the chain |
vpcola | 25:326f00faa092 | 87 | */ |
vpcola | 25:326f00faa092 | 88 | static int my_verify(void *data, mbedtls_x509_crt *crt, int depth, uint32_t *flags) |
vpcola | 25:326f00faa092 | 89 | { |
vpcola | 25:326f00faa092 | 90 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 91 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 92 | (void) data; |
vpcola | 25:326f00faa092 | 93 | |
vpcola | 26:4b21de8043a5 | 94 | if (_debug) mbedtls_printf("\nVerifying certificate at depth %d:\r\n", depth); |
vpcola | 25:326f00faa092 | 95 | mbedtls_x509_crt_info(buf, buf_size - 1, " ", crt); |
vpcola | 25:326f00faa092 | 96 | if (_debug) mbedtls_printf("%s", buf); |
vpcola | 25:326f00faa092 | 97 | |
vpcola | 25:326f00faa092 | 98 | if (*flags == 0) |
vpcola | 26:4b21de8043a5 | 99 | if (_debug) mbedtls_printf("No verification issue for this certificate\r\n"); |
vpcola | 25:326f00faa092 | 100 | else { |
vpcola | 25:326f00faa092 | 101 | mbedtls_x509_crt_verify_info(buf, buf_size, " ! ", *flags); |
vpcola | 25:326f00faa092 | 102 | if (_debug) mbedtls_printf("%s\n", buf); |
vpcola | 25:326f00faa092 | 103 | } |
vpcola | 25:326f00faa092 | 104 | |
vpcola | 25:326f00faa092 | 105 | delete[] buf; |
vpcola | 25:326f00faa092 | 106 | return 0; |
vpcola | 25:326f00faa092 | 107 | } |
vpcola | 25:326f00faa092 | 108 | #endif |
vpcola | 25:326f00faa092 | 109 | |
vpcola | 25:326f00faa092 | 110 | |
vpcola | 25:326f00faa092 | 111 | void MQTTThreadedClient::setupTLS() |
vpcola | 25:326f00faa092 | 112 | { |
vpcola | 26:4b21de8043a5 | 113 | if (useTLS) |
vpcola | 25:326f00faa092 | 114 | { |
vpcola | 25:326f00faa092 | 115 | mbedtls_entropy_init(&_entropy); |
vpcola | 25:326f00faa092 | 116 | mbedtls_ctr_drbg_init(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 117 | mbedtls_x509_crt_init(&_cacert); |
vpcola | 25:326f00faa092 | 118 | mbedtls_ssl_init(&_ssl); |
vpcola | 25:326f00faa092 | 119 | mbedtls_ssl_config_init(&_ssl_conf); |
vpcola | 25:326f00faa092 | 120 | memset( &saved_session, 0, sizeof( mbedtls_ssl_session ) ); |
vpcola | 25:326f00faa092 | 121 | } |
vpcola | 25:326f00faa092 | 122 | } |
vpcola | 25:326f00faa092 | 123 | |
vpcola | 25:326f00faa092 | 124 | void MQTTThreadedClient::freeTLS() |
vpcola | 25:326f00faa092 | 125 | { |
vpcola | 26:4b21de8043a5 | 126 | if (useTLS) |
vpcola | 25:326f00faa092 | 127 | { |
vpcola | 25:326f00faa092 | 128 | mbedtls_entropy_free(&_entropy); |
vpcola | 25:326f00faa092 | 129 | mbedtls_ctr_drbg_free(&_ctr_drbg); |
vpcola | 25:326f00faa092 | 130 | mbedtls_x509_crt_free(&_cacert); |
vpcola | 25:326f00faa092 | 131 | mbedtls_ssl_free(&_ssl); |
vpcola | 25:326f00faa092 | 132 | mbedtls_ssl_config_free(&_ssl_conf); |
vpcola | 25:326f00faa092 | 133 | } |
vpcola | 25:326f00faa092 | 134 | } |
vpcola | 25:326f00faa092 | 135 | |
vpcola | 25:326f00faa092 | 136 | int MQTTThreadedClient::initTLS() |
vpcola | 25:326f00faa092 | 137 | { |
vpcola | 25:326f00faa092 | 138 | int ret; |
vpcola | 25:326f00faa092 | 139 | |
vpcola | 30:b2aed80037db | 140 | DBG("Initializing TLS ...\r\n"); |
vpcola | 30:b2aed80037db | 141 | DBG("mbedtls_ctr_drdbg_seed ...\r\n"); |
vpcola | 25:326f00faa092 | 142 | if ((ret = mbedtls_ctr_drbg_seed(&_ctr_drbg, mbedtls_entropy_func, &_entropy, |
vpcola | 25:326f00faa092 | 143 | (const unsigned char *) DRBG_PERS, |
vpcola | 25:326f00faa092 | 144 | sizeof (DRBG_PERS))) != 0) { |
vpcola | 26:4b21de8043a5 | 145 | mbedtls_printf("mbedtls_crt_drbg_init returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 146 | _error = ret; |
vpcola | 25:326f00faa092 | 147 | return -1; |
vpcola | 25:326f00faa092 | 148 | } |
vpcola | 30:b2aed80037db | 149 | DBG("mbedtls_x509_crt_parse ...\r\n"); |
vpcola | 25:326f00faa092 | 150 | if ((ret = mbedtls_x509_crt_parse(&_cacert, (const unsigned char *) ssl_ca_pem, |
vpcola | 25:326f00faa092 | 151 | strlen(ssl_ca_pem) + 1)) != 0) { |
vpcola | 26:4b21de8043a5 | 152 | mbedtls_printf("mbedtls_x509_crt_parse returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 153 | _error = ret; |
vpcola | 25:326f00faa092 | 154 | return -1; |
vpcola | 25:326f00faa092 | 155 | } |
vpcola | 25:326f00faa092 | 156 | |
vpcola | 30:b2aed80037db | 157 | DBG("mbedtls_ssl_config_defaults ...\r\n"); |
vpcola | 25:326f00faa092 | 158 | if ((ret = mbedtls_ssl_config_defaults(&_ssl_conf, |
vpcola | 25:326f00faa092 | 159 | MBEDTLS_SSL_IS_CLIENT, |
vpcola | 25:326f00faa092 | 160 | MBEDTLS_SSL_TRANSPORT_STREAM, |
vpcola | 25:326f00faa092 | 161 | MBEDTLS_SSL_PRESET_DEFAULT)) != 0) { |
vpcola | 26:4b21de8043a5 | 162 | mbedtls_printf("mbedtls_ssl_config_defaults returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 163 | _error = ret; |
vpcola | 25:326f00faa092 | 164 | return -1; |
vpcola | 25:326f00faa092 | 165 | } |
vpcola | 25:326f00faa092 | 166 | |
vpcola | 30:b2aed80037db | 167 | DBG("mbedtls_ssl_config_ca_chain ...\r\n"); |
vpcola | 25:326f00faa092 | 168 | mbedtls_ssl_conf_ca_chain(&_ssl_conf, &_cacert, NULL); |
vpcola | 30:b2aed80037db | 169 | DBG("mbedtls_ssl_conf_rng ...\r\n"); |
vpcola | 25:326f00faa092 | 170 | mbedtls_ssl_conf_rng(&_ssl_conf, mbedtls_ctr_drbg_random, &_ctr_drbg); |
vpcola | 25:326f00faa092 | 171 | |
vpcola | 25:326f00faa092 | 172 | /* It is possible to disable authentication by passing |
vpcola | 25:326f00faa092 | 173 | * MBEDTLS_SSL_VERIFY_NONE in the call to mbedtls_ssl_conf_authmode() |
vpcola | 25:326f00faa092 | 174 | */ |
vpcola | 30:b2aed80037db | 175 | DBG("mbedtls_ssl_conf_authmode ...\r\n"); |
vpcola | 25:326f00faa092 | 176 | mbedtls_ssl_conf_authmode(&_ssl_conf, MBEDTLS_SSL_VERIFY_REQUIRED); |
vpcola | 25:326f00faa092 | 177 | |
vpcola | 25:326f00faa092 | 178 | #if DEBUG_LEVEL > 0 |
vpcola | 25:326f00faa092 | 179 | mbedtls_ssl_conf_verify(&_ssl_conf, my_verify, NULL); |
vpcola | 25:326f00faa092 | 180 | mbedtls_ssl_conf_dbg(&_ssl_conf, my_debug, NULL); |
vpcola | 25:326f00faa092 | 181 | mbedtls_debug_set_threshold(DEBUG_LEVEL); |
vpcola | 25:326f00faa092 | 182 | #endif |
vpcola | 25:326f00faa092 | 183 | |
vpcola | 30:b2aed80037db | 184 | DBG("mbedtls_ssl_setup ...\r\n"); |
vpcola | 25:326f00faa092 | 185 | if ((ret = mbedtls_ssl_setup(&_ssl, &_ssl_conf)) != 0) { |
vpcola | 26:4b21de8043a5 | 186 | mbedtls_printf("mbedtls_ssl_setup returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 187 | _error = ret; |
vpcola | 25:326f00faa092 | 188 | return -1; |
vpcola | 25:326f00faa092 | 189 | } |
vpcola | 25:326f00faa092 | 190 | |
vpcola | 25:326f00faa092 | 191 | return 0; |
vpcola | 25:326f00faa092 | 192 | } |
vpcola | 25:326f00faa092 | 193 | |
vpcola | 25:326f00faa092 | 194 | int MQTTThreadedClient::doTLSHandshake() |
vpcola | 25:326f00faa092 | 195 | { |
vpcola | 25:326f00faa092 | 196 | int ret; |
vpcola | 25:326f00faa092 | 197 | |
vpcola | 25:326f00faa092 | 198 | /* Start the handshake, the rest will be done in onReceive() */ |
vpcola | 30:b2aed80037db | 199 | DBG("Starting the TLS handshake...\r\n"); |
vpcola | 25:326f00faa092 | 200 | ret = mbedtls_ssl_handshake(&_ssl); |
vpcola | 25:326f00faa092 | 201 | if (ret < 0) |
vpcola | 25:326f00faa092 | 202 | { |
vpcola | 25:326f00faa092 | 203 | if (ret != MBEDTLS_ERR_SSL_WANT_READ && |
vpcola | 25:326f00faa092 | 204 | ret != MBEDTLS_ERR_SSL_WANT_WRITE) |
vpcola | 26:4b21de8043a5 | 205 | mbedtls_printf("mbedtls_ssl_handshake returned [%x]\r\n", ret); |
vpcola | 25:326f00faa092 | 206 | else |
vpcola | 25:326f00faa092 | 207 | { |
vpcola | 25:326f00faa092 | 208 | // do not close the socket if timed out |
vpcola | 26:4b21de8043a5 | 209 | ret = TIMEOUT; |
vpcola | 25:326f00faa092 | 210 | } |
vpcola | 26:4b21de8043a5 | 211 | return ret; |
vpcola | 25:326f00faa092 | 212 | } |
vpcola | 25:326f00faa092 | 213 | |
vpcola | 25:326f00faa092 | 214 | /* Handshake done, time to print info */ |
vpcola | 30:b2aed80037db | 215 | DBG("TLS connection to %s:%d established\r\n", |
vpcola | 25:326f00faa092 | 216 | host.c_str(), port); |
vpcola | 25:326f00faa092 | 217 | |
vpcola | 25:326f00faa092 | 218 | const uint32_t buf_size = 1024; |
vpcola | 25:326f00faa092 | 219 | char *buf = new char[buf_size]; |
vpcola | 25:326f00faa092 | 220 | mbedtls_x509_crt_info(buf, buf_size, "\r ", |
vpcola | 25:326f00faa092 | 221 | mbedtls_ssl_get_peer_cert(&_ssl)); |
vpcola | 25:326f00faa092 | 222 | |
vpcola | 30:b2aed80037db | 223 | DBG("Server certificate:\r\n%s\r", buf); |
vpcola | 25:326f00faa092 | 224 | // Verify server cert ... |
vpcola | 25:326f00faa092 | 225 | uint32_t flags = mbedtls_ssl_get_verify_result(&_ssl); |
vpcola | 25:326f00faa092 | 226 | if( flags != 0 ) |
vpcola | 25:326f00faa092 | 227 | { |
vpcola | 25:326f00faa092 | 228 | mbedtls_x509_crt_verify_info(buf, buf_size, "\r ! ", flags); |
vpcola | 30:b2aed80037db | 229 | DBG("Certificate verification failed:\r\n%s\r\r\n", buf); |
vpcola | 25:326f00faa092 | 230 | // free server cert ... before error return |
vpcola | 25:326f00faa092 | 231 | delete [] buf; |
vpcola | 25:326f00faa092 | 232 | return -1; |
vpcola | 25:326f00faa092 | 233 | } |
vpcola | 25:326f00faa092 | 234 | |
vpcola | 30:b2aed80037db | 235 | DBG("Certificate verification passed\r\n\r\n"); |
vpcola | 25:326f00faa092 | 236 | // delete server cert after verification |
vpcola | 25:326f00faa092 | 237 | delete [] buf; |
vpcola | 25:326f00faa092 | 238 | |
vpcola | 30:b2aed80037db | 239 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola | 25:326f00faa092 | 240 | // TODO: Save the session here for reconnect. |
vpcola | 25:326f00faa092 | 241 | if( ( ret = mbedtls_ssl_get_session( &_ssl, &saved_session ) ) != 0 ) |
vpcola | 25:326f00faa092 | 242 | { |
vpcola | 26:4b21de8043a5 | 243 | mbedtls_printf( "mbedtls_ssl_get_session returned -0x%x\n\n", -ret ); |
vpcola | 26:4b21de8043a5 | 244 | hasSavedSession = false; |
vpcola | 25:326f00faa092 | 245 | return -1; |
vpcola | 25:326f00faa092 | 246 | } |
vpcola | 30:b2aed80037db | 247 | #endif |
vpcola | 30:b2aed80037db | 248 | DBG("Session saved for reconnect ...\r\n"); |
vpcola | 26:4b21de8043a5 | 249 | hasSavedSession = true; |
vpcola | 26:4b21de8043a5 | 250 | |
vpcola | 25:326f00faa092 | 251 | return 0; |
vpcola | 25:326f00faa092 | 252 | } |
vpcola | 25:326f00faa092 | 253 | |
vpcola | 23:06fac173529e | 254 | int MQTTThreadedClient::readBytesToBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 255 | { |
vpcola | 23:06fac173529e | 256 | int rc; |
vpcola | 25:326f00faa092 | 257 | |
vpcola | 23:06fac173529e | 258 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 259 | return -1; |
vpcola | 25:326f00faa092 | 260 | |
vpcola | 26:4b21de8043a5 | 261 | if (useTLS) |
vpcola | 25:326f00faa092 | 262 | { |
vpcola | 25:326f00faa092 | 263 | // Do SSL/TLS read |
vpcola | 25:326f00faa092 | 264 | rc = mbedtls_ssl_read(&_ssl, (unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 265 | if (MBEDTLS_ERR_SSL_WANT_READ == rc) |
vpcola | 25:326f00faa092 | 266 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 267 | else |
vpcola | 25:326f00faa092 | 268 | return rc; |
vpcola | 25:326f00faa092 | 269 | } else { |
vpcola | 25:326f00faa092 | 270 | // non-blocking socket ... |
vpcola | 25:326f00faa092 | 271 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 272 | rc = tcpSocket->recv( (void *) buffer, size); |
vpcola | 25:326f00faa092 | 273 | |
vpcola | 25:326f00faa092 | 274 | // return 0 bytes if timeout ... |
vpcola | 25:326f00faa092 | 275 | if (NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 276 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 277 | else |
vpcola | 25:326f00faa092 | 278 | return rc; // return the number of bytes received or error |
vpcola | 25:326f00faa092 | 279 | } |
vpcola | 23:06fac173529e | 280 | } |
vpcola | 23:06fac173529e | 281 | |
vpcola | 23:06fac173529e | 282 | int MQTTThreadedClient::sendBytesFromBuffer(char * buffer, size_t size, int timeout) |
vpcola | 23:06fac173529e | 283 | { |
vpcola | 23:06fac173529e | 284 | int rc; |
vpcola | 23:06fac173529e | 285 | |
vpcola | 23:06fac173529e | 286 | if (tcpSocket == NULL) |
vpcola | 23:06fac173529e | 287 | return -1; |
vpcola | 23:06fac173529e | 288 | |
vpcola | 26:4b21de8043a5 | 289 | if (useTLS) { |
vpcola | 25:326f00faa092 | 290 | // Do SSL/TLS write |
vpcola | 25:326f00faa092 | 291 | rc = mbedtls_ssl_write(&_ssl, (const unsigned char *) buffer, size); |
vpcola | 25:326f00faa092 | 292 | if (MBEDTLS_ERR_SSL_WANT_WRITE == rc) |
vpcola | 25:326f00faa092 | 293 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 294 | else |
vpcola | 25:326f00faa092 | 295 | return rc; |
vpcola | 25:326f00faa092 | 296 | } else { |
vpcola | 25:326f00faa092 | 297 | |
vpcola | 25:326f00faa092 | 298 | // set the write timeout |
vpcola | 25:326f00faa092 | 299 | tcpSocket->set_timeout(timeout); |
vpcola | 25:326f00faa092 | 300 | rc = tcpSocket->send(buffer, size); |
vpcola | 25:326f00faa092 | 301 | |
vpcola | 25:326f00faa092 | 302 | if ( NSAPI_ERROR_WOULD_BLOCK == rc) |
vpcola | 25:326f00faa092 | 303 | return TIMEOUT; |
vpcola | 25:326f00faa092 | 304 | else |
vpcola | 25:326f00faa092 | 305 | return rc; |
vpcola | 25:326f00faa092 | 306 | } |
vpcola | 23:06fac173529e | 307 | } |
vpcola | 23:06fac173529e | 308 | |
vpcola | 23:06fac173529e | 309 | int MQTTThreadedClient::readPacketLength(int* value) |
vpcola | 23:06fac173529e | 310 | { |
vpcola | 23:06fac173529e | 311 | int rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 312 | unsigned char c; |
vpcola | 23:06fac173529e | 313 | int multiplier = 1; |
vpcola | 23:06fac173529e | 314 | int len = 0; |
vpcola | 23:06fac173529e | 315 | const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4; |
vpcola | 23:06fac173529e | 316 | |
vpcola | 23:06fac173529e | 317 | *value = 0; |
vpcola | 23:06fac173529e | 318 | do |
vpcola | 23:06fac173529e | 319 | { |
vpcola | 23:06fac173529e | 320 | if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) |
vpcola | 23:06fac173529e | 321 | { |
vpcola | 23:06fac173529e | 322 | rc = MQTTPACKET_READ_ERROR; /* bad data */ |
vpcola | 23:06fac173529e | 323 | goto exit; |
vpcola | 23:06fac173529e | 324 | } |
vpcola | 23:06fac173529e | 325 | |
vpcola | 23:06fac173529e | 326 | rc = readBytesToBuffer((char *) &c, 1, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 327 | if (rc != 1) |
vpcola | 23:06fac173529e | 328 | { |
vpcola | 23:06fac173529e | 329 | rc = MQTTPACKET_READ_ERROR; |
vpcola | 23:06fac173529e | 330 | goto exit; |
vpcola | 23:06fac173529e | 331 | } |
vpcola | 23:06fac173529e | 332 | |
vpcola | 23:06fac173529e | 333 | *value += (c & 127) * multiplier; |
vpcola | 23:06fac173529e | 334 | multiplier *= 128; |
vpcola | 23:06fac173529e | 335 | } while ((c & 128) != 0); |
vpcola | 23:06fac173529e | 336 | |
vpcola | 23:06fac173529e | 337 | rc = MQTTPACKET_READ_COMPLETE; |
vpcola | 23:06fac173529e | 338 | |
vpcola | 23:06fac173529e | 339 | exit: |
vpcola | 23:06fac173529e | 340 | if (rc == MQTTPACKET_READ_ERROR ) |
vpcola | 23:06fac173529e | 341 | len = -1; |
vpcola | 23:06fac173529e | 342 | |
vpcola | 23:06fac173529e | 343 | return len; |
vpcola | 23:06fac173529e | 344 | } |
vpcola | 23:06fac173529e | 345 | |
vpcola | 23:06fac173529e | 346 | int MQTTThreadedClient::sendPacket(size_t length) |
vpcola | 23:06fac173529e | 347 | { |
vpcola | 23:06fac173529e | 348 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 349 | int sent = 0; |
vpcola | 23:06fac173529e | 350 | |
vpcola | 23:06fac173529e | 351 | while (sent < length) |
vpcola | 23:06fac173529e | 352 | { |
vpcola | 23:06fac173529e | 353 | rc = sendBytesFromBuffer((char *) &sendbuf[sent], length - sent, DEFAULT_SOCKET_TIMEOUT); |
vpcola | 23:06fac173529e | 354 | if (rc < 0) // there was an error writing the data |
vpcola | 23:06fac173529e | 355 | break; |
vpcola | 23:06fac173529e | 356 | sent += rc; |
vpcola | 23:06fac173529e | 357 | } |
vpcola | 23:06fac173529e | 358 | |
vpcola | 23:06fac173529e | 359 | if (sent == length) |
vpcola | 23:06fac173529e | 360 | rc = SUCCESS; |
vpcola | 23:06fac173529e | 361 | else |
vpcola | 23:06fac173529e | 362 | rc = FAILURE; |
vpcola | 23:06fac173529e | 363 | |
vpcola | 23:06fac173529e | 364 | return rc; |
vpcola | 23:06fac173529e | 365 | } |
vpcola | 23:06fac173529e | 366 | /** |
vpcola | 23:06fac173529e | 367 | * Reads the entire packet to readbuf and returns |
vpcola | 23:06fac173529e | 368 | * the type of packet when successful, otherwise |
vpcola | 23:06fac173529e | 369 | * a negative error code is returned. |
vpcola | 23:06fac173529e | 370 | **/ |
vpcola | 23:06fac173529e | 371 | int MQTTThreadedClient::readPacket() |
vpcola | 23:06fac173529e | 372 | { |
vpcola | 23:06fac173529e | 373 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 374 | MQTTHeader header = {0}; |
vpcola | 23:06fac173529e | 375 | int len = 0; |
vpcola | 23:06fac173529e | 376 | int rem_len = 0; |
vpcola | 23:06fac173529e | 377 | |
vpcola | 23:06fac173529e | 378 | /* 1. read the header byte. This has the packet type in it */ |
vpcola | 23:06fac173529e | 379 | if ( (rc = readBytesToBuffer((char *) &readbuf[0], 1, DEFAULT_SOCKET_TIMEOUT)) != 1) |
vpcola | 23:06fac173529e | 380 | goto exit; |
vpcola | 23:06fac173529e | 381 | |
vpcola | 23:06fac173529e | 382 | len = 1; |
vpcola | 23:06fac173529e | 383 | /* 2. read the remaining length. This is variable in itself */ |
vpcola | 23:06fac173529e | 384 | if ( readPacketLength(&rem_len) < 0 ) |
vpcola | 23:06fac173529e | 385 | goto exit; |
vpcola | 23:06fac173529e | 386 | |
vpcola | 23:06fac173529e | 387 | len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length into the buffer */ |
vpcola | 23:06fac173529e | 388 | |
vpcola | 23:06fac173529e | 389 | if (rem_len > (MAX_MQTT_PACKET_SIZE - len)) |
vpcola | 23:06fac173529e | 390 | { |
vpcola | 23:06fac173529e | 391 | rc = BUFFER_OVERFLOW; |
vpcola | 23:06fac173529e | 392 | goto exit; |
vpcola | 23:06fac173529e | 393 | } |
vpcola | 23:06fac173529e | 394 | |
vpcola | 23:06fac173529e | 395 | /* 3. read the rest of the buffer using a callback to supply the rest of the data */ |
vpcola | 23:06fac173529e | 396 | if (rem_len > 0 && (readBytesToBuffer((char *) (readbuf + len), rem_len, DEFAULT_SOCKET_TIMEOUT) != rem_len)) |
vpcola | 23:06fac173529e | 397 | goto exit; |
vpcola | 23:06fac173529e | 398 | |
vpcola | 23:06fac173529e | 399 | // Convert the header to type |
vpcola | 23:06fac173529e | 400 | // and update rc |
vpcola | 23:06fac173529e | 401 | header.byte = readbuf[0]; |
vpcola | 23:06fac173529e | 402 | rc = header.bits.type; |
vpcola | 23:06fac173529e | 403 | |
vpcola | 23:06fac173529e | 404 | exit: |
vpcola | 23:06fac173529e | 405 | |
vpcola | 23:06fac173529e | 406 | return rc; |
vpcola | 23:06fac173529e | 407 | } |
vpcola | 23:06fac173529e | 408 | |
vpcola | 23:06fac173529e | 409 | /** |
vpcola | 23:06fac173529e | 410 | * Read until a specified packet type is received, or untill the specified |
vpcola | 23:06fac173529e | 411 | * timeout dropping packets along the way. |
vpcola | 23:06fac173529e | 412 | **/ |
vpcola | 23:06fac173529e | 413 | int MQTTThreadedClient::readUntil(int packetType, int timeout) |
vpcola | 23:06fac173529e | 414 | { |
vpcola | 23:06fac173529e | 415 | int pType = FAILURE; |
vpcola | 23:06fac173529e | 416 | Timer timer; |
vpcola | 23:06fac173529e | 417 | |
vpcola | 23:06fac173529e | 418 | timer.start(); |
vpcola | 23:06fac173529e | 419 | do { |
vpcola | 23:06fac173529e | 420 | pType = readPacket(); |
vpcola | 23:06fac173529e | 421 | if (pType < 0) |
vpcola | 23:06fac173529e | 422 | break; |
vpcola | 23:06fac173529e | 423 | |
vpcola | 23:06fac173529e | 424 | if (timer.read_ms() > timeout) |
vpcola | 23:06fac173529e | 425 | { |
vpcola | 23:06fac173529e | 426 | pType = FAILURE; |
vpcola | 23:06fac173529e | 427 | break; |
vpcola | 23:06fac173529e | 428 | } |
vpcola | 23:06fac173529e | 429 | }while(pType != packetType); |
vpcola | 23:06fac173529e | 430 | |
vpcola | 23:06fac173529e | 431 | return pType; |
vpcola | 23:06fac173529e | 432 | } |
vpcola | 23:06fac173529e | 433 | |
vpcola | 23:06fac173529e | 434 | |
vpcola | 26:4b21de8043a5 | 435 | int MQTTThreadedClient::login() |
vpcola | 23:06fac173529e | 436 | { |
vpcola | 23:06fac173529e | 437 | int rc = FAILURE; |
vpcola | 23:06fac173529e | 438 | int len = 0; |
vpcola | 23:06fac173529e | 439 | |
vpcola | 26:4b21de8043a5 | 440 | if (!isConnected) |
vpcola | 23:06fac173529e | 441 | { |
vpcola | 30:b2aed80037db | 442 | DBG("Session not connected! \r\n"); |
vpcola | 23:06fac173529e | 443 | return rc; |
vpcola | 23:06fac173529e | 444 | } |
vpcola | 23:06fac173529e | 445 | |
vpcola | 23:06fac173529e | 446 | // Copy the keepAliveInterval value to local |
vpcola | 23:06fac173529e | 447 | // MQTT specifies in seconds, we have to multiply that |
vpcola | 23:06fac173529e | 448 | // amount for our 32 bit timers which accepts ms. |
vpcola | 26:4b21de8043a5 | 449 | keepAliveInterval = (connect_options.keepAliveInterval * 1000); |
vpcola | 23:06fac173529e | 450 | |
vpcola | 30:b2aed80037db | 451 | DBG("Login with: \r\n"); |
vpcola | 30:b2aed80037db | 452 | DBG("\tUsername: [%s]\r\n", connect_options.username.cstring); |
vpcola | 30:b2aed80037db | 453 | DBG("\tPassword: [%s]\r\n", connect_options.password.cstring); |
vpcola | 23:06fac173529e | 454 | |
vpcola | 26:4b21de8043a5 | 455 | if ((len = MQTTSerialize_connect(sendbuf, MAX_MQTT_PACKET_SIZE, &connect_options)) <= 0) |
vpcola | 23:06fac173529e | 456 | { |
vpcola | 30:b2aed80037db | 457 | DBG("Error serializing connect packet ...\r\n"); |
vpcola | 23:06fac173529e | 458 | return rc; |
vpcola | 23:06fac173529e | 459 | } |
vpcola | 23:06fac173529e | 460 | if ((rc = sendPacket((size_t) len)) != SUCCESS) // send the connect packet |
vpcola | 23:06fac173529e | 461 | { |
vpcola | 30:b2aed80037db | 462 | DBG("Error sending the connect request packet ...\r\n"); |
vpcola | 23:06fac173529e | 463 | return rc; |
vpcola | 23:06fac173529e | 464 | } |
vpcola | 23:06fac173529e | 465 | |
vpcola | 23:06fac173529e | 466 | // Wait for the CONNACK |
vpcola | 23:06fac173529e | 467 | if (readUntil(CONNACK, COMMAND_TIMEOUT) == CONNACK) |
vpcola | 23:06fac173529e | 468 | { |
vpcola | 23:06fac173529e | 469 | unsigned char connack_rc = 255; |
vpcola | 23:06fac173529e | 470 | bool sessionPresent = false; |
vpcola | 30:b2aed80037db | 471 | DBG("Connection acknowledgement received ... deserializing respones ...\r\n"); |
vpcola | 23:06fac173529e | 472 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent, &connack_rc, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 23:06fac173529e | 473 | rc = connack_rc; |
vpcola | 23:06fac173529e | 474 | else |
vpcola | 23:06fac173529e | 475 | rc = FAILURE; |
vpcola | 23:06fac173529e | 476 | } |
vpcola | 23:06fac173529e | 477 | else |
vpcola | 23:06fac173529e | 478 | rc = FAILURE; |
vpcola | 23:06fac173529e | 479 | |
vpcola | 23:06fac173529e | 480 | if (rc == SUCCESS) |
vpcola | 23:06fac173529e | 481 | { |
vpcola | 30:b2aed80037db | 482 | DBG("Connected!!! ... starting connection timers ...\r\n"); |
vpcola | 23:06fac173529e | 483 | resetConnectionTimer(); |
vpcola | 23:06fac173529e | 484 | } |
vpcola | 23:06fac173529e | 485 | |
vpcola | 30:b2aed80037db | 486 | DBG("Returning with rc = %d\r\n", rc); |
vpcola | 23:06fac173529e | 487 | |
vpcola | 23:06fac173529e | 488 | return rc; |
vpcola | 23:06fac173529e | 489 | } |
vpcola | 23:06fac173529e | 490 | |
vpcola | 25:326f00faa092 | 491 | |
vpcola | 25:326f00faa092 | 492 | void MQTTThreadedClient::disconnect() |
vpcola | 25:326f00faa092 | 493 | { |
vpcola | 25:326f00faa092 | 494 | if (isConnected) |
vpcola | 25:326f00faa092 | 495 | { |
vpcola | 26:4b21de8043a5 | 496 | if( useTLS |
vpcola | 26:4b21de8043a5 | 497 | && ( mbedtls_ssl_session_reset( &_ssl ) != 0 ) |
vpcola | 26:4b21de8043a5 | 498 | ) |
vpcola | 26:4b21de8043a5 | 499 | { |
vpcola | 30:b2aed80037db | 500 | DBG( "Session reset returned an error \r\n"); |
vpcola | 26:4b21de8043a5 | 501 | } |
vpcola | 25:326f00faa092 | 502 | |
vpcola | 25:326f00faa092 | 503 | isConnected = false; |
vpcola | 25:326f00faa092 | 504 | tcpSocket->close(); |
vpcola | 25:326f00faa092 | 505 | } |
vpcola | 25:326f00faa092 | 506 | } |
vpcola | 25:326f00faa092 | 507 | |
vpcola | 26:4b21de8043a5 | 508 | int MQTTThreadedClient::connect() |
vpcola | 23:06fac173529e | 509 | { |
vpcola | 25:326f00faa092 | 510 | int ret = FAILURE; |
vpcola | 26:4b21de8043a5 | 511 | |
vpcola | 26:4b21de8043a5 | 512 | if ((network == NULL) || (tcpSocket == NULL) |
vpcola | 26:4b21de8043a5 | 513 | || host.empty()) |
vpcola | 26:4b21de8043a5 | 514 | { |
vpcola | 30:b2aed80037db | 515 | DBG("Network settings not set! \r\n"); |
vpcola | 26:4b21de8043a5 | 516 | return ret; |
vpcola | 26:4b21de8043a5 | 517 | } |
vpcola | 25:326f00faa092 | 518 | |
vpcola | 26:4b21de8043a5 | 519 | if (useTLS) |
vpcola | 26:4b21de8043a5 | 520 | { |
vpcola | 26:4b21de8043a5 | 521 | if( ( ret = mbedtls_ssl_session_reset( &_ssl ) ) != 0 ) { |
vpcola | 26:4b21de8043a5 | 522 | mbedtls_printf( " failed\n ! mbedtls_ssl_session_reset returned -0x%x\n\n", -ret ); |
vpcola | 26:4b21de8043a5 | 523 | return ret; |
vpcola | 26:4b21de8043a5 | 524 | } |
vpcola | 30:b2aed80037db | 525 | #if defined(MBEDTLS_SSL_CLI_C) |
vpcola | 26:4b21de8043a5 | 526 | if ( hasSavedSession && (( ret = mbedtls_ssl_set_session( &_ssl, &saved_session ) ) != 0 )) { |
vpcola | 26:4b21de8043a5 | 527 | mbedtls_printf( " failed\n ! mbedtls_ssl_conf_session returned %d\n\n", ret ); |
vpcola | 26:4b21de8043a5 | 528 | return ret; |
vpcola | 26:4b21de8043a5 | 529 | } |
vpcola | 30:b2aed80037db | 530 | #endif |
vpcola | 26:4b21de8043a5 | 531 | } |
vpcola | 26:4b21de8043a5 | 532 | |
vpcola | 23:06fac173529e | 533 | tcpSocket->open(network); |
vpcola | 26:4b21de8043a5 | 534 | if (useTLS) |
vpcola | 25:326f00faa092 | 535 | { |
vpcola | 30:b2aed80037db | 536 | DBG("mbedtls_ssl_set_hostname ...\r\n"); |
vpcola | 25:326f00faa092 | 537 | mbedtls_ssl_set_hostname(&_ssl, host.c_str()); |
vpcola | 30:b2aed80037db | 538 | DBG("mbedtls_ssl_set_bio ...\r\n"); |
vpcola | 25:326f00faa092 | 539 | mbedtls_ssl_set_bio(&_ssl, static_cast<void *>(tcpSocket), |
vpcola | 25:326f00faa092 | 540 | ssl_send, ssl_recv, NULL ); |
vpcola | 25:326f00faa092 | 541 | } |
vpcola | 25:326f00faa092 | 542 | |
vpcola | 25:326f00faa092 | 543 | if (( ret = tcpSocket->connect(host.c_str(), port)) < 0 ) |
vpcola | 23:06fac173529e | 544 | { |
vpcola | 30:b2aed80037db | 545 | DBG("Error connecting to %s:%d with %d\r\n", host.c_str(), port, ret); |
vpcola | 23:06fac173529e | 546 | return ret; |
vpcola | 26:4b21de8043a5 | 547 | }else |
vpcola | 26:4b21de8043a5 | 548 | isConnected = true; |
vpcola | 25:326f00faa092 | 549 | |
vpcola | 26:4b21de8043a5 | 550 | if (useTLS) |
vpcola | 25:326f00faa092 | 551 | { |
vpcola | 26:4b21de8043a5 | 552 | |
vpcola | 26:4b21de8043a5 | 553 | if (doTLSHandshake() < 0) |
vpcola | 26:4b21de8043a5 | 554 | { |
vpcola | 30:b2aed80037db | 555 | DBG("TLS Handshake failed! \r\n"); |
vpcola | 26:4b21de8043a5 | 556 | return FAILURE; |
vpcola | 26:4b21de8043a5 | 557 | }else |
vpcola | 30:b2aed80037db | 558 | DBG("TLS Handshake complete!! \r\n"); |
vpcola | 25:326f00faa092 | 559 | } |
vpcola | 25:326f00faa092 | 560 | |
vpcola | 26:4b21de8043a5 | 561 | return login(); |
vpcola | 26:4b21de8043a5 | 562 | } |
vpcola | 26:4b21de8043a5 | 563 | |
vpcola | 26:4b21de8043a5 | 564 | void MQTTThreadedClient::setConnectionParameters(const char * chost, uint16_t cport, MQTTPacket_connectData & options) |
vpcola | 26:4b21de8043a5 | 565 | { |
vpcola | 26:4b21de8043a5 | 566 | // Copy the settings for reconnection |
vpcola | 26:4b21de8043a5 | 567 | host = chost; |
vpcola | 26:4b21de8043a5 | 568 | port = cport; |
vpcola | 26:4b21de8043a5 | 569 | connect_options = options; |
vpcola | 23:06fac173529e | 570 | } |
vpcola | 23:06fac173529e | 571 | |
vpcola | 23:06fac173529e | 572 | int MQTTThreadedClient::publish(PubMessage& msg) |
vpcola | 23:06fac173529e | 573 | { |
vpcola | 23:06fac173529e | 574 | #if 0 |
vpcola | 23:06fac173529e | 575 | int id = queue.call(mbed::callback(this, &MQTTThreadedClient::sendPublish), topic, message); |
vpcola | 23:06fac173529e | 576 | // TODO: handle id values when the function is called later |
vpcola | 23:06fac173529e | 577 | if (id == 0) |
vpcola | 23:06fac173529e | 578 | return FAILURE; |
vpcola | 23:06fac173529e | 579 | else |
vpcola | 23:06fac173529e | 580 | return SUCCESS; |
vpcola | 23:06fac173529e | 581 | #endif |
vpcola | 23:06fac173529e | 582 | PubMessage *message = mpool.alloc(); |
vpcola | 23:06fac173529e | 583 | // Simple copy |
vpcola | 23:06fac173529e | 584 | *message = msg; |
vpcola | 23:06fac173529e | 585 | |
vpcola | 23:06fac173529e | 586 | // Push the data to the thread |
vpcola | 30:b2aed80037db | 587 | DBG("[Thread:%d] Pushing data to consumer thread ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 588 | mqueue.put(message); |
vpcola | 23:06fac173529e | 589 | |
vpcola | 23:06fac173529e | 590 | return SUCCESS; |
vpcola | 23:06fac173529e | 591 | } |
vpcola | 23:06fac173529e | 592 | |
vpcola | 23:06fac173529e | 593 | int MQTTThreadedClient::sendPublish(PubMessage& message) |
vpcola | 23:06fac173529e | 594 | { |
vpcola | 23:06fac173529e | 595 | MQTTString topicString = MQTTString_initializer; |
vpcola | 23:06fac173529e | 596 | |
vpcola | 23:06fac173529e | 597 | if (!isConnected) |
vpcola | 23:06fac173529e | 598 | { |
vpcola | 30:b2aed80037db | 599 | DBG("[Thread:%d] Not connected!!! ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 600 | return FAILURE; |
vpcola | 23:06fac173529e | 601 | } |
vpcola | 23:06fac173529e | 602 | |
vpcola | 23:06fac173529e | 603 | topicString.cstring = (char*) &message.topic[0]; |
vpcola | 23:06fac173529e | 604 | int len = MQTTSerialize_publish(sendbuf, MAX_MQTT_PACKET_SIZE, 0, message.qos, false, message.id, |
vpcola | 23:06fac173529e | 605 | topicString, (unsigned char*) &message.payload[0], (int) message.payloadlen); |
vpcola | 23:06fac173529e | 606 | if (len <= 0) |
vpcola | 23:06fac173529e | 607 | { |
vpcola | 30:b2aed80037db | 608 | DBG("[Thread:%d]Failed serializing message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 609 | return FAILURE; |
vpcola | 23:06fac173529e | 610 | } |
vpcola | 23:06fac173529e | 611 | |
vpcola | 23:06fac173529e | 612 | if (sendPacket(len) == SUCCESS) |
vpcola | 23:06fac173529e | 613 | { |
vpcola | 30:b2aed80037db | 614 | DBG("[Thread:%d]Successfully sent publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 615 | return SUCCESS; |
vpcola | 23:06fac173529e | 616 | } |
vpcola | 23:06fac173529e | 617 | |
vpcola | 30:b2aed80037db | 618 | DBG("[Thread:%d]Failed to send publish packet to server ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 619 | return FAILURE; |
vpcola | 23:06fac173529e | 620 | } |
vpcola | 25:326f00faa092 | 621 | |
vpcola | 25:326f00faa092 | 622 | void MQTTThreadedClient::addTopicHandler(const char * topicstr, void (*function)(MessageData &)) |
vpcola | 25:326f00faa092 | 623 | { |
vpcola | 25:326f00faa092 | 624 | // Push the subscription into the map ... |
vpcola | 25:326f00faa092 | 625 | FP<void,MessageData &> fp; |
vpcola | 25:326f00faa092 | 626 | fp.attach(function); |
vpcola | 23:06fac173529e | 627 | |
vpcola | 26:4b21de8043a5 | 628 | topicCBMap.insert(std::pair<std::string, FP<void,MessageData &> >(std::string(topicstr),fp)); |
vpcola | 25:326f00faa092 | 629 | } |
vpcola | 25:326f00faa092 | 630 | |
vpcola | 25:326f00faa092 | 631 | int MQTTThreadedClient::processSubscriptions() |
vpcola | 25:326f00faa092 | 632 | { |
vpcola | 25:326f00faa092 | 633 | int numsubscribed = 0; |
vpcola | 25:326f00faa092 | 634 | |
vpcola | 25:326f00faa092 | 635 | if (!isConnected) |
vpcola | 25:326f00faa092 | 636 | { |
vpcola | 30:b2aed80037db | 637 | DBG("Session not connected!!\r\n"); |
vpcola | 25:326f00faa092 | 638 | return 0; |
vpcola | 25:326f00faa092 | 639 | } |
vpcola | 25:326f00faa092 | 640 | |
vpcola | 30:b2aed80037db | 641 | DBG("Processing subscribed topics ....\r\n"); |
vpcola | 26:4b21de8043a5 | 642 | |
vpcola | 25:326f00faa092 | 643 | std::map<std::string, FP<void, MessageData &> >::iterator it; |
vpcola | 26:4b21de8043a5 | 644 | for(it = topicCBMap.begin(); it != topicCBMap.end(); it++) |
vpcola | 25:326f00faa092 | 645 | { |
vpcola | 25:326f00faa092 | 646 | int rc = FAILURE; |
vpcola | 25:326f00faa092 | 647 | int len = 0; |
vpcola | 25:326f00faa092 | 648 | //TODO: We only subscribe to QoS = 0 for now |
vpcola | 25:326f00faa092 | 649 | QoS qos = QOS0; |
vpcola | 25:326f00faa092 | 650 | |
vpcola | 25:326f00faa092 | 651 | MQTTString topic = {(char*)it->first.c_str(), {0, 0}}; |
vpcola | 30:b2aed80037db | 652 | DBG("Subscribing to topic [%s]\r\n", topic.cstring); |
vpcola | 25:326f00faa092 | 653 | |
vpcola | 25:326f00faa092 | 654 | |
vpcola | 25:326f00faa092 | 655 | len = MQTTSerialize_subscribe(sendbuf, MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos); |
vpcola | 25:326f00faa092 | 656 | if (len <= 0) { |
vpcola | 30:b2aed80037db | 657 | DBG("Error serializing subscribe packet ...\r\n"); |
vpcola | 25:326f00faa092 | 658 | continue; |
vpcola | 25:326f00faa092 | 659 | } |
vpcola | 25:326f00faa092 | 660 | |
vpcola | 25:326f00faa092 | 661 | if ((rc = sendPacket(len)) != SUCCESS) { |
vpcola | 30:b2aed80037db | 662 | DBG("Error sending subscribe packet [%d]\r\n", rc); |
vpcola | 25:326f00faa092 | 663 | continue; |
vpcola | 25:326f00faa092 | 664 | } |
vpcola | 25:326f00faa092 | 665 | |
vpcola | 30:b2aed80037db | 666 | DBG("Waiting for subscription ack ...\r\n"); |
vpcola | 25:326f00faa092 | 667 | // Wait for SUBACK, dropping packets read along the way ... |
vpcola | 25:326f00faa092 | 668 | if (readUntil(SUBACK, COMMAND_TIMEOUT) == SUBACK) { // wait for suback |
vpcola | 25:326f00faa092 | 669 | int count = 0, grantedQoS = -1; |
vpcola | 25:326f00faa092 | 670 | unsigned short mypacketid; |
vpcola | 25:326f00faa092 | 671 | if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, MAX_MQTT_PACKET_SIZE) == 1) |
vpcola | 25:326f00faa092 | 672 | rc = grantedQoS; // 0, 1, 2 or 0x80 |
vpcola | 25:326f00faa092 | 673 | // For as long as we do not get 0x80 .. |
vpcola | 25:326f00faa092 | 674 | if (rc != 0x80) |
vpcola | 25:326f00faa092 | 675 | { |
vpcola | 25:326f00faa092 | 676 | // Reset connection timers here ... |
vpcola | 25:326f00faa092 | 677 | resetConnectionTimer(); |
vpcola | 30:b2aed80037db | 678 | DBG("Successfully subscribed to %s ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 679 | numsubscribed++; |
vpcola | 25:326f00faa092 | 680 | } else { |
vpcola | 30:b2aed80037db | 681 | DBG("Failed to subscribe to topic %s ... (not authorized?)\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 682 | } |
vpcola | 25:326f00faa092 | 683 | } else |
vpcola | 30:b2aed80037db | 684 | DBG("Failed to subscribe to topic %s (ack not received) ...\r\n", it->first.c_str()); |
vpcola | 25:326f00faa092 | 685 | } // end for loop |
vpcola | 25:326f00faa092 | 686 | |
vpcola | 25:326f00faa092 | 687 | return numsubscribed; |
vpcola | 25:326f00faa092 | 688 | } |
vpcola | 25:326f00faa092 | 689 | |
vpcola | 23:06fac173529e | 690 | bool MQTTThreadedClient::isTopicMatched(char* topicFilter, MQTTString& topicName) |
vpcola | 23:06fac173529e | 691 | { |
vpcola | 23:06fac173529e | 692 | char* curf = topicFilter; |
vpcola | 23:06fac173529e | 693 | char* curn = topicName.lenstring.data; |
vpcola | 23:06fac173529e | 694 | char* curn_end = curn + topicName.lenstring.len; |
vpcola | 23:06fac173529e | 695 | |
vpcola | 23:06fac173529e | 696 | while (*curf && curn < curn_end) |
vpcola | 23:06fac173529e | 697 | { |
vpcola | 23:06fac173529e | 698 | if (*curn == '/' && *curf != '/') |
vpcola | 23:06fac173529e | 699 | break; |
vpcola | 23:06fac173529e | 700 | if (*curf != '+' && *curf != '#' && *curf != *curn) |
vpcola | 23:06fac173529e | 701 | break; |
vpcola | 23:06fac173529e | 702 | if (*curf == '+') |
vpcola | 23:06fac173529e | 703 | { // skip until we meet the next separator, or end of string |
vpcola | 23:06fac173529e | 704 | char* nextpos = curn + 1; |
vpcola | 23:06fac173529e | 705 | while (nextpos < curn_end && *nextpos != '/') |
vpcola | 23:06fac173529e | 706 | nextpos = ++curn + 1; |
vpcola | 23:06fac173529e | 707 | } |
vpcola | 23:06fac173529e | 708 | else if (*curf == '#') |
vpcola | 23:06fac173529e | 709 | curn = curn_end - 1; // skip until end of string |
vpcola | 23:06fac173529e | 710 | curf++; |
vpcola | 23:06fac173529e | 711 | curn++; |
vpcola | 23:06fac173529e | 712 | }; |
vpcola | 23:06fac173529e | 713 | |
vpcola | 23:06fac173529e | 714 | return (curn == curn_end) && (*curf == '\0'); |
vpcola | 23:06fac173529e | 715 | } |
vpcola | 23:06fac173529e | 716 | |
vpcola | 23:06fac173529e | 717 | int MQTTThreadedClient::handlePublishMsg() |
vpcola | 23:06fac173529e | 718 | { |
vpcola | 23:06fac173529e | 719 | MQTTString topicName = MQTTString_initializer; |
vpcola | 23:06fac173529e | 720 | Message msg; |
vpcola | 23:06fac173529e | 721 | int intQoS; |
vpcola | 30:b2aed80037db | 722 | DBG("[Thread:%d]Deserializing publish message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 723 | if (MQTTDeserialize_publish((unsigned char*)&msg.dup, |
vpcola | 23:06fac173529e | 724 | &intQoS, |
vpcola | 23:06fac173529e | 725 | (unsigned char*)&msg.retained, |
vpcola | 23:06fac173529e | 726 | (unsigned short*)&msg.id, |
vpcola | 23:06fac173529e | 727 | &topicName, |
vpcola | 23:06fac173529e | 728 | (unsigned char**)&msg.payload, |
vpcola | 23:06fac173529e | 729 | (int*)&msg.payloadlen, readbuf, MAX_MQTT_PACKET_SIZE) != 1) |
vpcola | 23:06fac173529e | 730 | { |
vpcola | 30:b2aed80037db | 731 | DBG("[Thread:%d]Error deserializing published message ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 732 | return -1; |
vpcola | 23:06fac173529e | 733 | } |
vpcola | 23:06fac173529e | 734 | |
vpcola | 23:06fac173529e | 735 | std::string topic; |
vpcola | 23:06fac173529e | 736 | if (topicName.lenstring.len > 0) |
vpcola | 23:06fac173529e | 737 | { |
vpcola | 23:06fac173529e | 738 | topic = std::string((const char *) topicName.lenstring.data, (size_t) topicName.lenstring.len); |
vpcola | 23:06fac173529e | 739 | }else |
vpcola | 23:06fac173529e | 740 | topic = (const char *) topicName.cstring; |
vpcola | 23:06fac173529e | 741 | |
vpcola | 30:b2aed80037db | 742 | DBG("[Thread:%d]Got message for topic [%s], QoS [%d] ...\r\n", Thread::gettid(), topic.c_str(), intQoS); |
vpcola | 23:06fac173529e | 743 | |
vpcola | 23:06fac173529e | 744 | msg.qos = (QoS) intQoS; |
vpcola | 23:06fac173529e | 745 | |
vpcola | 23:06fac173529e | 746 | |
vpcola | 23:06fac173529e | 747 | // Call the handlers for each topic |
vpcola | 23:06fac173529e | 748 | if (topicCBMap.find(topic) != topicCBMap.end()) |
vpcola | 23:06fac173529e | 749 | { |
vpcola | 23:06fac173529e | 750 | // Call the callback function |
vpcola | 23:06fac173529e | 751 | if (topicCBMap[topic].attached()) |
vpcola | 23:06fac173529e | 752 | { |
vpcola | 30:b2aed80037db | 753 | DBG("[Thread:%d]Invoking function handler for topic ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 754 | MessageData md(topicName, msg); |
vpcola | 23:06fac173529e | 755 | topicCBMap[topic](md); |
vpcola | 23:06fac173529e | 756 | |
vpcola | 23:06fac173529e | 757 | return 1; |
vpcola | 23:06fac173529e | 758 | } |
vpcola | 23:06fac173529e | 759 | } |
vpcola | 23:06fac173529e | 760 | |
vpcola | 23:06fac173529e | 761 | // TODO: depending on the QoS |
vpcola | 23:06fac173529e | 762 | // we send data to the server = PUBACK or PUBREC |
vpcola | 23:06fac173529e | 763 | switch(intQoS) |
vpcola | 23:06fac173529e | 764 | { |
vpcola | 23:06fac173529e | 765 | case QOS0: |
vpcola | 23:06fac173529e | 766 | // We send back nothing ... |
vpcola | 23:06fac173529e | 767 | break; |
vpcola | 23:06fac173529e | 768 | case QOS1: |
vpcola | 23:06fac173529e | 769 | // TODO: implement |
vpcola | 23:06fac173529e | 770 | break; |
vpcola | 23:06fac173529e | 771 | case QOS2: |
vpcola | 23:06fac173529e | 772 | // TODO: implement |
vpcola | 23:06fac173529e | 773 | break; |
vpcola | 23:06fac173529e | 774 | default: |
vpcola | 23:06fac173529e | 775 | break; |
vpcola | 23:06fac173529e | 776 | } |
vpcola | 23:06fac173529e | 777 | |
vpcola | 23:06fac173529e | 778 | return 0; |
vpcola | 23:06fac173529e | 779 | } |
vpcola | 23:06fac173529e | 780 | |
vpcola | 23:06fac173529e | 781 | void MQTTThreadedClient::resetConnectionTimer() |
vpcola | 23:06fac173529e | 782 | { |
vpcola | 23:06fac173529e | 783 | if (keepAliveInterval > 0) |
vpcola | 23:06fac173529e | 784 | { |
vpcola | 23:06fac173529e | 785 | comTimer.reset(); |
vpcola | 23:06fac173529e | 786 | comTimer.start(); |
vpcola | 23:06fac173529e | 787 | } |
vpcola | 23:06fac173529e | 788 | } |
vpcola | 23:06fac173529e | 789 | |
vpcola | 23:06fac173529e | 790 | bool MQTTThreadedClient::hasConnectionTimedOut() |
vpcola | 23:06fac173529e | 791 | { |
vpcola | 23:06fac173529e | 792 | if (keepAliveInterval > 0 ) { |
vpcola | 23:06fac173529e | 793 | // Check connection timer |
vpcola | 23:06fac173529e | 794 | if (comTimer.read_ms() > keepAliveInterval) |
vpcola | 23:06fac173529e | 795 | return true; |
vpcola | 23:06fac173529e | 796 | else |
vpcola | 23:06fac173529e | 797 | return false; |
vpcola | 23:06fac173529e | 798 | } |
vpcola | 23:06fac173529e | 799 | |
vpcola | 23:06fac173529e | 800 | return false; |
vpcola | 23:06fac173529e | 801 | } |
vpcola | 23:06fac173529e | 802 | |
vpcola | 23:06fac173529e | 803 | void MQTTThreadedClient::sendPingRequest() |
vpcola | 23:06fac173529e | 804 | { |
vpcola | 23:06fac173529e | 805 | int len = MQTTSerialize_pingreq(sendbuf, MAX_MQTT_PACKET_SIZE); |
vpcola | 23:06fac173529e | 806 | if (len > 0 && (sendPacket(len) == SUCCESS)) // send the ping packet |
vpcola | 23:06fac173529e | 807 | { |
vpcola | 30:b2aed80037db | 808 | DBG("[Thread:%d]Ping request sent successfully ...\r\n", Thread::gettid()); |
vpcola | 23:06fac173529e | 809 | } |
vpcola | 23:06fac173529e | 810 | } |
vpcola | 23:06fac173529e | 811 | |
vpcola | 23:06fac173529e | 812 | void MQTTThreadedClient::startListener() |
vpcola | 23:06fac173529e | 813 | { |
vpcola | 23:06fac173529e | 814 | int pType; |
vpcola | 26:4b21de8043a5 | 815 | int numsubs; |
vpcola | 23:06fac173529e | 816 | // Continuesly listens for packets and dispatch |
vpcola | 23:06fac173529e | 817 | // message handlers ... |
vpcola | 26:4b21de8043a5 | 818 | if (useTLS) |
vpcola | 26:4b21de8043a5 | 819 | { |
vpcola | 26:4b21de8043a5 | 820 | initTLS(); |
vpcola | 26:4b21de8043a5 | 821 | } |
vpcola | 26:4b21de8043a5 | 822 | |
vpcola | 26:4b21de8043a5 | 823 | while(true) |
vpcola | 26:4b21de8043a5 | 824 | { |
vpcola | 26:4b21de8043a5 | 825 | |
vpcola | 26:4b21de8043a5 | 826 | // Attempt to reconnect and login |
vpcola | 26:4b21de8043a5 | 827 | if ( connect() < 0 ) |
vpcola | 23:06fac173529e | 828 | { |
vpcola | 26:4b21de8043a5 | 829 | disconnect(); |
vpcola | 26:4b21de8043a5 | 830 | // Wait for a few secs and reconnect ... |
vpcola | 26:4b21de8043a5 | 831 | Thread::wait(6000); |
vpcola | 26:4b21de8043a5 | 832 | continue; |
vpcola | 23:06fac173529e | 833 | } |
vpcola | 23:06fac173529e | 834 | |
vpcola | 26:4b21de8043a5 | 835 | numsubs = processSubscriptions(); |
vpcola | 30:b2aed80037db | 836 | DBG("Subscribed %d topics ...\r\n", numsubs); |
vpcola | 26:4b21de8043a5 | 837 | |
vpcola | 26:4b21de8043a5 | 838 | // loop read |
vpcola | 26:4b21de8043a5 | 839 | while(true) |
vpcola | 26:4b21de8043a5 | 840 | { |
vpcola | 25:326f00faa092 | 841 | pType = readPacket(); |
vpcola | 26:4b21de8043a5 | 842 | switch(pType) |
vpcola | 26:4b21de8043a5 | 843 | { |
vpcola | 25:326f00faa092 | 844 | case TIMEOUT: |
vpcola | 25:326f00faa092 | 845 | // No data available from the network ... |
vpcola | 25:326f00faa092 | 846 | break; |
vpcola | 25:326f00faa092 | 847 | case FAILURE: |
vpcola | 26:4b21de8043a5 | 848 | { |
vpcola | 30:b2aed80037db | 849 | DBG("readPacket returned failure \r\n"); |
vpcola | 26:4b21de8043a5 | 850 | goto reconnect; |
vpcola | 26:4b21de8043a5 | 851 | } |
vpcola | 25:326f00faa092 | 852 | case BUFFER_OVERFLOW: |
vpcola | 25:326f00faa092 | 853 | { |
vpcola | 25:326f00faa092 | 854 | // TODO: Network error, do we disconnect and reconnect? |
vpcola | 30:b2aed80037db | 855 | DBG("[Thread:%d]Failure or buffer overflow problem ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 856 | MBED_ASSERT(false); |
vpcola | 25:326f00faa092 | 857 | } |
vpcola | 25:326f00faa092 | 858 | break; |
vpcola | 25:326f00faa092 | 859 | /** |
vpcola | 25:326f00faa092 | 860 | * The rest of the return codes below (all positive) is about MQTT |
vpcola | 25:326f00faa092 | 861 | * response codes |
vpcola | 25:326f00faa092 | 862 | **/ |
vpcola | 25:326f00faa092 | 863 | case CONNACK: |
vpcola | 25:326f00faa092 | 864 | case PUBACK: |
vpcola | 25:326f00faa092 | 865 | case SUBACK: |
vpcola | 25:326f00faa092 | 866 | break; |
vpcola | 25:326f00faa092 | 867 | case PUBLISH: |
vpcola | 25:326f00faa092 | 868 | { |
vpcola | 30:b2aed80037db | 869 | DBG("[Thread:%d]Publish received!....\r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 870 | // We receive data from the MQTT server .. |
vpcola | 25:326f00faa092 | 871 | if (handlePublishMsg() < 0) { |
vpcola | 30:b2aed80037db | 872 | DBG("[Thread:%d]Error handling PUBLISH message ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 873 | break; |
vpcola | 25:326f00faa092 | 874 | } |
vpcola | 25:326f00faa092 | 875 | } |
vpcola | 25:326f00faa092 | 876 | break; |
vpcola | 25:326f00faa092 | 877 | case PINGRESP: |
vpcola | 25:326f00faa092 | 878 | { |
vpcola | 30:b2aed80037db | 879 | DBG("[Thread:%d]Got ping response ...\r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 880 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 881 | } |
vpcola | 25:326f00faa092 | 882 | break; |
vpcola | 25:326f00faa092 | 883 | default: |
vpcola | 30:b2aed80037db | 884 | DBG("[Thread:%d]Unknown/Not handled message from server pType[%d]\r\n", Thread::gettid(), pType); |
vpcola | 25:326f00faa092 | 885 | } |
vpcola | 25:326f00faa092 | 886 | |
vpcola | 25:326f00faa092 | 887 | // Check if its time to send a keepAlive packet |
vpcola | 25:326f00faa092 | 888 | if (hasConnectionTimedOut()) { |
vpcola | 25:326f00faa092 | 889 | // Queue the ping request so that other |
vpcola | 25:326f00faa092 | 890 | // pending operations queued above will go first |
vpcola | 25:326f00faa092 | 891 | queue.call(this, &MQTTThreadedClient::sendPingRequest); |
vpcola | 25:326f00faa092 | 892 | } |
vpcola | 25:326f00faa092 | 893 | |
vpcola | 25:326f00faa092 | 894 | // Check if we have messages on the message queue |
vpcola | 25:326f00faa092 | 895 | osEvent evt = mqueue.get(10); |
vpcola | 25:326f00faa092 | 896 | if (evt.status == osEventMessage) { |
vpcola | 25:326f00faa092 | 897 | |
vpcola | 30:b2aed80037db | 898 | DBG("[Thread:%d]Got message to publish! ... \r\n", Thread::gettid()); |
vpcola | 25:326f00faa092 | 899 | |
vpcola | 25:326f00faa092 | 900 | // Unpack the message |
vpcola | 25:326f00faa092 | 901 | PubMessage * message = (PubMessage *)evt.value.p; |
vpcola | 25:326f00faa092 | 902 | |
vpcola | 25:326f00faa092 | 903 | // Send the packet, do not queue the call |
vpcola | 25:326f00faa092 | 904 | // like the ping above .. |
vpcola | 25:326f00faa092 | 905 | if ( sendPublish(*message) == SUCCESS) { |
vpcola | 25:326f00faa092 | 906 | // Reset timers if we have been able to send successfully |
vpcola | 25:326f00faa092 | 907 | resetConnectionTimer(); |
vpcola | 25:326f00faa092 | 908 | } else { |
vpcola | 25:326f00faa092 | 909 | // Disconnected? |
vpcola | 25:326f00faa092 | 910 | goto reconnect; |
vpcola | 25:326f00faa092 | 911 | } |
vpcola | 25:326f00faa092 | 912 | |
vpcola | 25:326f00faa092 | 913 | // Free the message from mempool after using |
vpcola | 25:326f00faa092 | 914 | mpool.free(message); |
vpcola | 25:326f00faa092 | 915 | } |
vpcola | 25:326f00faa092 | 916 | |
vpcola | 25:326f00faa092 | 917 | // Dispatch any queued events ... |
vpcola | 25:326f00faa092 | 918 | queue.dispatch(100); |
vpcola | 25:326f00faa092 | 919 | } // end while loop |
vpcola | 25:326f00faa092 | 920 | |
vpcola | 25:326f00faa092 | 921 | reconnect: |
vpcola | 26:4b21de8043a5 | 922 | // reconnect? |
vpcola | 30:b2aed80037db | 923 | DBG("Client disconnected!! ... retrying ...\r\n"); |
vpcola | 26:4b21de8043a5 | 924 | disconnect(); |
vpcola | 26:4b21de8043a5 | 925 | |
vpcola | 26:4b21de8043a5 | 926 | }; |
vpcola | 23:06fac173529e | 927 | } |
vpcola | 26:4b21de8043a5 | 928 | |
vpcola | 26:4b21de8043a5 | 929 | void MQTTThreadedClient::stopListener() |
vpcola | 26:4b21de8043a5 | 930 | { |
vpcola | 26:4b21de8043a5 | 931 | // TODO: Set a signal/flag that the running thread |
vpcola | 26:4b21de8043a5 | 932 | // will check if its ok to stop ... |
vpcola | 30:b2aed80037db | 933 | } |
vpcola | 30:b2aed80037db | 934 | |
vpcola | 26:4b21de8043a5 | 935 | } |