niMQTT
Dependencies: EthernetInterface mbed-rtos
Fork of niMQTT by
niMQTT.cpp
00001 #include "niMQTT.h" 00002 00003 niMQTT::niMQTT(char *server, void (*callback)(const char*, const char*), 00004 char *id, int port, char *username, char *password, bool debug): 00005 server(server), port(port), id(id), callback(callback), username(username), 00006 password(password), 00007 debug(debug), connected(true), message_id(0), 00008 ping_thread(&niMQTT::ping_thread_starter, this), 00009 recv_thread(&niMQTT::recv_thread_starter, this), 00010 waiting_new_packet(true), packet_sent(false), waiting_connack(0), 00011 waiting_suback(0), waiting_pingresp(0) 00012 { 00013 init(); 00014 } 00015 00016 int niMQTT::init() { 00017 if (debug) printf("*init\r\n"); 00018 socket = new TCPSocketConnection; 00019 do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0); 00020 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds 00021 00022 printf("Socket connected.\r\n"); 00023 00024 ping_thread.signal_set(START_THREAD); 00025 recv_thread.signal_set(START_THREAD); 00026 00027 return connect(); 00028 } 00029 00030 int niMQTT::send(char *packet, int size) { 00031 //if (debug) { 00032 printf("*send: "); 00033 for(int i=0; i<size; i++) printf("0x%x ", packet[i]); 00034 printf("\r\n"); 00035 //} 00036 00037 int j = -1; 00038 do j = socket->send_all(packet, size); while (j < 0); 00039 00040 if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size); 00041 else if (debug) printf("packet sent\r\n"); 00042 packet_sent = true; 00043 00044 return (j == size); 00045 } 00046 00047 int niMQTT::recv() { 00048 if (debug) printf("*recv\r\n"); 00049 00050 int timeout = 0; 00051 while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1); 00052 if (timeout >= TIMEOUT/100) { 00053 printf("RECV TIMEOUT\r\n"); 00054 if (waiting_connack > 0) printf("CONNACK not received !\r\n"); 00055 if (waiting_suback > 0) printf("SUBACK not received !\r\n"); 00056 if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n"); 00057 reconnect(); 00058 return -1; 00059 } 00060 00061 if (debug) printf("Receiving new packet...\r\n"); 00062 00063 char header_received; 00064 socket->receive(&header_received, 1); 00065 if (debug) printf("Received 0x%x\r\n", header_received); 00066 00067 waiting_new_packet = false; 00068 //bool DUP = ((header_received & 4) == 4); 00069 //int QoS = (header_received & 6); 00070 //bool RETAIN = ((header_received & 1) == 1); 00071 00072 switch (header_received & 0xf0) { 00073 case CONNACK: connack(); break; 00074 case PUBLISH: publish_received(); break; 00075 case PUBACK: puback_received(); break; 00076 case SUBACK: suback(); break; 00077 case UNSUBACK: suback(true); break; 00078 case PINGRESP: pingresp(); break; 00079 default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1; 00080 } 00081 00082 return 0; 00083 } 00084 00085 int niMQTT::connect() { 00086 if (debug) printf("*connect\r\n"); 00087 int username_length = strlen(username); 00088 int password_length = strlen(password); 00089 int id_length = strlen(id); 00090 00091 int use_username = (username_length != 0); 00092 int use_password = (password_length != 0); 00093 00094 char variable_header[] = {0,6,77,81,73,115,100,112,3, 00095 use_username << 7 | use_password << 6, 00096 KEEP_ALIVE / 256, KEEP_ALIVE % 256 }; 00097 00098 int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password); 00099 int packet_length = 2 + remaining_length; 00100 00101 char fixed_header[] = { CONNECT, remaining_length }; 00102 00103 char packet[packet_length]; 00104 memcpy(packet, fixed_header, 2); 00105 memcpy(packet + 2, variable_header, 12); 00106 00107 // Adds the payload: id 00108 char id_size[2] = { id_length / 256, id_length % 256 }; 00109 memcpy(packet + 14, id_size, 2); 00110 memcpy(packet + 16, id, id_length); 00111 00112 // Adds username & Password to the payload 00113 if (use_username) { 00114 char username_size[2] = { username_length / 256, username_length % 256 }; 00115 memcpy(packet + 16 + id_length, username_size, 2); 00116 memcpy(packet + 18 + id_length, username, username_length); 00117 } 00118 if (use_password) { 00119 char password_size[2] = { password_length / 256, password_length % 256 }; 00120 memcpy(packet + 18 + id_length + username_length, password_size, 2); 00121 memcpy(packet + 20 + id_length + username_length, password, password_length); 00122 } 00123 00124 waiting_connack++; 00125 00126 return send(packet, packet_length); 00127 } 00128 00129 int niMQTT::connack() { 00130 if (debug) printf("CONNACK Received\r\n"); 00131 if (waiting_connack > 0) waiting_connack--; 00132 else { 00133 printf("CONNACK UNEXPECTED !\r\n"); 00134 reconnect(); 00135 return -2; 00136 } 00137 00138 char resp[3]; 00139 socket->receive(resp, 3); 00140 waiting_new_packet = true; 00141 00142 if (resp[0] != 0x2) { 00143 printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]); 00144 reconnect(); 00145 return -2; 00146 } 00147 switch (resp[2]) { 00148 case 0: printf("Connection Accepted\r\n"); break; 00149 case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break; 00150 case 2: printf("Connection Refused: identifier rejected\r\n"); break; 00151 case 3: printf("Connection Refused: server unavailable\r\n"); break; 00152 case 4: printf("Connection Refused: bad user name or password\r\n"); break; 00153 case 5: printf("Connection Refused: not authorized\r\n"); break; 00154 default: printf("I have no idea what I am doing\r\n"); 00155 } 00156 00157 return (resp[2] == 0); 00158 } 00159 00160 int niMQTT::pub(char *topic, char *message) { 00161 if (debug) printf("*pub\r\n"); 00162 int topic_length = strlen(topic); 00163 int message_length = strlen(message); 00164 00165 int remaining_length = topic_length + message_length + 2; 00166 int remaining_length_2 = remaining_length_length(remaining_length); 00167 int packet_length = 1 + remaining_length + remaining_length_2; 00168 00169 char header = PUBLISH; 00170 char packet[packet_length]; 00171 // header 00172 memcpy(packet, &header, 1); 00173 get_remaining_length(remaining_length, packet); 00174 00175 // variable header: topic name 00176 char topic_size[2] = { topic_length / 256, topic_length % 256 }; 00177 memcpy(packet + 1 + remaining_length_2, topic_size, 2); 00178 memcpy(packet + 3 + remaining_length_2, topic, topic_length); 00179 00180 // payload: message 00181 memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length); 00182 00183 return send(packet, packet_length); 00184 } 00185 00186 void niMQTT::publish_received() { 00187 //remaining length 00188 int remaining_length = decode_remaining_length(); 00189 00190 // topic 00191 char mqtt_utf8_length[2]; 00192 socket->receive(mqtt_utf8_length, 2); 00193 int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1]; 00194 00195 if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length); 00196 00197 char topic[utf8_length + 1]; 00198 socket->receive(topic, utf8_length); 00199 topic[utf8_length] = 0; 00200 00201 // payload 00202 int message_length = remaining_length - utf8_length - 2; 00203 char message[message_length + 1]; 00204 socket->receive(message, message_length); 00205 message[message_length] = 0; 00206 00207 waiting_new_packet = true; 00208 00209 call_callback(topic, message); 00210 } 00211 00212 int niMQTT::puback() { 00213 if (debug) printf("*puback\r\n"); 00214 char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 }; 00215 return send(fixed_header, 4); 00216 } 00217 00218 int niMQTT::puback_received() { 00219 waiting_new_packet = true; 00220 return 0; // TODO 00221 } 00222 00223 int niMQTT::sub(char *topic, bool unsub) { 00224 if (debug) printf("*sub\r\n"); 00225 char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE; 00226 int topic_length = strlen(topic); 00227 00228 int remaining_length = topic_length + 5; 00229 int remaining_length_2 = remaining_length_length(remaining_length); 00230 int packet_length = 1 + remaining_length + remaining_length_2; 00231 00232 char header = command | LEAST_ONCE; 00233 char packet[packet_length]; 00234 // header 00235 memcpy(packet, &header, 1); 00236 get_remaining_length(remaining_length, packet); 00237 00238 // variable header: message identifier 00239 message_id++; 00240 char variable_header[] = { message_id / 256, message_id % 256 }; 00241 memcpy(packet + 1 + remaining_length_2, variable_header, 2); 00242 00243 // payload: topic name & requested QoS 00244 char topic_size[2] = { topic_length / 256, topic_length % 256 }; 00245 char requested_qos = MOST_ONCE; 00246 memcpy(packet + 3 + remaining_length_2, topic_size, 2); 00247 memcpy(packet + 5 + remaining_length_2, topic, topic_length); 00248 memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1); 00249 00250 waiting_suback++; 00251 00252 return send(packet, packet_length); 00253 } 00254 00255 int niMQTT::suback(bool unsub) { 00256 if (debug) printf("SUBACK received\r\n"); 00257 if (waiting_suback > 0) waiting_suback--; 00258 else { 00259 printf("SUBACK UNEXPECTED !\r\n"); 00260 reconnect(); 00261 return -2; 00262 } 00263 00264 //char command = (unsub) ? UNSUBACK : SUBACK; 00265 00266 int remaining_length = decode_remaining_length(); 00267 00268 // Variable Header 00269 char var_resp[remaining_length]; 00270 socket->receive(var_resp, remaining_length); 00271 waiting_new_packet = true; 00272 if (debug) { 00273 printf("suback: "); 00274 for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]); 00275 printf("\r\n"); 00276 } 00277 00278 if (var_resp[0] * 256 + var_resp[1] != message_id) { 00279 printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id); 00280 } 00281 00282 // here we should do things about the QoS if /unsuback, but let's say it's 0. 00283 00284 return (var_resp[0] * 256 + var_resp[1] == message_id); 00285 } 00286 00287 int niMQTT::pingreq () { 00288 if (debug) printf("*pingreq\r\n"); 00289 char fixed_header[] = { PINGREQ, 0 }; 00290 waiting_pingresp++; 00291 return send(fixed_header, 2); 00292 } 00293 00294 int niMQTT::pingresp() { 00295 if (debug) printf("PINGRESP Received\r\n"); 00296 if (waiting_pingresp > 0) waiting_pingresp--; 00297 else { 00298 printf("PINGRESP Unexpected !\r\n"); 00299 reconnect(); 00300 return -2; 00301 } 00302 00303 char resp; 00304 socket->receive(&resp, 1); 00305 waiting_new_packet = true; 00306 00307 if (resp != 0) { 00308 printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp); 00309 reconnect(); 00310 return -2; 00311 } 00312 00313 return (resp == 0); 00314 } 00315 00316 int niMQTT::disconnect() { 00317 if (debug) printf("*disconnect\r\n"); 00318 char fixed_header[] = { DISCONNECT, 0 }; 00319 return send(fixed_header, 2); 00320 } 00321 00322 niMQTT::~niMQTT() { 00323 if (debug) printf("*~niMQTT()\r\n"); 00324 connected = false; 00325 disconnect(); 00326 socket->close(); 00327 delete socket; 00328 } 00329 00330 void niMQTT::reconnect() { 00331 if (debug) printf("Reconnecting...\r\n"); 00332 disconnect(); 00333 socket->close(); 00334 00335 do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0); 00336 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds 00337 00338 connect(); 00339 } 00340 00341 void niMQTT::ping_thread_starter(void const *p) { 00342 niMQTT *instance = (niMQTT*)p; 00343 instance->ping_thread_worker(); 00344 } 00345 00346 void niMQTT::recv_thread_starter(void const *p) { 00347 niMQTT *instance = (niMQTT*)p; 00348 instance->recv_thread_worker(); 00349 } 00350 00351 void niMQTT::ping_thread_worker() { 00352 if (debug) printf("*ping_thread_worker\r\n"); 00353 ping_thread.signal_wait(START_THREAD); 00354 while (connected) { 00355 if (debug) printf("New loop in ping thread worker\r\n"); 00356 Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds 00357 if (!packet_sent) pingreq(); 00358 packet_sent = false; 00359 } 00360 } 00361 00362 void niMQTT::recv_thread_worker() { 00363 if (debug) printf("*recv_thread_worker\r\n"); 00364 recv_thread.signal_wait(START_THREAD); 00365 while (connected) { 00366 if (debug) printf("New loop in recv thread worker\r\n"); 00367 recv(); 00368 packet_sent = false; 00369 } 00370 } 00371 00372 void niMQTT::get_remaining_length(int remaining_length, char *packet) { 00373 int X = remaining_length; 00374 int n = 1; 00375 char digit; 00376 do { 00377 digit = X % 0x80; 00378 X /= 0x80; 00379 if (X > 0) digit |= 0x80; 00380 memcpy(packet + n, &digit, 1); 00381 n++; 00382 } while (X > 0); 00383 } 00384 00385 int niMQTT::decode_remaining_length() { 00386 int multiplier = 1; 00387 int value = 0; 00388 char digit = 0; 00389 do { 00390 while (socket->receive(&digit, 1) < 0) wait(0.1); 00391 value += (digit & 127) * multiplier; 00392 multiplier *= 128; 00393 } while ((digit & 0x80) != 0); 00394 return value; 00395 } 00396 00397 int niMQTT::remaining_length_length(int remaining_length) { 00398 int X = remaining_length; 00399 int rll = 0; 00400 do { 00401 rll++; 00402 X /= 0x80; 00403 } while (X > 0); 00404 return rll; 00405 } 00406 00407 void niMQTT::call_callback(const char *topic, const char *message) { 00408 callback(topic, message); 00409 }
Generated on Fri Jul 15 2022 11:03:03 by 1.7.2