An MQTT Client for the new etherNet Interface.

Dependencies:   EthernetInterface mbed-rtos

Dependents:   AV_MQTT niMQTT_example

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers niMQTT.cpp Source File

niMQTT.cpp

00001 #include "niMQTT.h"
00002 
00003 niMQTT::niMQTT(char *server, void (*callback)(const char*, const char*), char *id, int port, char *username, char *password, bool debug):
00004     server(server), port(port), id(id), callback(callback), username(username), password(password),
00005     debug(debug), connected(true), message_id(0), thread(&niMQTT::thread_starter, this),
00006     waiting_new_packet(true), packet_sent(false), waiting_connack(0), waiting_suback(0), waiting_pingresp(0) {
00007     init();
00008 }
00009 
00010 int niMQTT::init() {
00011     if (debug) printf("*init\r\n");
00012     socket = new TCPSocketConnection;
00013     do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
00014     socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
00015 
00016     printf("Socket connected.\r\n");
00017 
00018     thread.signal_set(START_THREAD);
00019 
00020     return connect();
00021 }
00022 
00023 int niMQTT::send(char *packet, int size) {
00024     //if (debug) {
00025         printf("*send: ");
00026         for(int i=0; i<size; i++) printf("0x%x ", packet[i]);
00027         printf("\r\n");
00028     //}
00029 
00030     int j = -1;
00031     do j = socket->send_all(packet, size); while (j < 0);
00032 
00033     if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size);
00034     else if (debug) printf("packet sent\r\n");
00035     packet_sent = true;
00036 
00037     return (j == size);
00038 }
00039 
00040 int niMQTT::recv() {
00041     if (debug) printf("*recv\r\n");
00042 
00043     int timeout = 0;
00044     while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1);
00045     if (timeout >= TIMEOUT/100) {
00046         printf("RECV TIMEOUT\r\n");
00047         if (waiting_connack > 0) printf("CONNACK not received !\r\n");
00048         if (waiting_suback > 0) printf("SUBACK not received !\r\n");
00049         if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n");
00050         reconnect();
00051         return -1;
00052     }
00053 
00054     if (debug) printf("Receiving new packet...\r\n");
00055 
00056     char header_received;
00057     socket->receive(&header_received, 1);
00058     if (debug) printf("Received 0x%x\r\n", header_received);
00059 
00060     waiting_new_packet = false;
00061     //bool DUP = ((header_received & 4) == 4);
00062     //int QoS = (header_received & 6);
00063     //bool RETAIN = ((header_received & 1) == 1);
00064 
00065     switch (header_received & 0xf0) {
00066         case CONNACK: connack(); break;
00067         case PUBLISH: publish_received(); break;
00068         case PUBACK: puback_received(); break;
00069         case SUBACK: suback(); break;
00070         case UNSUBACK: suback(true); break;
00071         case PINGRESP: pingresp(); break;
00072         default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1;
00073     }
00074 
00075     return 0;
00076 }
00077 
00078 int niMQTT::connect() {
00079     if (debug) printf("*connect\r\n");
00080     int username_length = strlen(username);
00081     int password_length = strlen(password);
00082     int id_length = strlen(id);
00083 
00084     int use_username = (username_length != 0);
00085     int use_password = (password_length != 0);
00086 
00087     char variable_header[] = {0,6,77,81,73,115,100,112,3,
00088         use_username << 7 | use_password << 6,
00089         KEEP_ALIVE / 256, KEEP_ALIVE % 256 };
00090 
00091     int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password);
00092     int packet_length = 2 + remaining_length;
00093 
00094     char fixed_header[] = { CONNECT, remaining_length };
00095 
00096     char packet[packet_length];
00097     memcpy(packet, fixed_header, 2);
00098     memcpy(packet + 2, variable_header, 12);
00099 
00100     // Adds the payload: id
00101     char id_size[2] = { id_length / 256, id_length % 256 };
00102     memcpy(packet + 14, id_size, 2);
00103     memcpy(packet + 16, id, id_length);
00104 
00105     // Adds username & Password to the payload
00106     if (use_username) {
00107         char username_size[2] = { username_length / 256, username_length % 256 };
00108         memcpy(packet + 16 + id_length, username_size, 2);
00109         memcpy(packet + 18 + id_length, username, username_length);
00110     }
00111     if (use_password) {
00112         char password_size[2] = { password_length / 256, password_length % 256 };
00113         memcpy(packet + 18 + id_length + username_length, password_size, 2);
00114         memcpy(packet + 20 + id_length + username_length, password, password_length);
00115     }
00116 
00117     waiting_connack++;
00118 
00119     return send(packet, packet_length);
00120 }
00121 
00122 int niMQTT::connack() {
00123     if (debug) printf("CONNACK Received\r\n");
00124     if (waiting_connack > 0) waiting_connack--;
00125     else {
00126         printf("CONNACK UNEXPECTED !\r\n");
00127         reconnect();
00128         return -2;
00129     }
00130 
00131     char resp[3];
00132     socket->receive(resp, 3);
00133     waiting_new_packet = true;
00134 
00135     if (resp[0] != 0x2) {
00136         printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]);
00137         reconnect();
00138         return -2;
00139     }
00140     switch (resp[2]) {
00141         case 0: printf("Connection Accepted\r\n"); break;
00142         case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break;
00143         case 2: printf("Connection Refused: identifier rejected\r\n"); break;
00144         case 3: printf("Connection Refused: server unavailable\r\n"); break;
00145         case 4: printf("Connection Refused: bad user name or password\r\n"); break;
00146         case 5: printf("Connection Refused: not authorized\r\n"); break;
00147         default: printf("I have no idea what I am doing\r\n");
00148     }
00149 
00150     return (resp[2] == 0);
00151 }
00152 
00153 int niMQTT::pub(char *topic, char *message) {
00154     if (debug) printf("*pub\r\n");
00155     int topic_length = strlen(topic);
00156     int message_length = strlen(message);
00157 
00158     int remaining_length = topic_length + message_length + 2;
00159     int remaining_length_2 = remaining_length_length(remaining_length);
00160     int packet_length = 1 + remaining_length + remaining_length_2;
00161 
00162     char header = PUBLISH;
00163     char packet[packet_length];
00164     // header
00165     memcpy(packet, &header, 1);
00166     get_remaining_length(remaining_length, packet);
00167 
00168     // variable header: topic name
00169     char topic_size[2] = { topic_length / 256, topic_length % 256 };
00170     memcpy(packet + 1 + remaining_length_2, topic_size, 2);
00171     memcpy(packet + 3 + remaining_length_2, topic, topic_length);
00172 
00173     // payload: message
00174     memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length);
00175 
00176     return send(packet, packet_length);
00177 }
00178 
00179 void niMQTT::publish_received() {
00180     //remaining length
00181     int remaining_length = decode_remaining_length();
00182 
00183     // topic
00184     char mqtt_utf8_length[2];
00185     socket->receive(mqtt_utf8_length, 2);
00186     int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1];
00187 
00188     if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length);
00189 
00190     char topic[utf8_length + 1];
00191     socket->receive(topic, utf8_length);
00192     topic[utf8_length] = 0;
00193 
00194     // payload
00195     int message_length = remaining_length - utf8_length - 2;
00196     char message[message_length + 1];
00197     socket->receive(message, message_length);
00198     message[message_length] = 0;
00199 
00200     waiting_new_packet = true;
00201 
00202     call_callback(topic, message);
00203 }
00204 
00205 int niMQTT::puback() {
00206     if (debug) printf("*puback\r\n");
00207     char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 };
00208     return send(fixed_header, 4);
00209 }
00210 
00211 int niMQTT::puback_received() {
00212     waiting_new_packet = true;
00213     return 0; // TODO
00214 }
00215 
00216 int niMQTT::sub(char *topic, bool unsub) {
00217     if (debug) printf("*sub\r\n");
00218     char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE;
00219     int topic_length = strlen(topic);
00220 
00221     int remaining_length = topic_length + 5;
00222     int remaining_length_2 = remaining_length_length(remaining_length);
00223     int packet_length = 1 + remaining_length + remaining_length_2;
00224 
00225     char header = command | LEAST_ONCE;
00226     char packet[packet_length];
00227     // header
00228     memcpy(packet, &header, 1);
00229     get_remaining_length(remaining_length, packet);
00230 
00231     // variable header: message identifier
00232     message_id++;
00233     char variable_header[] = { message_id / 256, message_id % 256 };
00234     memcpy(packet + 1 + remaining_length_2, variable_header, 2);
00235 
00236     // payload: topic name & requested QoS
00237     char topic_size[2] = { topic_length / 256, topic_length % 256 };
00238     char requested_qos = MOST_ONCE;
00239     memcpy(packet + 3 + remaining_length_2, topic_size, 2);
00240     memcpy(packet + 5 + remaining_length_2, topic, topic_length);
00241     memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1);
00242 
00243     waiting_suback++;
00244 
00245     return send(packet, packet_length);
00246 }
00247 
00248 int niMQTT::suback(bool unsub) {
00249     if (debug) printf("SUBACK received\r\n");
00250     if (waiting_suback > 0) waiting_suback--;
00251     else {
00252         printf("SUBACK UNEXPECTED !\r\n");
00253         reconnect();
00254         return -2;
00255     }
00256 
00257     //char command = (unsub) ? UNSUBACK : SUBACK;
00258 
00259     int remaining_length = decode_remaining_length();
00260 
00261     // Variable Header
00262     char var_resp[remaining_length];
00263     socket->receive(var_resp, remaining_length);
00264     waiting_new_packet = true;
00265     if (debug) {
00266         printf("suback: ");
00267         for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]);
00268         printf("\r\n");
00269     }
00270 
00271     if (var_resp[0] * 256 + var_resp[1] != message_id) {
00272         printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id);
00273     }
00274 
00275     // here we should do things about the QoS if /unsuback, but let's say it's 0.
00276 
00277     return (var_resp[0] * 256 + var_resp[1] == message_id);
00278 }
00279 
00280 int niMQTT::pingreq () {
00281     if (debug) printf("*pingreq\r\n");
00282     char fixed_header[] = { PINGREQ, 0 };
00283     waiting_pingresp++;
00284     return send(fixed_header, 2);
00285 }
00286 
00287 int niMQTT::pingresp() {
00288     if (debug) printf("PINGRESP Received\r\n");
00289     if (waiting_pingresp > 0) waiting_pingresp--;
00290     else {
00291         printf("PINGRESP Unexpected !\r\n");
00292         reconnect();
00293         return -2;
00294     }
00295 
00296     char resp;
00297     socket->receive(&resp, 1);
00298     waiting_new_packet = true;
00299 
00300     if (resp != 0) {
00301         printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp);
00302         reconnect();
00303         return -2;
00304     }
00305 
00306     return (resp == 0);
00307 }
00308 
00309 int niMQTT::disconnect() {
00310     if (debug) printf("*disconnect\r\n");
00311     char fixed_header[] = { DISCONNECT, 0 };
00312     return send(fixed_header, 2);
00313 }
00314 
00315 niMQTT::~niMQTT() {
00316     if (debug) printf("*~niMQTT()\r\n");
00317     connected = false;
00318     disconnect();
00319     socket->close();
00320     delete socket;
00321 }
00322 
00323 void niMQTT::reconnect() {
00324     if (debug) printf("Reconnecting...\r\n");
00325     disconnect();
00326     socket->close();
00327     
00328     do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
00329     socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
00330     
00331     connect();
00332 }
00333 
00334 void niMQTT::thread_starter(void const *p) {
00335     niMQTT *instance = (niMQTT*)p;
00336     instance->thread_worker();
00337 }
00338 
00339 void niMQTT::thread_worker() {
00340     if (debug) printf("*thread_worker\r\n");
00341     thread.signal_wait(START_THREAD);
00342     while (connected) {
00343         if (debug) printf("New loop in thread worker\r\n");
00344         recv();
00345         Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds
00346         if (!packet_sent) pingreq();
00347         packet_sent = false;
00348     }
00349 }
00350 
00351 void niMQTT::get_remaining_length(int remaining_length, char *packet) {
00352     int X = remaining_length;
00353     int n = 1;
00354     char digit;
00355     do {
00356         digit = X % 0x80;
00357         X /= 0x80;
00358         if (X > 0) digit |= 0x80;
00359         memcpy(packet + n, &digit, 1);
00360         n++;
00361     } while (X > 0);
00362 }
00363 
00364 int niMQTT::decode_remaining_length() {
00365     int multiplier = 1;
00366     int value = 0;
00367     char digit = 0;
00368     do {
00369         while (socket->receive(&digit, 1) < 0) wait(0.1);
00370         value += (digit & 127) * multiplier;
00371         multiplier *= 128;
00372     } while ((digit & 0x80) != 0);
00373     return value;
00374 }
00375 
00376 int niMQTT::remaining_length_length(int remaining_length) {
00377     int X = remaining_length;
00378     int rll = 0;
00379     do {
00380         rll++;
00381         X /= 0x80;
00382     } while (X > 0);
00383     return rll;
00384 }
00385 
00386 void niMQTT::call_callback(const char *topic, const char *message) {
00387     callback(topic, message);
00388 }