niMQTT

Dependencies:   EthernetInterface mbed-rtos

Fork of niMQTT by Guilhem Saurel

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers niMQTT.cpp Source File

niMQTT.cpp

00001 #include "niMQTT.h"
00002 
00003 niMQTT::niMQTT(char *server, void (*callback)(const char*, const char*), 
00004     char *id, int port, char *username, char *password, bool debug):
00005     server(server), port(port), id(id), callback(callback), username(username), 
00006     password(password),
00007     debug(debug), connected(true), message_id(0), 
00008     ping_thread(&niMQTT::ping_thread_starter, this),
00009     recv_thread(&niMQTT::recv_thread_starter, this),
00010     waiting_new_packet(true), packet_sent(false), waiting_connack(0), 
00011     waiting_suback(0), waiting_pingresp(0) 
00012 {
00013     init();
00014 }
00015 
00016 int niMQTT::init() {
00017     if (debug) printf("*init\r\n");
00018     socket = new TCPSocketConnection;
00019     do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
00020     socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
00021 
00022     printf("Socket connected.\r\n");
00023 
00024     ping_thread.signal_set(START_THREAD);
00025     recv_thread.signal_set(START_THREAD);
00026 
00027     return connect();
00028 }
00029 
00030 int niMQTT::send(char *packet, int size) {
00031     //if (debug) {
00032         printf("*send: ");
00033         for(int i=0; i<size; i++) printf("0x%x ", packet[i]);
00034         printf("\r\n");
00035     //}
00036 
00037     int j = -1;
00038     do j = socket->send_all(packet, size); while (j < 0);
00039 
00040     if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size);
00041     else if (debug) printf("packet sent\r\n");
00042     packet_sent = true;
00043 
00044     return (j == size);
00045 }
00046 
00047 int niMQTT::recv() {
00048     if (debug) printf("*recv\r\n");
00049 
00050     int timeout = 0;
00051     while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1);
00052     if (timeout >= TIMEOUT/100) {
00053         printf("RECV TIMEOUT\r\n");
00054         if (waiting_connack > 0) printf("CONNACK not received !\r\n");
00055         if (waiting_suback > 0) printf("SUBACK not received !\r\n");
00056         if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n");
00057         reconnect();
00058         return -1;
00059     }
00060 
00061     if (debug) printf("Receiving new packet...\r\n");
00062 
00063     char header_received;
00064     socket->receive(&header_received, 1);
00065     if (debug) printf("Received 0x%x\r\n", header_received);
00066 
00067     waiting_new_packet = false;
00068     //bool DUP = ((header_received & 4) == 4);
00069     //int QoS = (header_received & 6);
00070     //bool RETAIN = ((header_received & 1) == 1);
00071 
00072     switch (header_received & 0xf0) {
00073         case CONNACK: connack(); break;
00074         case PUBLISH: publish_received(); break;
00075         case PUBACK: puback_received(); break;
00076         case SUBACK: suback(); break;
00077         case UNSUBACK: suback(true); break;
00078         case PINGRESP: pingresp(); break;
00079         default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1;
00080     }
00081 
00082     return 0;
00083 }
00084 
00085 int niMQTT::connect() {
00086     if (debug) printf("*connect\r\n");
00087     int username_length = strlen(username);
00088     int password_length = strlen(password);
00089     int id_length = strlen(id);
00090 
00091     int use_username = (username_length != 0);
00092     int use_password = (password_length != 0);
00093 
00094     char variable_header[] = {0,6,77,81,73,115,100,112,3,
00095         use_username << 7 | use_password << 6,
00096         KEEP_ALIVE / 256, KEEP_ALIVE % 256 };
00097 
00098     int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password);
00099     int packet_length = 2 + remaining_length;
00100 
00101     char fixed_header[] = { CONNECT, remaining_length };
00102 
00103     char packet[packet_length];
00104     memcpy(packet, fixed_header, 2);
00105     memcpy(packet + 2, variable_header, 12);
00106 
00107     // Adds the payload: id
00108     char id_size[2] = { id_length / 256, id_length % 256 };
00109     memcpy(packet + 14, id_size, 2);
00110     memcpy(packet + 16, id, id_length);
00111 
00112     // Adds username & Password to the payload
00113     if (use_username) {
00114         char username_size[2] = { username_length / 256, username_length % 256 };
00115         memcpy(packet + 16 + id_length, username_size, 2);
00116         memcpy(packet + 18 + id_length, username, username_length);
00117     }
00118     if (use_password) {
00119         char password_size[2] = { password_length / 256, password_length % 256 };
00120         memcpy(packet + 18 + id_length + username_length, password_size, 2);
00121         memcpy(packet + 20 + id_length + username_length, password, password_length);
00122     }
00123 
00124     waiting_connack++;
00125 
00126     return send(packet, packet_length);
00127 }
00128 
00129 int niMQTT::connack() {
00130     if (debug) printf("CONNACK Received\r\n");
00131     if (waiting_connack > 0) waiting_connack--;
00132     else {
00133         printf("CONNACK UNEXPECTED !\r\n");
00134         reconnect();
00135         return -2;
00136     }
00137 
00138     char resp[3];
00139     socket->receive(resp, 3);
00140     waiting_new_packet = true;
00141 
00142     if (resp[0] != 0x2) {
00143         printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]);
00144         reconnect();
00145         return -2;
00146     }
00147     switch (resp[2]) {
00148         case 0: printf("Connection Accepted\r\n"); break;
00149         case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break;
00150         case 2: printf("Connection Refused: identifier rejected\r\n"); break;
00151         case 3: printf("Connection Refused: server unavailable\r\n"); break;
00152         case 4: printf("Connection Refused: bad user name or password\r\n"); break;
00153         case 5: printf("Connection Refused: not authorized\r\n"); break;
00154         default: printf("I have no idea what I am doing\r\n");
00155     }
00156 
00157     return (resp[2] == 0);
00158 }
00159 
00160 int niMQTT::pub(char *topic, char *message) {
00161     if (debug) printf("*pub\r\n");
00162     int topic_length = strlen(topic);
00163     int message_length = strlen(message);
00164 
00165     int remaining_length = topic_length + message_length + 2;
00166     int remaining_length_2 = remaining_length_length(remaining_length);
00167     int packet_length = 1 + remaining_length + remaining_length_2;
00168 
00169     char header = PUBLISH;
00170     char packet[packet_length];
00171     // header
00172     memcpy(packet, &header, 1);
00173     get_remaining_length(remaining_length, packet);
00174 
00175     // variable header: topic name
00176     char topic_size[2] = { topic_length / 256, topic_length % 256 };
00177     memcpy(packet + 1 + remaining_length_2, topic_size, 2);
00178     memcpy(packet + 3 + remaining_length_2, topic, topic_length);
00179 
00180     // payload: message
00181     memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length);
00182 
00183     return send(packet, packet_length);
00184 }
00185 
00186 void niMQTT::publish_received() {
00187     //remaining length
00188     int remaining_length = decode_remaining_length();
00189 
00190     // topic
00191     char mqtt_utf8_length[2];
00192     socket->receive(mqtt_utf8_length, 2);
00193     int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1];
00194 
00195     if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length);
00196 
00197     char topic[utf8_length + 1];
00198     socket->receive(topic, utf8_length);
00199     topic[utf8_length] = 0;
00200 
00201     // payload
00202     int message_length = remaining_length - utf8_length - 2;
00203     char message[message_length + 1];
00204     socket->receive(message, message_length);
00205     message[message_length] = 0;
00206 
00207     waiting_new_packet = true;
00208 
00209     call_callback(topic, message);
00210 }
00211 
00212 int niMQTT::puback() {
00213     if (debug) printf("*puback\r\n");
00214     char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 };
00215     return send(fixed_header, 4);
00216 }
00217 
00218 int niMQTT::puback_received() {
00219     waiting_new_packet = true;
00220     return 0; // TODO
00221 }
00222 
00223 int niMQTT::sub(char *topic, bool unsub) {
00224     if (debug) printf("*sub\r\n");
00225     char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE;
00226     int topic_length = strlen(topic);
00227 
00228     int remaining_length = topic_length + 5;
00229     int remaining_length_2 = remaining_length_length(remaining_length);
00230     int packet_length = 1 + remaining_length + remaining_length_2;
00231 
00232     char header = command | LEAST_ONCE;
00233     char packet[packet_length];
00234     // header
00235     memcpy(packet, &header, 1);
00236     get_remaining_length(remaining_length, packet);
00237 
00238     // variable header: message identifier
00239     message_id++;
00240     char variable_header[] = { message_id / 256, message_id % 256 };
00241     memcpy(packet + 1 + remaining_length_2, variable_header, 2);
00242 
00243     // payload: topic name & requested QoS
00244     char topic_size[2] = { topic_length / 256, topic_length % 256 };
00245     char requested_qos = MOST_ONCE;
00246     memcpy(packet + 3 + remaining_length_2, topic_size, 2);
00247     memcpy(packet + 5 + remaining_length_2, topic, topic_length);
00248     memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1);
00249 
00250     waiting_suback++;
00251 
00252     return send(packet, packet_length);
00253 }
00254 
00255 int niMQTT::suback(bool unsub) {
00256     if (debug) printf("SUBACK received\r\n");
00257     if (waiting_suback > 0) waiting_suback--;
00258     else {
00259         printf("SUBACK UNEXPECTED !\r\n");
00260         reconnect();
00261         return -2;
00262     }
00263 
00264     //char command = (unsub) ? UNSUBACK : SUBACK;
00265 
00266     int remaining_length = decode_remaining_length();
00267 
00268     // Variable Header
00269     char var_resp[remaining_length];
00270     socket->receive(var_resp, remaining_length);
00271     waiting_new_packet = true;
00272     if (debug) {
00273         printf("suback: ");
00274         for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]);
00275         printf("\r\n");
00276     }
00277 
00278     if (var_resp[0] * 256 + var_resp[1] != message_id) {
00279         printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id);
00280     }
00281 
00282     // here we should do things about the QoS if /unsuback, but let's say it's 0.
00283 
00284     return (var_resp[0] * 256 + var_resp[1] == message_id);
00285 }
00286 
00287 int niMQTT::pingreq () {
00288     if (debug) printf("*pingreq\r\n");
00289     char fixed_header[] = { PINGREQ, 0 };
00290     waiting_pingresp++;
00291     return send(fixed_header, 2);
00292 }
00293 
00294 int niMQTT::pingresp() {
00295     if (debug) printf("PINGRESP Received\r\n");
00296     if (waiting_pingresp > 0) waiting_pingresp--;
00297     else {
00298         printf("PINGRESP Unexpected !\r\n");
00299         reconnect();
00300         return -2;
00301     }
00302 
00303     char resp;
00304     socket->receive(&resp, 1);
00305     waiting_new_packet = true;
00306 
00307     if (resp != 0) {
00308         printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp);
00309         reconnect();
00310         return -2;
00311     }
00312 
00313     return (resp == 0);
00314 }
00315 
00316 int niMQTT::disconnect() {
00317     if (debug) printf("*disconnect\r\n");
00318     char fixed_header[] = { DISCONNECT, 0 };
00319     return send(fixed_header, 2);
00320 }
00321 
00322 niMQTT::~niMQTT() {
00323     if (debug) printf("*~niMQTT()\r\n");
00324     connected = false;
00325     disconnect();
00326     socket->close();
00327     delete socket;
00328 }
00329 
00330 void niMQTT::reconnect() {
00331     if (debug) printf("Reconnecting...\r\n");
00332     disconnect();
00333     socket->close();
00334     
00335     do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
00336     socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
00337     
00338     connect();
00339 }
00340 
00341 void niMQTT::ping_thread_starter(void const *p) {
00342     niMQTT *instance = (niMQTT*)p;
00343     instance->ping_thread_worker();
00344 }
00345 
00346 void niMQTT::recv_thread_starter(void const *p) {
00347     niMQTT *instance = (niMQTT*)p;
00348     instance->recv_thread_worker();
00349 }
00350 
00351 void niMQTT::ping_thread_worker() {
00352     if (debug) printf("*ping_thread_worker\r\n");
00353     ping_thread.signal_wait(START_THREAD);
00354     while (connected) {
00355         if (debug) printf("New loop in ping thread worker\r\n");
00356         Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds
00357         if (!packet_sent) pingreq();
00358         packet_sent = false;
00359     }
00360 }
00361 
00362 void niMQTT::recv_thread_worker() {
00363     if (debug) printf("*recv_thread_worker\r\n");
00364     recv_thread.signal_wait(START_THREAD);
00365     while (connected) {
00366         if (debug) printf("New loop in recv thread worker\r\n");
00367         recv();
00368         packet_sent = false;
00369     }
00370 }
00371 
00372 void niMQTT::get_remaining_length(int remaining_length, char *packet) {
00373     int X = remaining_length;
00374     int n = 1;
00375     char digit;
00376     do {
00377         digit = X % 0x80;
00378         X /= 0x80;
00379         if (X > 0) digit |= 0x80;
00380         memcpy(packet + n, &digit, 1);
00381         n++;
00382     } while (X > 0);
00383 }
00384 
00385 int niMQTT::decode_remaining_length() {
00386     int multiplier = 1;
00387     int value = 0;
00388     char digit = 0;
00389     do {
00390         while (socket->receive(&digit, 1) < 0) wait(0.1);
00391         value += (digit & 127) * multiplier;
00392         multiplier *= 128;
00393     } while ((digit & 0x80) != 0);
00394     return value;
00395 }
00396 
00397 int niMQTT::remaining_length_length(int remaining_length) {
00398     int X = remaining_length;
00399     int rll = 0;
00400     do {
00401         rll++;
00402         X /= 0x80;
00403     } while (X > 0);
00404     return rll;
00405 }
00406 
00407 void niMQTT::call_callback(const char *topic, const char *message) {
00408     callback(topic, message);
00409 }