Removed blocking keepalive code
Dependencies: EthernetInterface mbed-rtos
Fork of niMQTT by
Revision 0:d4dfed20c6ea, committed 2013-08-07
- Comitter:
- Nim65s
- Date:
- Wed Aug 07 12:57:21 2013 +0000
- Child:
- 1:4faa96fa4350
- Commit message:
- First fonctionnal release
Changed in this revision
| niMQTT.cpp | Show annotated file Show diff for this revision Revisions of this file |
| niMQTT.h | Show annotated file Show diff for this revision Revisions of this file |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/niMQTT.cpp Wed Aug 07 12:57:21 2013 +0000
@@ -0,0 +1,353 @@
+#include "niMQTT.h"
+
+niMQTT::niMQTT(char *server, int port, char *id, void (*callback)(char*, char*), char *username, char *password, bool debug):
+ server(server), port(port), id(id), callback(callback), username(username), password(password),
+ debug(debug), connected(true), message_id(0), thread(&niMQTT::thread_starter, this),
+ waiting_new_packet(true), packet_sent(false), waiting_connack(0), waiting_suback(0), waiting_pingresp(0) {
+ init();
+}
+
+int niMQTT::init() {
+ if (debug) printf("*init\r\n");
+ socket = new TCPSocketConnection;
+ do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
+ socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
+
+ printf("Socket connected.\r\n");
+
+ thread.signal_set(START_THREAD);
+
+ return connect();
+}
+
+int niMQTT::send(char *packet, int size) {
+ //if (debug) {
+ printf("*send: ");
+ for(int i=0; i<size; i++) printf("0x%x ", packet[i]);
+ printf("\r\n");
+ //}
+
+ int j = -1;
+ do j = socket->send_all(packet, size); while (j < 0);
+
+ if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size);
+ else if (debug) printf("packet sent\r\n");
+ packet_sent = true;
+
+ return (j == size);
+}
+
+int niMQTT::recv() {
+ if (debug) printf("*recv\r\n");
+
+ int timeout = 0;
+ while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1);
+ if (timeout >= TIMEOUT/100) {
+ printf("RECV TIMEOUT\r\n");
+ if (waiting_connack > 0) printf("CONNACK not received !\r\n");
+ if (waiting_suback > 0) printf("SUBCONNACK not received !\r\n");
+ if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n");
+ // TODO launch connect/sub/pingrep again ?
+ return -1;
+ }
+
+ if (debug) printf("Receiving new packet...\r\n");
+
+ char header_received;
+ socket->receive(&header_received, 1);
+ if (debug) printf("Received 0x%x\r\n", header_received);
+
+ waiting_new_packet = false;
+ bool DUP = ((header_received & 4) == 4);
+ int QoS = (header_received & 6);
+ bool RETAIN = ((header_received & 1) == 1);
+
+ switch (header_received & 0xf0) {
+ case CONNACK: connack(); break;
+ case PUBLISH: publish_received(); break;
+ case PUBACK: puback_received(); break;
+ case SUBACK: suback(); break;
+ case UNSUBACK: suback(true); break;
+ case PINGRESP: pingresp(); break;
+ default: waiting_new_packet = true; printf("BAD HEADER: 0x%x\r\n", header_received); return -1;
+ }
+
+ return 0;
+}
+
+int niMQTT::connect() {
+ if (debug) printf("*connect\r\n");
+ int username_length = strlen(username);
+ int password_length = strlen(password);
+ int id_length = strlen(id);
+
+ int use_username = (username_length != 0);
+ int use_password = (password_length != 0);
+
+ char variable_header[] = {0,6,77,81,73,115,100,112,3,
+ use_username << 7 | use_password << 6,
+ KEEP_ALIVE / 256, KEEP_ALIVE % 256 };
+
+ int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password);
+ int packet_length = 2 + remaining_length;
+
+ char fixed_header[] = { CONNECT, remaining_length };
+
+ char packet[packet_length];
+ memcpy(packet, fixed_header, 2);
+ memcpy(packet + 2, variable_header, 12);
+
+ // Adds the payload: id
+ char id_size[2] = { id_length / 256, id_length % 256 };
+ memcpy(packet + 14, id_size, 2);
+ memcpy(packet + 16, id, id_length);
+
+ // Adds username & Password to the payload
+ if (use_username) {
+ char username_size[2] = { username_length / 256, username_length % 256 };
+ memcpy(packet + 16 + id_length, username_size, 2);
+ memcpy(packet + 18 + id_length, username, username_length);
+ }
+ if (use_password) {
+ char password_size[2] = { password_length / 256, password_length % 256 };
+ memcpy(packet + 18 + id_length + username_length, password_size, 2);
+ memcpy(packet + 20 + id_length + username_length, password, password_length);
+ }
+
+ waiting_connack++;
+
+ return send(packet, packet_length);
+}
+
+int niMQTT::connack() {
+ if (debug) printf("CONNACK Received\r\n");
+ if (waiting_connack > 0) waiting_connack--;
+ else printf("CONNACK UNEXPECTED !\r\n");
+
+ char resp[3];
+ socket->receive(resp, 3);
+ waiting_new_packet = true;
+
+ if (resp[0] != 0x2) printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]);
+ switch (resp[2]) {
+ case 0: printf("Connection Accepted\r\n"); break;
+ case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break;
+ case 2: printf("Connection Refused: identifier rejected\r\n"); break;
+ case 3: printf("Connection Refused: server unavailable\r\n"); break;
+ case 4: printf("Connection Refused: bad user name or password\r\n"); break;
+ case 5: printf("Connection Refused: not authorized\r\n"); break;
+ default: printf("I have no idea what I am doing\r\n");
+ }
+
+ return (resp[2] == 0);
+}
+
+int niMQTT::pub(char *topic, char *message) {
+ if (debug) printf("*pub\r\n");
+ int topic_length = strlen(topic);
+ int message_length = strlen(message);
+
+ int remaining_length = topic_length + message_length + 2;
+ int remaining_length_2 = remaining_length_length(remaining_length);
+ int packet_length = 1 + remaining_length + remaining_length_2;
+
+ char header = PUBLISH;
+ char packet[packet_length];
+ // header
+ memcpy(packet, &header, 1);
+ get_remaining_length(remaining_length, packet);
+
+ // variable header: topic name
+ char topic_size[2] = { topic_length / 256, topic_length % 256 };
+ memcpy(packet + 1 + remaining_length_2, topic_size, 2);
+ memcpy(packet + 3 + remaining_length_2, topic, topic_length);
+
+ // payload: message
+ memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length);
+
+ return send(packet, packet_length);
+}
+
+void niMQTT::publish_received() {
+ //remaining length
+ int remaining_length = decode_remaining_length();
+
+ // topic
+ char mqtt_utf8_length[2];
+ socket->receive(mqtt_utf8_length, 2);
+ int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1];
+
+ if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length);
+
+ char topic[utf8_length + 1];
+ socket->receive(topic, utf8_length);
+ topic[utf8_length] = 0;
+
+ // payload
+ int message_length = remaining_length - utf8_length - 2;
+ char message[message_length + 1];
+ socket->receive(message, message_length);
+ message[message_length] = 0;
+
+ waiting_new_packet = true;
+
+ callback(topic, message);
+}
+
+int niMQTT::puback() {
+ if (debug) printf("*puback\r\n");
+ char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 };
+ return send(fixed_header, 4);
+}
+
+int niMQTT::puback_received() {
+ waiting_new_packet = true;
+ return 0; // TODO
+}
+
+int niMQTT::sub(char *topic, bool unsub) {
+ if (debug) printf("*sub\r\n");
+ char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE;
+ int topic_length = strlen(topic);
+
+ int remaining_length = topic_length + 5;
+ int remaining_length_2 = remaining_length_length(remaining_length);
+ int packet_length = 1 + remaining_length + remaining_length_2;
+
+ char header = command | LEAST_ONCE;
+ char packet[packet_length];
+ // header
+ memcpy(packet, &header, 1);
+ get_remaining_length(remaining_length, packet);
+
+ // variable header: message identifier
+ message_id++;
+ char variable_header[] = { message_id / 256, message_id % 256 };
+ memcpy(packet + 1 + remaining_length_2, variable_header, 2);
+
+ // payload: topic name & requested QoS
+ char topic_size[2] = { topic_length / 256, topic_length % 256 };
+ char requested_qos = MOST_ONCE;
+ memcpy(packet + 3 + remaining_length_2, topic_size, 2);
+ memcpy(packet + 5 + remaining_length_2, topic, topic_length);
+ memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1);
+
+ waiting_suback++;
+
+ return send(packet, packet_length);
+}
+
+int niMQTT::suback(bool unsub) {
+ if (debug) printf("SUBACK received\r\n");
+ if (waiting_suback > 0) waiting_suback--;
+ else printf("SUBACK UNEXPECTED !\r\n");
+
+ char command = (unsub) ? UNSUBACK : SUBACK; // TODO
+
+ int remaining_length = decode_remaining_length();
+
+ // Variable Header
+ char var_resp[remaining_length];
+ socket->receive(var_resp, remaining_length);
+ waiting_new_packet = true;
+ if (debug) {
+ printf("suback: ");
+ for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]);
+ printf("\r\n");
+ }
+
+ if (var_resp[0] * 256 + var_resp[1] != message_id) {
+ printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id);
+ }
+
+ // here we should do things about the QoS if /unsuback, but let's say it's 0.
+
+ return (var_resp[0] * 256 + var_resp[1] == message_id);
+}
+
+int niMQTT::pingreq () {
+ if (debug) printf("*pingreq\r\n");
+ char fixed_header[] = { PINGREQ, 0 };
+ waiting_pingresp++;
+ return send(fixed_header, 2);
+}
+
+int niMQTT::pingresp() {
+ if (debug) printf("PINGRESP Received\r\n");
+ if (waiting_pingresp > 0) waiting_pingresp--;
+ else printf("PINGRESP Unexpected !\r\n");
+
+ char resp;
+ socket->receive(&resp, 1);
+ waiting_new_packet = true;
+
+ if (resp != 0) printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp);
+
+ return (resp == 0);
+}
+
+int niMQTT::disconnect() {
+ if (debug) printf("*disconnect\r\n");
+ char fixed_header[] = { DISCONNECT, 0 };
+ return send(fixed_header, 2);
+}
+
+niMQTT::~niMQTT() {
+ if (debug) printf("*~niMQTT()\r\n");
+ connected = false;
+ disconnect();
+ socket->close();
+ delete socket;
+}
+
+void niMQTT::thread_starter(void const *p) {
+ niMQTT *instance = (niMQTT*)p;
+ instance->thread_worker();
+}
+
+void niMQTT::thread_worker() {
+ if (debug) printf("*thread_worker\r\n");
+ thread.signal_wait(START_THREAD);
+ while (connected) {
+ if (debug) printf("New loop in thread worker\r\n");
+ recv();
+ Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds
+ if (!packet_sent) pingreq();
+ packet_sent = false;
+ }
+}
+
+void niMQTT::get_remaining_length(int remaining_length, char *packet) {
+ int X = remaining_length;
+ int n = 1;
+ char digit;
+ do {
+ digit = X % 0x80;
+ X /= 0x80;
+ if (X > 0) digit |= 0x80;
+ memcpy(packet + n, &digit, 1);
+ n++;
+ } while (X > 0);
+}
+
+int niMQTT::decode_remaining_length() {
+ int multiplier = 1;
+ int value = 0;
+ char digit = 0;
+ do {
+ while (socket->receive(&digit, 1) < 0) wait(0.1);
+ value += (digit & 127) * multiplier;
+ multiplier *= 128;
+ } while ((digit & 0x80) != 0);
+ return value;
+}
+
+int niMQTT::remaining_length_length(int remaining_length) {
+ int X = remaining_length;
+ int rll = 0;
+ do {
+ rll++;
+ X /= 0x80;
+ } while (X > 0);
+ return rll;
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/niMQTT.h Wed Aug 07 12:57:21 2013 +0000
@@ -0,0 +1,123 @@
+#ifndef NIMQTT_H
+#define NIMQTT_H
+
+#include "mbed.h"
+#include "rtos.h"
+#include "EthernetInterface.h"
+
+enum { // Message Type
+ ZERO,
+ CONNECT_NUM,
+ CONNACK_NUM,
+ PUBLISH_NUM,
+ PUBACK_NUM,
+ PUBREC_NUM,
+ PUBREL_NUM,
+ PUBCOMP_NUM,
+ SUBSCRIBE_NUM,
+ SUBACK_NUM,
+ UNSUBSCRIBE_NUM,
+ UNSUBACK_NUM,
+ PINGREQ_NUM,
+ PINGRESP_NUM,
+ DISCONNECT_NUM
+};
+
+#define CONNECT CONNECT_NUM << 4
+#define CONNACK CONNACK_NUM << 4
+#define PUBLISH PUBLISH_NUM << 4
+#define PUBACK PUBACK_NUM << 4
+#define PUBREC PUBREC_NUM << 4
+#define PUBREL PUBREL_NUM << 4
+#define PUBCOMP PUBCOMP_NUM << 4
+#define SUBSCRIBE SUBSCRIBE_NUM << 4
+#define SUBACK SUBACK_NUM << 4
+#define UNSUBSCRIBE UNSUBSCRIBE_NUM << 4
+#define UNSUBACK UNSUBACK_NUM << 4
+#define PINGREQ PINGREQ_NUM << 4
+#define PINGRESP PINGRESP_NUM << 4
+#define DISCONNECT DISCONNECT_NUM << 4
+
+enum { // QoS level
+ MOST_ONCE_NUM,
+ LEAST_ONCE_NUM,
+ EXACTLY_ONCE_NUM
+};
+
+#define MOST_ONCE MOST_ONCE_NUM << 1
+#define LEAST_ONCE LEAST_ONCE_NUM << 1
+#define EXACTLY_ONCE EXACTLY_ONCE_NUM << 1
+
+#define KEEP_ALIVE 300 // seconds
+#define TIMEOUT 1000 // ms
+
+#define START_THREAD 1
+
+class niMQTT {
+ public:
+ /** Initialise and launch the MQTT Client
+ * \param server the address of your server
+ * \param port the port of your server
+ * \param id the id of this client (should be unique)
+ * \param callback a callback to execute on receiving a PUBLISH
+ * \param username your username for the server
+ * \param password your password for the server
+ * \param debug get a more verbose output
+ */
+ niMQTT(char *server, int port=1884, char *id="mbed", void (*callback)(char *, char*), char *username="", char *password="", bool debug=false);
+ ~niMQTT();
+
+ /* Publish a message on a topic
+ * \param topic the topic
+ * \param message the message
+ */
+ int pub(char *topic, char *message);
+
+ /** Subscribe to a topic
+ * \param topic the topic
+ */
+ int sub(char *topic, bool unsub=false);
+
+ private:
+ char *server;
+ int port;
+ char *id;
+ void (*callback)(char *, char*);
+ char *username;
+ char *password;
+
+ bool debug;
+ bool connected;
+ int message_id;
+
+ TCPSocketConnection *socket;
+
+ int remaining_length_length(int remaining_length);
+ void get_remaining_length(int remaining_length, char *packet);
+ int send(char *packet, int size);
+
+ int decode_remaining_length();
+ int recv();
+
+ int init();
+
+ int connect();
+ int connack();
+ int suback(bool unsub=false);
+ int puback();
+ int puback_received();
+ int pingreq();
+ int pingresp();
+ int disconnect();
+
+ static void thread_starter(void const *p);
+ void thread_worker();
+ Thread thread;
+
+ void publish_received();
+
+ bool waiting_new_packet, packet_sent;
+ int waiting_connack, waiting_suback, waiting_pingresp;
+};
+
+#endif
