An MQTT client for the new EthernetInterface library.
Dependencies: EthernetInterface mbed-rtos
Dependents: AV_MQTT niMQTT_example
niMQTT.cpp
00001 #include "niMQTT.h" 00002 00003 niMQTT::niMQTT(char *server, void (*callback)(const char*, const char*), char *id, int port, char *username, char *password, bool debug): 00004 server(server), port(port), id(id), callback(callback), username(username), password(password), 00005 debug(debug), connected(true), message_id(0), thread(&niMQTT::thread_starter, this), 00006 waiting_new_packet(true), packet_sent(false), waiting_connack(0), waiting_suback(0), waiting_pingresp(0) { 00007 init(); 00008 } 00009 00010 int niMQTT::init() { 00011 if (debug) printf("*init\r\n"); 00012 socket = new TCPSocketConnection; 00013 do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0); 00014 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds 00015 00016 printf("Socket connected.\r\n"); 00017 00018 thread.signal_set(START_THREAD); 00019 00020 return connect(); 00021 } 00022 00023 int niMQTT::send(char *packet, int size) { 00024 //if (debug) { 00025 printf("*send: "); 00026 for(int i=0; i<size; i++) printf("0x%x ", packet[i]); 00027 printf("\r\n"); 00028 //} 00029 00030 int j = -1; 00031 do j = socket->send_all(packet, size); while (j < 0); 00032 00033 if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size); 00034 else if (debug) printf("packet sent\r\n"); 00035 packet_sent = true; 00036 00037 return (j == size); 00038 } 00039 00040 int niMQTT::recv() { 00041 if (debug) printf("*recv\r\n"); 00042 00043 int timeout = 0; 00044 while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1); 00045 if (timeout >= TIMEOUT/100) { 00046 printf("RECV TIMEOUT\r\n"); 00047 if (waiting_connack > 0) printf("CONNACK not received !\r\n"); 00048 if (waiting_suback > 0) printf("SUBACK not received !\r\n"); 00049 if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n"); 00050 reconnect(); 00051 return -1; 00052 } 00053 00054 if (debug) printf("Receiving new packet...\r\n"); 00055 00056 char header_received; 00057 
socket->receive(&header_received, 1); 00058 if (debug) printf("Received 0x%x\r\n", header_received); 00059 00060 waiting_new_packet = false; 00061 //bool DUP = ((header_received & 4) == 4); 00062 //int QoS = (header_received & 6); 00063 //bool RETAIN = ((header_received & 1) == 1); 00064 00065 switch (header_received & 0xf0) { 00066 case CONNACK: connack(); break; 00067 case PUBLISH: publish_received(); break; 00068 case PUBACK: puback_received(); break; 00069 case SUBACK: suback(); break; 00070 case UNSUBACK: suback(true); break; 00071 case PINGRESP: pingresp(); break; 00072 default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1; 00073 } 00074 00075 return 0; 00076 } 00077 00078 int niMQTT::connect() { 00079 if (debug) printf("*connect\r\n"); 00080 int username_length = strlen(username); 00081 int password_length = strlen(password); 00082 int id_length = strlen(id); 00083 00084 int use_username = (username_length != 0); 00085 int use_password = (password_length != 0); 00086 00087 char variable_header[] = {0,6,77,81,73,115,100,112,3, 00088 use_username << 7 | use_password << 6, 00089 KEEP_ALIVE / 256, KEEP_ALIVE % 256 }; 00090 00091 int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password); 00092 int packet_length = 2 + remaining_length; 00093 00094 char fixed_header[] = { CONNECT, remaining_length }; 00095 00096 char packet[packet_length]; 00097 memcpy(packet, fixed_header, 2); 00098 memcpy(packet + 2, variable_header, 12); 00099 00100 // Adds the payload: id 00101 char id_size[2] = { id_length / 256, id_length % 256 }; 00102 memcpy(packet + 14, id_size, 2); 00103 memcpy(packet + 16, id, id_length); 00104 00105 // Adds username & Password to the payload 00106 if (use_username) { 00107 char username_size[2] = { username_length / 256, username_length % 256 }; 00108 memcpy(packet + 16 + id_length, username_size, 2); 00109 memcpy(packet + 18 + id_length, username, 
username_length); 00110 } 00111 if (use_password) { 00112 char password_size[2] = { password_length / 256, password_length % 256 }; 00113 memcpy(packet + 18 + id_length + username_length, password_size, 2); 00114 memcpy(packet + 20 + id_length + username_length, password, password_length); 00115 } 00116 00117 waiting_connack++; 00118 00119 return send(packet, packet_length); 00120 } 00121 00122 int niMQTT::connack() { 00123 if (debug) printf("CONNACK Received\r\n"); 00124 if (waiting_connack > 0) waiting_connack--; 00125 else { 00126 printf("CONNACK UNEXPECTED !\r\n"); 00127 reconnect(); 00128 return -2; 00129 } 00130 00131 char resp[3]; 00132 socket->receive(resp, 3); 00133 waiting_new_packet = true; 00134 00135 if (resp[0] != 0x2) { 00136 printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]); 00137 reconnect(); 00138 return -2; 00139 } 00140 switch (resp[2]) { 00141 case 0: printf("Connection Accepted\r\n"); break; 00142 case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break; 00143 case 2: printf("Connection Refused: identifier rejected\r\n"); break; 00144 case 3: printf("Connection Refused: server unavailable\r\n"); break; 00145 case 4: printf("Connection Refused: bad user name or password\r\n"); break; 00146 case 5: printf("Connection Refused: not authorized\r\n"); break; 00147 default: printf("I have no idea what I am doing\r\n"); 00148 } 00149 00150 return (resp[2] == 0); 00151 } 00152 00153 int niMQTT::pub(char *topic, char *message) { 00154 if (debug) printf("*pub\r\n"); 00155 int topic_length = strlen(topic); 00156 int message_length = strlen(message); 00157 00158 int remaining_length = topic_length + message_length + 2; 00159 int remaining_length_2 = remaining_length_length(remaining_length); 00160 int packet_length = 1 + remaining_length + remaining_length_2; 00161 00162 char header = PUBLISH; 00163 char packet[packet_length]; 00164 // header 00165 memcpy(packet, &header, 1); 00166 
get_remaining_length(remaining_length, packet); 00167 00168 // variable header: topic name 00169 char topic_size[2] = { topic_length / 256, topic_length % 256 }; 00170 memcpy(packet + 1 + remaining_length_2, topic_size, 2); 00171 memcpy(packet + 3 + remaining_length_2, topic, topic_length); 00172 00173 // payload: message 00174 memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length); 00175 00176 return send(packet, packet_length); 00177 } 00178 00179 void niMQTT::publish_received() { 00180 //remaining length 00181 int remaining_length = decode_remaining_length(); 00182 00183 // topic 00184 char mqtt_utf8_length[2]; 00185 socket->receive(mqtt_utf8_length, 2); 00186 int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1]; 00187 00188 if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length); 00189 00190 char topic[utf8_length + 1]; 00191 socket->receive(topic, utf8_length); 00192 topic[utf8_length] = 0; 00193 00194 // payload 00195 int message_length = remaining_length - utf8_length - 2; 00196 char message[message_length + 1]; 00197 socket->receive(message, message_length); 00198 message[message_length] = 0; 00199 00200 waiting_new_packet = true; 00201 00202 call_callback(topic, message); 00203 } 00204 00205 int niMQTT::puback() { 00206 if (debug) printf("*puback\r\n"); 00207 char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 }; 00208 return send(fixed_header, 4); 00209 } 00210 00211 int niMQTT::puback_received() { 00212 waiting_new_packet = true; 00213 return 0; // TODO 00214 } 00215 00216 int niMQTT::sub(char *topic, bool unsub) { 00217 if (debug) printf("*sub\r\n"); 00218 char command = (unsub) ? 
UNSUBSCRIBE : SUBSCRIBE; 00219 int topic_length = strlen(topic); 00220 00221 int remaining_length = topic_length + 5; 00222 int remaining_length_2 = remaining_length_length(remaining_length); 00223 int packet_length = 1 + remaining_length + remaining_length_2; 00224 00225 char header = command | LEAST_ONCE; 00226 char packet[packet_length]; 00227 // header 00228 memcpy(packet, &header, 1); 00229 get_remaining_length(remaining_length, packet); 00230 00231 // variable header: message identifier 00232 message_id++; 00233 char variable_header[] = { message_id / 256, message_id % 256 }; 00234 memcpy(packet + 1 + remaining_length_2, variable_header, 2); 00235 00236 // payload: topic name & requested QoS 00237 char topic_size[2] = { topic_length / 256, topic_length % 256 }; 00238 char requested_qos = MOST_ONCE; 00239 memcpy(packet + 3 + remaining_length_2, topic_size, 2); 00240 memcpy(packet + 5 + remaining_length_2, topic, topic_length); 00241 memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1); 00242 00243 waiting_suback++; 00244 00245 return send(packet, packet_length); 00246 } 00247 00248 int niMQTT::suback(bool unsub) { 00249 if (debug) printf("SUBACK received\r\n"); 00250 if (waiting_suback > 0) waiting_suback--; 00251 else { 00252 printf("SUBACK UNEXPECTED !\r\n"); 00253 reconnect(); 00254 return -2; 00255 } 00256 00257 //char command = (unsub) ? 
UNSUBACK : SUBACK; 00258 00259 int remaining_length = decode_remaining_length(); 00260 00261 // Variable Header 00262 char var_resp[remaining_length]; 00263 socket->receive(var_resp, remaining_length); 00264 waiting_new_packet = true; 00265 if (debug) { 00266 printf("suback: "); 00267 for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]); 00268 printf("\r\n"); 00269 } 00270 00271 if (var_resp[0] * 256 + var_resp[1] != message_id) { 00272 printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id); 00273 } 00274 00275 // here we should do things about the QoS if /unsuback, but let's say it's 0. 00276 00277 return (var_resp[0] * 256 + var_resp[1] == message_id); 00278 } 00279 00280 int niMQTT::pingreq () { 00281 if (debug) printf("*pingreq\r\n"); 00282 char fixed_header[] = { PINGREQ, 0 }; 00283 waiting_pingresp++; 00284 return send(fixed_header, 2); 00285 } 00286 00287 int niMQTT::pingresp() { 00288 if (debug) printf("PINGRESP Received\r\n"); 00289 if (waiting_pingresp > 0) waiting_pingresp--; 00290 else { 00291 printf("PINGRESP Unexpected !\r\n"); 00292 reconnect(); 00293 return -2; 00294 } 00295 00296 char resp; 00297 socket->receive(&resp, 1); 00298 waiting_new_packet = true; 00299 00300 if (resp != 0) { 00301 printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp); 00302 reconnect(); 00303 return -2; 00304 } 00305 00306 return (resp == 0); 00307 } 00308 00309 int niMQTT::disconnect() { 00310 if (debug) printf("*disconnect\r\n"); 00311 char fixed_header[] = { DISCONNECT, 0 }; 00312 return send(fixed_header, 2); 00313 } 00314 00315 niMQTT::~niMQTT() { 00316 if (debug) printf("*~niMQTT()\r\n"); 00317 connected = false; 00318 disconnect(); 00319 socket->close(); 00320 delete socket; 00321 } 00322 00323 void niMQTT::reconnect() { 00324 if (debug) printf("Reconnecting...\r\n"); 00325 disconnect(); 00326 socket->close(); 00327 00328 do printf("Socket connection...\r\n"); 
while (socket->connect(server, port) < 0); 00329 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds 00330 00331 connect(); 00332 } 00333 00334 void niMQTT::thread_starter(void const *p) { 00335 niMQTT *instance = (niMQTT*)p; 00336 instance->thread_worker(); 00337 } 00338 00339 void niMQTT::thread_worker() { 00340 if (debug) printf("*thread_worker\r\n"); 00341 thread.signal_wait(START_THREAD); 00342 while (connected) { 00343 if (debug) printf("New loop in thread worker\r\n"); 00344 recv(); 00345 Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds 00346 if (!packet_sent) pingreq(); 00347 packet_sent = false; 00348 } 00349 } 00350 00351 void niMQTT::get_remaining_length(int remaining_length, char *packet) { 00352 int X = remaining_length; 00353 int n = 1; 00354 char digit; 00355 do { 00356 digit = X % 0x80; 00357 X /= 0x80; 00358 if (X > 0) digit |= 0x80; 00359 memcpy(packet + n, &digit, 1); 00360 n++; 00361 } while (X > 0); 00362 } 00363 00364 int niMQTT::decode_remaining_length() { 00365 int multiplier = 1; 00366 int value = 0; 00367 char digit = 0; 00368 do { 00369 while (socket->receive(&digit, 1) < 0) wait(0.1); 00370 value += (digit & 127) * multiplier; 00371 multiplier *= 128; 00372 } while ((digit & 0x80) != 0); 00373 return value; 00374 } 00375 00376 int niMQTT::remaining_length_length(int remaining_length) { 00377 int X = remaining_length; 00378 int rll = 0; 00379 do { 00380 rll++; 00381 X /= 0x80; 00382 } while (X > 0); 00383 return rll; 00384 } 00385 00386 void niMQTT::call_callback(const char *topic, const char *message) { 00387 callback(topic, message); 00388 }
Generated on Thu Jul 14 2022 07:20:53 by Doxygen 1.7.2