Version of niMQTT library which includes separate process for pings and ability to publish retained messages.
Dependencies: EthernetInterface mbed-rtos mbed
Fork of niMQTT by
Diff: niMQTT.cpp
- Revision:
- 4:afbc7b066cff
- Parent:
- 1:4faa96fa4350
- Child:
- 5:3d21020a2826
--- a/niMQTT.cpp Mon Aug 12 09:46:28 2013 +0000 +++ b/niMQTT.cpp Mon Aug 12 15:30:04 2013 +0000 @@ -47,7 +47,7 @@ if (waiting_connack > 0) printf("CONNACK not received !\r\n"); if (waiting_suback > 0) printf("SUBCONNACK not received !\r\n"); if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n"); - // TODO launch connect/sub/pingrep again ? + reconnect(); return -1; } @@ -69,7 +69,7 @@ case SUBACK: suback(); break; case UNSUBACK: suback(true); break; case PINGRESP: pingresp(); break; - default: waiting_new_packet = true; printf("BAD HEADER: 0x%x\r\n", header_received); return -1; + default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1; } return 0; @@ -122,13 +122,21 @@ int niMQTT::connack() { if (debug) printf("CONNACK Received\r\n"); if (waiting_connack > 0) waiting_connack--; - else printf("CONNACK UNEXPECTED !\r\n"); + else { + printf("CONNACK UNEXPECTED !\r\n"); + reconnect(); + return -2; + } char resp[3]; socket->receive(resp, 3); waiting_new_packet = true; - if (resp[0] != 0x2) printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]); + if (resp[0] != 0x2) { + printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]); + reconnect(); + return -2; + } switch (resp[2]) { case 0: printf("Connection Accepted\r\n"); break; case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break; @@ -240,7 +248,11 @@ int niMQTT::suback(bool unsub) { if (debug) printf("SUBACK received\r\n"); if (waiting_suback > 0) waiting_suback--; - else printf("SUBACK UNEXPECTED !\r\n"); + else { + printf("SUBACK UNEXPECTED !\r\n"); + reconnect(); + return -2; + } char command = (unsub) ? UNSUBACK : SUBACK; // TODO @@ -275,13 +287,21 @@ int niMQTT::pingresp() { if (debug) printf("PINGRESP Received\r\n"); if (waiting_pingresp > 0) waiting_pingresp--; - else printf("PINGRESP Unexpected !\r\n"); + else { + printf("PINGRESP Unexpected !\r\n"); + reconnect(); + return -2; + } char resp; socket->receive(&resp, 1); waiting_new_packet = true; - if (resp != 0) printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp); + if (resp != 0) { + printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp); + reconnect(); + return -2; + } return (resp == 0); } @@ -300,6 +320,17 @@ delete socket; } +void niMQTT::reconnect() { + if (debug) printf("Reconnecting...\r\n"); + disconnect(); + socket->close(); + + do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0); + socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds + + connect(); +} + void niMQTT::thread_starter(void const *p) { niMQTT *instance = (niMQTT*)p; instance->thread_worker();