Removed blocking keepalive code
Dependencies: EthernetInterface mbed-rtos
Fork of niMQTT by
niMQTT.cpp@1:4faa96fa4350, 2013-08-07 (annotated)
- Committer:
- Nim65s
- Date:
- Wed Aug 07 13:33:54 2013 +0000
- Revision:
- 1:4faa96fa4350
- Parent:
- 0:d4dfed20c6ea
- Child:
- 4:afbc7b066cff
Better parameter order
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
Nim65s | 0:d4dfed20c6ea | 1 | #include "niMQTT.h" |
Nim65s | 0:d4dfed20c6ea | 2 | |
Nim65s | 1:4faa96fa4350 | 3 | niMQTT::niMQTT(char *server, void (*callback)(char*, char*), char *id, int port, char *username, char *password, bool debug): |
Nim65s | 0:d4dfed20c6ea | 4 | server(server), port(port), id(id), callback(callback), username(username), password(password), |
Nim65s | 0:d4dfed20c6ea | 5 | debug(debug), connected(true), message_id(0), thread(&niMQTT::thread_starter, this), |
Nim65s | 0:d4dfed20c6ea | 6 | waiting_new_packet(true), packet_sent(false), waiting_connack(0), waiting_suback(0), waiting_pingresp(0) { |
Nim65s | 0:d4dfed20c6ea | 7 | init(); |
Nim65s | 0:d4dfed20c6ea | 8 | } |
Nim65s | 0:d4dfed20c6ea | 9 | |
Nim65s | 0:d4dfed20c6ea | 10 | int niMQTT::init() { |
Nim65s | 0:d4dfed20c6ea | 11 | if (debug) printf("*init\r\n"); |
Nim65s | 0:d4dfed20c6ea | 12 | socket = new TCPSocketConnection; |
Nim65s | 0:d4dfed20c6ea | 13 | do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0); |
Nim65s | 0:d4dfed20c6ea | 14 | socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds |
Nim65s | 0:d4dfed20c6ea | 15 | |
Nim65s | 0:d4dfed20c6ea | 16 | printf("Socket connected.\r\n"); |
Nim65s | 0:d4dfed20c6ea | 17 | |
Nim65s | 0:d4dfed20c6ea | 18 | thread.signal_set(START_THREAD); |
Nim65s | 0:d4dfed20c6ea | 19 | |
Nim65s | 0:d4dfed20c6ea | 20 | return connect(); |
Nim65s | 0:d4dfed20c6ea | 21 | } |
Nim65s | 0:d4dfed20c6ea | 22 | |
Nim65s | 0:d4dfed20c6ea | 23 | int niMQTT::send(char *packet, int size) { |
Nim65s | 0:d4dfed20c6ea | 24 | //if (debug) { |
Nim65s | 0:d4dfed20c6ea | 25 | printf("*send: "); |
Nim65s | 0:d4dfed20c6ea | 26 | for(int i=0; i<size; i++) printf("0x%x ", packet[i]); |
Nim65s | 0:d4dfed20c6ea | 27 | printf("\r\n"); |
Nim65s | 0:d4dfed20c6ea | 28 | //} |
Nim65s | 0:d4dfed20c6ea | 29 | |
Nim65s | 0:d4dfed20c6ea | 30 | int j = -1; |
Nim65s | 0:d4dfed20c6ea | 31 | do j = socket->send_all(packet, size); while (j < 0); |
Nim65s | 0:d4dfed20c6ea | 32 | |
Nim65s | 0:d4dfed20c6ea | 33 | if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size); |
Nim65s | 0:d4dfed20c6ea | 34 | else if (debug) printf("packet sent\r\n"); |
Nim65s | 0:d4dfed20c6ea | 35 | packet_sent = true; |
Nim65s | 0:d4dfed20c6ea | 36 | |
Nim65s | 0:d4dfed20c6ea | 37 | return (j == size); |
Nim65s | 0:d4dfed20c6ea | 38 | } |
Nim65s | 0:d4dfed20c6ea | 39 | |
Nim65s | 0:d4dfed20c6ea | 40 | int niMQTT::recv() { |
Nim65s | 0:d4dfed20c6ea | 41 | if (debug) printf("*recv\r\n"); |
Nim65s | 0:d4dfed20c6ea | 42 | |
Nim65s | 0:d4dfed20c6ea | 43 | int timeout = 0; |
Nim65s | 0:d4dfed20c6ea | 44 | while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1); |
Nim65s | 0:d4dfed20c6ea | 45 | if (timeout >= TIMEOUT/100) { |
Nim65s | 0:d4dfed20c6ea | 46 | printf("RECV TIMEOUT\r\n"); |
Nim65s | 0:d4dfed20c6ea | 47 | if (waiting_connack > 0) printf("CONNACK not received !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 48 | if (waiting_suback > 0) printf("SUBCONNACK not received !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 49 | if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 50 | // TODO launch connect/sub/pingrep again ? |
Nim65s | 0:d4dfed20c6ea | 51 | return -1; |
Nim65s | 0:d4dfed20c6ea | 52 | } |
Nim65s | 0:d4dfed20c6ea | 53 | |
Nim65s | 0:d4dfed20c6ea | 54 | if (debug) printf("Receiving new packet...\r\n"); |
Nim65s | 0:d4dfed20c6ea | 55 | |
Nim65s | 0:d4dfed20c6ea | 56 | char header_received; |
Nim65s | 0:d4dfed20c6ea | 57 | socket->receive(&header_received, 1); |
Nim65s | 0:d4dfed20c6ea | 58 | if (debug) printf("Received 0x%x\r\n", header_received); |
Nim65s | 0:d4dfed20c6ea | 59 | |
Nim65s | 0:d4dfed20c6ea | 60 | waiting_new_packet = false; |
Nim65s | 0:d4dfed20c6ea | 61 | bool DUP = ((header_received & 4) == 4); |
Nim65s | 0:d4dfed20c6ea | 62 | int QoS = (header_received & 6); |
Nim65s | 0:d4dfed20c6ea | 63 | bool RETAIN = ((header_received & 1) == 1); |
Nim65s | 0:d4dfed20c6ea | 64 | |
Nim65s | 0:d4dfed20c6ea | 65 | switch (header_received & 0xf0) { |
Nim65s | 0:d4dfed20c6ea | 66 | case CONNACK: connack(); break; |
Nim65s | 0:d4dfed20c6ea | 67 | case PUBLISH: publish_received(); break; |
Nim65s | 0:d4dfed20c6ea | 68 | case PUBACK: puback_received(); break; |
Nim65s | 0:d4dfed20c6ea | 69 | case SUBACK: suback(); break; |
Nim65s | 0:d4dfed20c6ea | 70 | case UNSUBACK: suback(true); break; |
Nim65s | 0:d4dfed20c6ea | 71 | case PINGRESP: pingresp(); break; |
Nim65s | 0:d4dfed20c6ea | 72 | default: waiting_new_packet = true; printf("BAD HEADER: 0x%x\r\n", header_received); return -1; |
Nim65s | 0:d4dfed20c6ea | 73 | } |
Nim65s | 0:d4dfed20c6ea | 74 | |
Nim65s | 0:d4dfed20c6ea | 75 | return 0; |
Nim65s | 0:d4dfed20c6ea | 76 | } |
Nim65s | 0:d4dfed20c6ea | 77 | |
Nim65s | 0:d4dfed20c6ea | 78 | int niMQTT::connect() { |
Nim65s | 0:d4dfed20c6ea | 79 | if (debug) printf("*connect\r\n"); |
Nim65s | 0:d4dfed20c6ea | 80 | int username_length = strlen(username); |
Nim65s | 0:d4dfed20c6ea | 81 | int password_length = strlen(password); |
Nim65s | 0:d4dfed20c6ea | 82 | int id_length = strlen(id); |
Nim65s | 0:d4dfed20c6ea | 83 | |
Nim65s | 0:d4dfed20c6ea | 84 | int use_username = (username_length != 0); |
Nim65s | 0:d4dfed20c6ea | 85 | int use_password = (password_length != 0); |
Nim65s | 0:d4dfed20c6ea | 86 | |
Nim65s | 0:d4dfed20c6ea | 87 | char variable_header[] = {0,6,77,81,73,115,100,112,3, |
Nim65s | 0:d4dfed20c6ea | 88 | use_username << 7 | use_password << 6, |
Nim65s | 0:d4dfed20c6ea | 89 | KEEP_ALIVE / 256, KEEP_ALIVE % 256 }; |
Nim65s | 0:d4dfed20c6ea | 90 | |
Nim65s | 0:d4dfed20c6ea | 91 | int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password); |
Nim65s | 0:d4dfed20c6ea | 92 | int packet_length = 2 + remaining_length; |
Nim65s | 0:d4dfed20c6ea | 93 | |
Nim65s | 0:d4dfed20c6ea | 94 | char fixed_header[] = { CONNECT, remaining_length }; |
Nim65s | 0:d4dfed20c6ea | 95 | |
Nim65s | 0:d4dfed20c6ea | 96 | char packet[packet_length]; |
Nim65s | 0:d4dfed20c6ea | 97 | memcpy(packet, fixed_header, 2); |
Nim65s | 0:d4dfed20c6ea | 98 | memcpy(packet + 2, variable_header, 12); |
Nim65s | 0:d4dfed20c6ea | 99 | |
Nim65s | 0:d4dfed20c6ea | 100 | // Adds the payload: id |
Nim65s | 0:d4dfed20c6ea | 101 | char id_size[2] = { id_length / 256, id_length % 256 }; |
Nim65s | 0:d4dfed20c6ea | 102 | memcpy(packet + 14, id_size, 2); |
Nim65s | 0:d4dfed20c6ea | 103 | memcpy(packet + 16, id, id_length); |
Nim65s | 0:d4dfed20c6ea | 104 | |
Nim65s | 0:d4dfed20c6ea | 105 | // Adds username & Password to the payload |
Nim65s | 0:d4dfed20c6ea | 106 | if (use_username) { |
Nim65s | 0:d4dfed20c6ea | 107 | char username_size[2] = { username_length / 256, username_length % 256 }; |
Nim65s | 0:d4dfed20c6ea | 108 | memcpy(packet + 16 + id_length, username_size, 2); |
Nim65s | 0:d4dfed20c6ea | 109 | memcpy(packet + 18 + id_length, username, username_length); |
Nim65s | 0:d4dfed20c6ea | 110 | } |
Nim65s | 0:d4dfed20c6ea | 111 | if (use_password) { |
Nim65s | 0:d4dfed20c6ea | 112 | char password_size[2] = { password_length / 256, password_length % 256 }; |
Nim65s | 0:d4dfed20c6ea | 113 | memcpy(packet + 18 + id_length + username_length, password_size, 2); |
Nim65s | 0:d4dfed20c6ea | 114 | memcpy(packet + 20 + id_length + username_length, password, password_length); |
Nim65s | 0:d4dfed20c6ea | 115 | } |
Nim65s | 0:d4dfed20c6ea | 116 | |
Nim65s | 0:d4dfed20c6ea | 117 | waiting_connack++; |
Nim65s | 0:d4dfed20c6ea | 118 | |
Nim65s | 0:d4dfed20c6ea | 119 | return send(packet, packet_length); |
Nim65s | 0:d4dfed20c6ea | 120 | } |
Nim65s | 0:d4dfed20c6ea | 121 | |
Nim65s | 0:d4dfed20c6ea | 122 | int niMQTT::connack() { |
Nim65s | 0:d4dfed20c6ea | 123 | if (debug) printf("CONNACK Received\r\n"); |
Nim65s | 0:d4dfed20c6ea | 124 | if (waiting_connack > 0) waiting_connack--; |
Nim65s | 0:d4dfed20c6ea | 125 | else printf("CONNACK UNEXPECTED !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 126 | |
Nim65s | 0:d4dfed20c6ea | 127 | char resp[3]; |
Nim65s | 0:d4dfed20c6ea | 128 | socket->receive(resp, 3); |
Nim65s | 0:d4dfed20c6ea | 129 | waiting_new_packet = true; |
Nim65s | 0:d4dfed20c6ea | 130 | |
Nim65s | 0:d4dfed20c6ea | 131 | if (resp[0] != 0x2) printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]); |
Nim65s | 0:d4dfed20c6ea | 132 | switch (resp[2]) { |
Nim65s | 0:d4dfed20c6ea | 133 | case 0: printf("Connection Accepted\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 134 | case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 135 | case 2: printf("Connection Refused: identifier rejected\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 136 | case 3: printf("Connection Refused: server unavailable\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 137 | case 4: printf("Connection Refused: bad user name or password\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 138 | case 5: printf("Connection Refused: not authorized\r\n"); break; |
Nim65s | 0:d4dfed20c6ea | 139 | default: printf("I have no idea what I am doing\r\n"); |
Nim65s | 0:d4dfed20c6ea | 140 | } |
Nim65s | 0:d4dfed20c6ea | 141 | |
Nim65s | 0:d4dfed20c6ea | 142 | return (resp[2] == 0); |
Nim65s | 0:d4dfed20c6ea | 143 | } |
Nim65s | 0:d4dfed20c6ea | 144 | |
Nim65s | 0:d4dfed20c6ea | 145 | int niMQTT::pub(char *topic, char *message) { |
Nim65s | 0:d4dfed20c6ea | 146 | if (debug) printf("*pub\r\n"); |
Nim65s | 0:d4dfed20c6ea | 147 | int topic_length = strlen(topic); |
Nim65s | 0:d4dfed20c6ea | 148 | int message_length = strlen(message); |
Nim65s | 0:d4dfed20c6ea | 149 | |
Nim65s | 0:d4dfed20c6ea | 150 | int remaining_length = topic_length + message_length + 2; |
Nim65s | 0:d4dfed20c6ea | 151 | int remaining_length_2 = remaining_length_length(remaining_length); |
Nim65s | 0:d4dfed20c6ea | 152 | int packet_length = 1 + remaining_length + remaining_length_2; |
Nim65s | 0:d4dfed20c6ea | 153 | |
Nim65s | 0:d4dfed20c6ea | 154 | char header = PUBLISH; |
Nim65s | 0:d4dfed20c6ea | 155 | char packet[packet_length]; |
Nim65s | 0:d4dfed20c6ea | 156 | // header |
Nim65s | 0:d4dfed20c6ea | 157 | memcpy(packet, &header, 1); |
Nim65s | 0:d4dfed20c6ea | 158 | get_remaining_length(remaining_length, packet); |
Nim65s | 0:d4dfed20c6ea | 159 | |
Nim65s | 0:d4dfed20c6ea | 160 | // variable header: topic name |
Nim65s | 0:d4dfed20c6ea | 161 | char topic_size[2] = { topic_length / 256, topic_length % 256 }; |
Nim65s | 0:d4dfed20c6ea | 162 | memcpy(packet + 1 + remaining_length_2, topic_size, 2); |
Nim65s | 0:d4dfed20c6ea | 163 | memcpy(packet + 3 + remaining_length_2, topic, topic_length); |
Nim65s | 0:d4dfed20c6ea | 164 | |
Nim65s | 0:d4dfed20c6ea | 165 | // payload: message |
Nim65s | 0:d4dfed20c6ea | 166 | memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length); |
Nim65s | 0:d4dfed20c6ea | 167 | |
Nim65s | 0:d4dfed20c6ea | 168 | return send(packet, packet_length); |
Nim65s | 0:d4dfed20c6ea | 169 | } |
Nim65s | 0:d4dfed20c6ea | 170 | |
Nim65s | 0:d4dfed20c6ea | 171 | void niMQTT::publish_received() { |
Nim65s | 0:d4dfed20c6ea | 172 | //remaining length |
Nim65s | 0:d4dfed20c6ea | 173 | int remaining_length = decode_remaining_length(); |
Nim65s | 0:d4dfed20c6ea | 174 | |
Nim65s | 0:d4dfed20c6ea | 175 | // topic |
Nim65s | 0:d4dfed20c6ea | 176 | char mqtt_utf8_length[2]; |
Nim65s | 0:d4dfed20c6ea | 177 | socket->receive(mqtt_utf8_length, 2); |
Nim65s | 0:d4dfed20c6ea | 178 | int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1]; |
Nim65s | 0:d4dfed20c6ea | 179 | |
Nim65s | 0:d4dfed20c6ea | 180 | if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length); |
Nim65s | 0:d4dfed20c6ea | 181 | |
Nim65s | 0:d4dfed20c6ea | 182 | char topic[utf8_length + 1]; |
Nim65s | 0:d4dfed20c6ea | 183 | socket->receive(topic, utf8_length); |
Nim65s | 0:d4dfed20c6ea | 184 | topic[utf8_length] = 0; |
Nim65s | 0:d4dfed20c6ea | 185 | |
Nim65s | 0:d4dfed20c6ea | 186 | // payload |
Nim65s | 0:d4dfed20c6ea | 187 | int message_length = remaining_length - utf8_length - 2; |
Nim65s | 0:d4dfed20c6ea | 188 | char message[message_length + 1]; |
Nim65s | 0:d4dfed20c6ea | 189 | socket->receive(message, message_length); |
Nim65s | 0:d4dfed20c6ea | 190 | message[message_length] = 0; |
Nim65s | 0:d4dfed20c6ea | 191 | |
Nim65s | 0:d4dfed20c6ea | 192 | waiting_new_packet = true; |
Nim65s | 0:d4dfed20c6ea | 193 | |
Nim65s | 0:d4dfed20c6ea | 194 | callback(topic, message); |
Nim65s | 0:d4dfed20c6ea | 195 | } |
Nim65s | 0:d4dfed20c6ea | 196 | |
Nim65s | 0:d4dfed20c6ea | 197 | int niMQTT::puback() { |
Nim65s | 0:d4dfed20c6ea | 198 | if (debug) printf("*puback\r\n"); |
Nim65s | 0:d4dfed20c6ea | 199 | char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 }; |
Nim65s | 0:d4dfed20c6ea | 200 | return send(fixed_header, 4); |
Nim65s | 0:d4dfed20c6ea | 201 | } |
Nim65s | 0:d4dfed20c6ea | 202 | |
Nim65s | 0:d4dfed20c6ea | 203 | int niMQTT::puback_received() { |
Nim65s | 0:d4dfed20c6ea | 204 | waiting_new_packet = true; |
Nim65s | 0:d4dfed20c6ea | 205 | return 0; // TODO |
Nim65s | 0:d4dfed20c6ea | 206 | } |
Nim65s | 0:d4dfed20c6ea | 207 | |
Nim65s | 0:d4dfed20c6ea | 208 | int niMQTT::sub(char *topic, bool unsub) { |
Nim65s | 0:d4dfed20c6ea | 209 | if (debug) printf("*sub\r\n"); |
Nim65s | 0:d4dfed20c6ea | 210 | char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE; |
Nim65s | 0:d4dfed20c6ea | 211 | int topic_length = strlen(topic); |
Nim65s | 0:d4dfed20c6ea | 212 | |
Nim65s | 0:d4dfed20c6ea | 213 | int remaining_length = topic_length + 5; |
Nim65s | 0:d4dfed20c6ea | 214 | int remaining_length_2 = remaining_length_length(remaining_length); |
Nim65s | 0:d4dfed20c6ea | 215 | int packet_length = 1 + remaining_length + remaining_length_2; |
Nim65s | 0:d4dfed20c6ea | 216 | |
Nim65s | 0:d4dfed20c6ea | 217 | char header = command | LEAST_ONCE; |
Nim65s | 0:d4dfed20c6ea | 218 | char packet[packet_length]; |
Nim65s | 0:d4dfed20c6ea | 219 | // header |
Nim65s | 0:d4dfed20c6ea | 220 | memcpy(packet, &header, 1); |
Nim65s | 0:d4dfed20c6ea | 221 | get_remaining_length(remaining_length, packet); |
Nim65s | 0:d4dfed20c6ea | 222 | |
Nim65s | 0:d4dfed20c6ea | 223 | // variable header: message identifier |
Nim65s | 0:d4dfed20c6ea | 224 | message_id++; |
Nim65s | 0:d4dfed20c6ea | 225 | char variable_header[] = { message_id / 256, message_id % 256 }; |
Nim65s | 0:d4dfed20c6ea | 226 | memcpy(packet + 1 + remaining_length_2, variable_header, 2); |
Nim65s | 0:d4dfed20c6ea | 227 | |
Nim65s | 0:d4dfed20c6ea | 228 | // payload: topic name & requested QoS |
Nim65s | 0:d4dfed20c6ea | 229 | char topic_size[2] = { topic_length / 256, topic_length % 256 }; |
Nim65s | 0:d4dfed20c6ea | 230 | char requested_qos = MOST_ONCE; |
Nim65s | 0:d4dfed20c6ea | 231 | memcpy(packet + 3 + remaining_length_2, topic_size, 2); |
Nim65s | 0:d4dfed20c6ea | 232 | memcpy(packet + 5 + remaining_length_2, topic, topic_length); |
Nim65s | 0:d4dfed20c6ea | 233 | memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1); |
Nim65s | 0:d4dfed20c6ea | 234 | |
Nim65s | 0:d4dfed20c6ea | 235 | waiting_suback++; |
Nim65s | 0:d4dfed20c6ea | 236 | |
Nim65s | 0:d4dfed20c6ea | 237 | return send(packet, packet_length); |
Nim65s | 0:d4dfed20c6ea | 238 | } |
Nim65s | 0:d4dfed20c6ea | 239 | |
Nim65s | 0:d4dfed20c6ea | 240 | int niMQTT::suback(bool unsub) { |
Nim65s | 0:d4dfed20c6ea | 241 | if (debug) printf("SUBACK received\r\n"); |
Nim65s | 0:d4dfed20c6ea | 242 | if (waiting_suback > 0) waiting_suback--; |
Nim65s | 0:d4dfed20c6ea | 243 | else printf("SUBACK UNEXPECTED !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 244 | |
Nim65s | 0:d4dfed20c6ea | 245 | char command = (unsub) ? UNSUBACK : SUBACK; // TODO |
Nim65s | 0:d4dfed20c6ea | 246 | |
Nim65s | 0:d4dfed20c6ea | 247 | int remaining_length = decode_remaining_length(); |
Nim65s | 0:d4dfed20c6ea | 248 | |
Nim65s | 0:d4dfed20c6ea | 249 | // Variable Header |
Nim65s | 0:d4dfed20c6ea | 250 | char var_resp[remaining_length]; |
Nim65s | 0:d4dfed20c6ea | 251 | socket->receive(var_resp, remaining_length); |
Nim65s | 0:d4dfed20c6ea | 252 | waiting_new_packet = true; |
Nim65s | 0:d4dfed20c6ea | 253 | if (debug) { |
Nim65s | 0:d4dfed20c6ea | 254 | printf("suback: "); |
Nim65s | 0:d4dfed20c6ea | 255 | for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]); |
Nim65s | 0:d4dfed20c6ea | 256 | printf("\r\n"); |
Nim65s | 0:d4dfed20c6ea | 257 | } |
Nim65s | 0:d4dfed20c6ea | 258 | |
Nim65s | 0:d4dfed20c6ea | 259 | if (var_resp[0] * 256 + var_resp[1] != message_id) { |
Nim65s | 0:d4dfed20c6ea | 260 | printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id); |
Nim65s | 0:d4dfed20c6ea | 261 | } |
Nim65s | 0:d4dfed20c6ea | 262 | |
Nim65s | 0:d4dfed20c6ea | 263 | // here we should do things about the QoS if /unsuback, but let's say it's 0. |
Nim65s | 0:d4dfed20c6ea | 264 | |
Nim65s | 0:d4dfed20c6ea | 265 | return (var_resp[0] * 256 + var_resp[1] == message_id); |
Nim65s | 0:d4dfed20c6ea | 266 | } |
Nim65s | 0:d4dfed20c6ea | 267 | |
Nim65s | 0:d4dfed20c6ea | 268 | int niMQTT::pingreq () { |
Nim65s | 0:d4dfed20c6ea | 269 | if (debug) printf("*pingreq\r\n"); |
Nim65s | 0:d4dfed20c6ea | 270 | char fixed_header[] = { PINGREQ, 0 }; |
Nim65s | 0:d4dfed20c6ea | 271 | waiting_pingresp++; |
Nim65s | 0:d4dfed20c6ea | 272 | return send(fixed_header, 2); |
Nim65s | 0:d4dfed20c6ea | 273 | } |
Nim65s | 0:d4dfed20c6ea | 274 | |
Nim65s | 0:d4dfed20c6ea | 275 | int niMQTT::pingresp() { |
Nim65s | 0:d4dfed20c6ea | 276 | if (debug) printf("PINGRESP Received\r\n"); |
Nim65s | 0:d4dfed20c6ea | 277 | if (waiting_pingresp > 0) waiting_pingresp--; |
Nim65s | 0:d4dfed20c6ea | 278 | else printf("PINGRESP Unexpected !\r\n"); |
Nim65s | 0:d4dfed20c6ea | 279 | |
Nim65s | 0:d4dfed20c6ea | 280 | char resp; |
Nim65s | 0:d4dfed20c6ea | 281 | socket->receive(&resp, 1); |
Nim65s | 0:d4dfed20c6ea | 282 | waiting_new_packet = true; |
Nim65s | 0:d4dfed20c6ea | 283 | |
Nim65s | 0:d4dfed20c6ea | 284 | if (resp != 0) printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp); |
Nim65s | 0:d4dfed20c6ea | 285 | |
Nim65s | 0:d4dfed20c6ea | 286 | return (resp == 0); |
Nim65s | 0:d4dfed20c6ea | 287 | } |
Nim65s | 0:d4dfed20c6ea | 288 | |
Nim65s | 0:d4dfed20c6ea | 289 | int niMQTT::disconnect() { |
Nim65s | 0:d4dfed20c6ea | 290 | if (debug) printf("*disconnect\r\n"); |
Nim65s | 0:d4dfed20c6ea | 291 | char fixed_header[] = { DISCONNECT, 0 }; |
Nim65s | 0:d4dfed20c6ea | 292 | return send(fixed_header, 2); |
Nim65s | 0:d4dfed20c6ea | 293 | } |
Nim65s | 0:d4dfed20c6ea | 294 | |
Nim65s | 0:d4dfed20c6ea | 295 | niMQTT::~niMQTT() { |
Nim65s | 0:d4dfed20c6ea | 296 | if (debug) printf("*~niMQTT()\r\n"); |
Nim65s | 0:d4dfed20c6ea | 297 | connected = false; |
Nim65s | 0:d4dfed20c6ea | 298 | disconnect(); |
Nim65s | 0:d4dfed20c6ea | 299 | socket->close(); |
Nim65s | 0:d4dfed20c6ea | 300 | delete socket; |
Nim65s | 0:d4dfed20c6ea | 301 | } |
Nim65s | 0:d4dfed20c6ea | 302 | |
Nim65s | 0:d4dfed20c6ea | 303 | void niMQTT::thread_starter(void const *p) { |
Nim65s | 0:d4dfed20c6ea | 304 | niMQTT *instance = (niMQTT*)p; |
Nim65s | 0:d4dfed20c6ea | 305 | instance->thread_worker(); |
Nim65s | 0:d4dfed20c6ea | 306 | } |
Nim65s | 0:d4dfed20c6ea | 307 | |
Nim65s | 0:d4dfed20c6ea | 308 | void niMQTT::thread_worker() { |
Nim65s | 0:d4dfed20c6ea | 309 | if (debug) printf("*thread_worker\r\n"); |
Nim65s | 0:d4dfed20c6ea | 310 | thread.signal_wait(START_THREAD); |
Nim65s | 0:d4dfed20c6ea | 311 | while (connected) { |
Nim65s | 0:d4dfed20c6ea | 312 | if (debug) printf("New loop in thread worker\r\n"); |
Nim65s | 0:d4dfed20c6ea | 313 | recv(); |
Nim65s | 0:d4dfed20c6ea | 314 | Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds |
Nim65s | 0:d4dfed20c6ea | 315 | if (!packet_sent) pingreq(); |
Nim65s | 0:d4dfed20c6ea | 316 | packet_sent = false; |
Nim65s | 0:d4dfed20c6ea | 317 | } |
Nim65s | 0:d4dfed20c6ea | 318 | } |
Nim65s | 0:d4dfed20c6ea | 319 | |
Nim65s | 0:d4dfed20c6ea | 320 | void niMQTT::get_remaining_length(int remaining_length, char *packet) { |
Nim65s | 0:d4dfed20c6ea | 321 | int X = remaining_length; |
Nim65s | 0:d4dfed20c6ea | 322 | int n = 1; |
Nim65s | 0:d4dfed20c6ea | 323 | char digit; |
Nim65s | 0:d4dfed20c6ea | 324 | do { |
Nim65s | 0:d4dfed20c6ea | 325 | digit = X % 0x80; |
Nim65s | 0:d4dfed20c6ea | 326 | X /= 0x80; |
Nim65s | 0:d4dfed20c6ea | 327 | if (X > 0) digit |= 0x80; |
Nim65s | 0:d4dfed20c6ea | 328 | memcpy(packet + n, &digit, 1); |
Nim65s | 0:d4dfed20c6ea | 329 | n++; |
Nim65s | 0:d4dfed20c6ea | 330 | } while (X > 0); |
Nim65s | 0:d4dfed20c6ea | 331 | } |
Nim65s | 0:d4dfed20c6ea | 332 | |
Nim65s | 0:d4dfed20c6ea | 333 | int niMQTT::decode_remaining_length() { |
Nim65s | 0:d4dfed20c6ea | 334 | int multiplier = 1; |
Nim65s | 0:d4dfed20c6ea | 335 | int value = 0; |
Nim65s | 0:d4dfed20c6ea | 336 | char digit = 0; |
Nim65s | 0:d4dfed20c6ea | 337 | do { |
Nim65s | 0:d4dfed20c6ea | 338 | while (socket->receive(&digit, 1) < 0) wait(0.1); |
Nim65s | 0:d4dfed20c6ea | 339 | value += (digit & 127) * multiplier; |
Nim65s | 0:d4dfed20c6ea | 340 | multiplier *= 128; |
Nim65s | 0:d4dfed20c6ea | 341 | } while ((digit & 0x80) != 0); |
Nim65s | 0:d4dfed20c6ea | 342 | return value; |
Nim65s | 0:d4dfed20c6ea | 343 | } |
Nim65s | 0:d4dfed20c6ea | 344 | |
Nim65s | 0:d4dfed20c6ea | 345 | int niMQTT::remaining_length_length(int remaining_length) { |
Nim65s | 0:d4dfed20c6ea | 346 | int X = remaining_length; |
Nim65s | 0:d4dfed20c6ea | 347 | int rll = 0; |
Nim65s | 0:d4dfed20c6ea | 348 | do { |
Nim65s | 0:d4dfed20c6ea | 349 | rll++; |
Nim65s | 0:d4dfed20c6ea | 350 | X /= 0x80; |
Nim65s | 0:d4dfed20c6ea | 351 | } while (X > 0); |
Nim65s | 0:d4dfed20c6ea | 352 | return rll; |
Nim65s | 0:d4dfed20c6ea | 353 | } |