Version of niMQTT library which includes separate process for pings and ability to publish retained messages.

Dependencies:   EthernetInterface mbed-rtos mbed

Fork of niMQTT by Juan Carlos Jimenez

Committer:
cosm
Date:
Thu Jul 03 18:04:36 2014 +0000
Revision:
12:ae25ab913163
Parent:
11:d508e0b7ed4d
fix default issue

Who changed what in which revision?

UserRevisionLine numberNew contents of line
Nim65s 0:d4dfed20c6ea 1 #include "niMQTT.h"
Nim65s 0:d4dfed20c6ea 2
Palantir 9:3be69efa4402 3 niMQTT::niMQTT(char *server, void (*callback)(const char*, const char*),
Palantir 9:3be69efa4402 4 char *id, int port, char *username, char *password, bool debug):
Palantir 9:3be69efa4402 5 server(server), port(port), id(id), callback(callback), username(username),
Palantir 9:3be69efa4402 6 password(password),
Palantir 9:3be69efa4402 7 debug(debug), connected(true), message_id(0),
Palantir 9:3be69efa4402 8 ping_thread(&niMQTT::ping_thread_starter, this),
Palantir 9:3be69efa4402 9 recv_thread(&niMQTT::recv_thread_starter, this),
Palantir 9:3be69efa4402 10 waiting_new_packet(true), packet_sent(false), waiting_connack(0),
Palantir 9:3be69efa4402 11 waiting_suback(0), waiting_pingresp(0)
Palantir 9:3be69efa4402 12 {
Nim65s 0:d4dfed20c6ea 13 init();
Nim65s 0:d4dfed20c6ea 14 }
Nim65s 0:d4dfed20c6ea 15
Nim65s 0:d4dfed20c6ea 16 int niMQTT::init() {
Nim65s 0:d4dfed20c6ea 17 if (debug) printf("*init\r\n");
Nim65s 0:d4dfed20c6ea 18 socket = new TCPSocketConnection;
Nim65s 0:d4dfed20c6ea 19 do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
Nim65s 0:d4dfed20c6ea 20 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
Nim65s 0:d4dfed20c6ea 21
Nim65s 0:d4dfed20c6ea 22 printf("Socket connected.\r\n");
Nim65s 0:d4dfed20c6ea 23
Palantir 9:3be69efa4402 24 ping_thread.signal_set(START_THREAD);
Palantir 9:3be69efa4402 25 recv_thread.signal_set(START_THREAD);
Nim65s 0:d4dfed20c6ea 26
Nim65s 0:d4dfed20c6ea 27 return connect();
Nim65s 0:d4dfed20c6ea 28 }
Nim65s 0:d4dfed20c6ea 29
Nim65s 0:d4dfed20c6ea 30 int niMQTT::send(char *packet, int size) {
Nim65s 0:d4dfed20c6ea 31 //if (debug) {
Nim65s 0:d4dfed20c6ea 32 printf("*send: ");
Nim65s 0:d4dfed20c6ea 33 for(int i=0; i<size; i++) printf("0x%x ", packet[i]);
Nim65s 0:d4dfed20c6ea 34 printf("\r\n");
Nim65s 0:d4dfed20c6ea 35 //}
Nim65s 0:d4dfed20c6ea 36
Nim65s 0:d4dfed20c6ea 37 int j = -1;
Nim65s 0:d4dfed20c6ea 38 do j = socket->send_all(packet, size); while (j < 0);
Nim65s 0:d4dfed20c6ea 39
Nim65s 0:d4dfed20c6ea 40 if (j != size) printf ("%d bytes sent (%d expected)...\r\n", j, size);
Nim65s 0:d4dfed20c6ea 41 else if (debug) printf("packet sent\r\n");
Nim65s 0:d4dfed20c6ea 42 packet_sent = true;
Nim65s 0:d4dfed20c6ea 43
Nim65s 0:d4dfed20c6ea 44 return (j == size);
Nim65s 0:d4dfed20c6ea 45 }
Nim65s 0:d4dfed20c6ea 46
Nim65s 0:d4dfed20c6ea 47 int niMQTT::recv() {
Nim65s 0:d4dfed20c6ea 48 if (debug) printf("*recv\r\n");
Nim65s 0:d4dfed20c6ea 49
Nim65s 0:d4dfed20c6ea 50 int timeout = 0;
Nim65s 0:d4dfed20c6ea 51 while (!waiting_new_packet && timeout++ != TIMEOUT/100) wait(0.1);
Nim65s 0:d4dfed20c6ea 52 if (timeout >= TIMEOUT/100) {
Nim65s 0:d4dfed20c6ea 53 printf("RECV TIMEOUT\r\n");
Nim65s 0:d4dfed20c6ea 54 if (waiting_connack > 0) printf("CONNACK not received !\r\n");
Nim65s 7:d01d8f0bac58 55 if (waiting_suback > 0) printf("SUBACK not received !\r\n");
Nim65s 0:d4dfed20c6ea 56 if (waiting_pingresp > 0) printf("PINGRESP not received !\r\n");
Nim65s 4:afbc7b066cff 57 reconnect();
Nim65s 0:d4dfed20c6ea 58 return -1;
Nim65s 0:d4dfed20c6ea 59 }
Nim65s 0:d4dfed20c6ea 60
Nim65s 0:d4dfed20c6ea 61 if (debug) printf("Receiving new packet...\r\n");
Nim65s 0:d4dfed20c6ea 62
Nim65s 0:d4dfed20c6ea 63 char header_received;
Nim65s 0:d4dfed20c6ea 64 socket->receive(&header_received, 1);
Nim65s 0:d4dfed20c6ea 65 if (debug) printf("Received 0x%x\r\n", header_received);
Nim65s 0:d4dfed20c6ea 66
Nim65s 0:d4dfed20c6ea 67 waiting_new_packet = false;
Nim65s 5:3d21020a2826 68 //bool DUP = ((header_received & 4) == 4);
Nim65s 5:3d21020a2826 69 //int QoS = (header_received & 6);
Nim65s 5:3d21020a2826 70 //bool RETAIN = ((header_received & 1) == 1);
Nim65s 0:d4dfed20c6ea 71
Nim65s 0:d4dfed20c6ea 72 switch (header_received & 0xf0) {
Nim65s 0:d4dfed20c6ea 73 case CONNACK: connack(); break;
Nim65s 0:d4dfed20c6ea 74 case PUBLISH: publish_received(); break;
Nim65s 0:d4dfed20c6ea 75 case PUBACK: puback_received(); break;
Nim65s 0:d4dfed20c6ea 76 case SUBACK: suback(); break;
Nim65s 0:d4dfed20c6ea 77 case UNSUBACK: suback(true); break;
Nim65s 0:d4dfed20c6ea 78 case PINGRESP: pingresp(); break;
Nim65s 4:afbc7b066cff 79 default: waiting_new_packet = true; reconnect(); printf("BAD HEADER: 0x%x\r\n", header_received); return -1;
Nim65s 0:d4dfed20c6ea 80 }
Nim65s 0:d4dfed20c6ea 81
Nim65s 0:d4dfed20c6ea 82 return 0;
Nim65s 0:d4dfed20c6ea 83 }
Nim65s 0:d4dfed20c6ea 84
Nim65s 0:d4dfed20c6ea 85 int niMQTT::connect() {
Nim65s 0:d4dfed20c6ea 86 if (debug) printf("*connect\r\n");
Nim65s 0:d4dfed20c6ea 87 int username_length = strlen(username);
Nim65s 0:d4dfed20c6ea 88 int password_length = strlen(password);
Nim65s 0:d4dfed20c6ea 89 int id_length = strlen(id);
Nim65s 0:d4dfed20c6ea 90
Nim65s 0:d4dfed20c6ea 91 int use_username = (username_length != 0);
Nim65s 0:d4dfed20c6ea 92 int use_password = (password_length != 0);
Nim65s 0:d4dfed20c6ea 93
Nim65s 0:d4dfed20c6ea 94 char variable_header[] = {0,6,77,81,73,115,100,112,3,
Nim65s 0:d4dfed20c6ea 95 use_username << 7 | use_password << 6,
Nim65s 0:d4dfed20c6ea 96 KEEP_ALIVE / 256, KEEP_ALIVE % 256 };
Nim65s 0:d4dfed20c6ea 97
Nim65s 0:d4dfed20c6ea 98 int remaining_length = 14 + id_length + username_length + password_length + 2*(use_username + use_password);
Nim65s 0:d4dfed20c6ea 99 int packet_length = 2 + remaining_length;
Nim65s 0:d4dfed20c6ea 100
Nim65s 0:d4dfed20c6ea 101 char fixed_header[] = { CONNECT, remaining_length };
Nim65s 0:d4dfed20c6ea 102
Nim65s 0:d4dfed20c6ea 103 char packet[packet_length];
Nim65s 0:d4dfed20c6ea 104 memcpy(packet, fixed_header, 2);
Nim65s 0:d4dfed20c6ea 105 memcpy(packet + 2, variable_header, 12);
Nim65s 0:d4dfed20c6ea 106
Nim65s 0:d4dfed20c6ea 107 // Adds the payload: id
Nim65s 0:d4dfed20c6ea 108 char id_size[2] = { id_length / 256, id_length % 256 };
Nim65s 0:d4dfed20c6ea 109 memcpy(packet + 14, id_size, 2);
Nim65s 0:d4dfed20c6ea 110 memcpy(packet + 16, id, id_length);
Nim65s 0:d4dfed20c6ea 111
Nim65s 0:d4dfed20c6ea 112 // Adds username & Password to the payload
Nim65s 0:d4dfed20c6ea 113 if (use_username) {
Nim65s 0:d4dfed20c6ea 114 char username_size[2] = { username_length / 256, username_length % 256 };
Nim65s 0:d4dfed20c6ea 115 memcpy(packet + 16 + id_length, username_size, 2);
Nim65s 0:d4dfed20c6ea 116 memcpy(packet + 18 + id_length, username, username_length);
Nim65s 0:d4dfed20c6ea 117 }
Nim65s 0:d4dfed20c6ea 118 if (use_password) {
Nim65s 0:d4dfed20c6ea 119 char password_size[2] = { password_length / 256, password_length % 256 };
Nim65s 0:d4dfed20c6ea 120 memcpy(packet + 18 + id_length + username_length, password_size, 2);
Nim65s 0:d4dfed20c6ea 121 memcpy(packet + 20 + id_length + username_length, password, password_length);
Nim65s 0:d4dfed20c6ea 122 }
Nim65s 0:d4dfed20c6ea 123
Nim65s 0:d4dfed20c6ea 124 waiting_connack++;
Nim65s 0:d4dfed20c6ea 125
Nim65s 0:d4dfed20c6ea 126 return send(packet, packet_length);
Nim65s 0:d4dfed20c6ea 127 }
Nim65s 0:d4dfed20c6ea 128
Nim65s 0:d4dfed20c6ea 129 int niMQTT::connack() {
Nim65s 0:d4dfed20c6ea 130 if (debug) printf("CONNACK Received\r\n");
Nim65s 0:d4dfed20c6ea 131 if (waiting_connack > 0) waiting_connack--;
Nim65s 4:afbc7b066cff 132 else {
Nim65s 4:afbc7b066cff 133 printf("CONNACK UNEXPECTED !\r\n");
Nim65s 4:afbc7b066cff 134 reconnect();
Nim65s 4:afbc7b066cff 135 return -2;
Nim65s 4:afbc7b066cff 136 }
Nim65s 0:d4dfed20c6ea 137
Nim65s 0:d4dfed20c6ea 138 char resp[3];
Nim65s 0:d4dfed20c6ea 139 socket->receive(resp, 3);
Nim65s 0:d4dfed20c6ea 140 waiting_new_packet = true;
Nim65s 0:d4dfed20c6ea 141
Nim65s 4:afbc7b066cff 142 if (resp[0] != 0x2) {
Nim65s 4:afbc7b066cff 143 printf("Wrong second byte of CONNACK, get 0x%x instead of 0x2\r\n", resp[1]);
Nim65s 4:afbc7b066cff 144 reconnect();
Nim65s 4:afbc7b066cff 145 return -2;
Nim65s 4:afbc7b066cff 146 }
Nim65s 0:d4dfed20c6ea 147 switch (resp[2]) {
Nim65s 0:d4dfed20c6ea 148 case 0: printf("Connection Accepted\r\n"); break;
Nim65s 0:d4dfed20c6ea 149 case 1: printf("Connection Refused: unacceptable protocol version\r\n"); break;
Nim65s 0:d4dfed20c6ea 150 case 2: printf("Connection Refused: identifier rejected\r\n"); break;
Nim65s 0:d4dfed20c6ea 151 case 3: printf("Connection Refused: server unavailable\r\n"); break;
Nim65s 0:d4dfed20c6ea 152 case 4: printf("Connection Refused: bad user name or password\r\n"); break;
Nim65s 0:d4dfed20c6ea 153 case 5: printf("Connection Refused: not authorized\r\n"); break;
Nim65s 0:d4dfed20c6ea 154 default: printf("I have no idea what I am doing\r\n");
Nim65s 0:d4dfed20c6ea 155 }
Nim65s 0:d4dfed20c6ea 156
Nim65s 0:d4dfed20c6ea 157 return (resp[2] == 0);
Nim65s 0:d4dfed20c6ea 158 }
Nim65s 0:d4dfed20c6ea 159
cosm 12:ae25ab913163 160 int niMQTT::pub(char *topic, char *message, bool retain) {
Nim65s 0:d4dfed20c6ea 161 if (debug) printf("*pub\r\n");
Nim65s 0:d4dfed20c6ea 162 int topic_length = strlen(topic);
Nim65s 0:d4dfed20c6ea 163 int message_length = strlen(message);
Nim65s 0:d4dfed20c6ea 164
Nim65s 0:d4dfed20c6ea 165 int remaining_length = topic_length + message_length + 2;
Nim65s 0:d4dfed20c6ea 166 int remaining_length_2 = remaining_length_length(remaining_length);
Nim65s 0:d4dfed20c6ea 167 int packet_length = 1 + remaining_length + remaining_length_2;
Nim65s 0:d4dfed20c6ea 168
cosm 10:28cfb74b0584 169 //set retained flag
cosm 10:28cfb74b0584 170 uint8_t retained_pub = PUBLISH;
cosm 10:28cfb74b0584 171 if(retain){
cosm 10:28cfb74b0584 172 retained_pub = retained_pub + 1;
cosm 10:28cfb74b0584 173 }
cosm 10:28cfb74b0584 174
cosm 10:28cfb74b0584 175 char header = retained_pub;
Nim65s 0:d4dfed20c6ea 176 char packet[packet_length];
Nim65s 0:d4dfed20c6ea 177 // header
Nim65s 0:d4dfed20c6ea 178 memcpy(packet, &header, 1);
Nim65s 0:d4dfed20c6ea 179 get_remaining_length(remaining_length, packet);
Nim65s 0:d4dfed20c6ea 180
Nim65s 0:d4dfed20c6ea 181 // variable header: topic name
Nim65s 0:d4dfed20c6ea 182 char topic_size[2] = { topic_length / 256, topic_length % 256 };
Nim65s 0:d4dfed20c6ea 183 memcpy(packet + 1 + remaining_length_2, topic_size, 2);
Nim65s 0:d4dfed20c6ea 184 memcpy(packet + 3 + remaining_length_2, topic, topic_length);
Nim65s 0:d4dfed20c6ea 185
Nim65s 0:d4dfed20c6ea 186 // payload: message
Nim65s 0:d4dfed20c6ea 187 memcpy(packet + 3 + remaining_length_2 + topic_length, message, message_length);
Nim65s 0:d4dfed20c6ea 188
Nim65s 0:d4dfed20c6ea 189 return send(packet, packet_length);
Nim65s 0:d4dfed20c6ea 190 }
Nim65s 0:d4dfed20c6ea 191
Nim65s 0:d4dfed20c6ea 192 void niMQTT::publish_received() {
Nim65s 0:d4dfed20c6ea 193 //remaining length
Nim65s 0:d4dfed20c6ea 194 int remaining_length = decode_remaining_length();
Nim65s 0:d4dfed20c6ea 195
Nim65s 0:d4dfed20c6ea 196 // topic
Nim65s 0:d4dfed20c6ea 197 char mqtt_utf8_length[2];
Nim65s 0:d4dfed20c6ea 198 socket->receive(mqtt_utf8_length, 2);
Nim65s 0:d4dfed20c6ea 199 int utf8_length = mqtt_utf8_length[0] * 256 + mqtt_utf8_length[1];
Nim65s 0:d4dfed20c6ea 200
Nim65s 0:d4dfed20c6ea 201 if (debug) printf("PUBLISH Received: %i, %i\r\n", remaining_length, utf8_length);
Nim65s 0:d4dfed20c6ea 202
Nim65s 0:d4dfed20c6ea 203 char topic[utf8_length + 1];
Nim65s 0:d4dfed20c6ea 204 socket->receive(topic, utf8_length);
Nim65s 0:d4dfed20c6ea 205 topic[utf8_length] = 0;
Nim65s 0:d4dfed20c6ea 206
Nim65s 0:d4dfed20c6ea 207 // payload
Nim65s 0:d4dfed20c6ea 208 int message_length = remaining_length - utf8_length - 2;
Nim65s 0:d4dfed20c6ea 209 char message[message_length + 1];
Nim65s 0:d4dfed20c6ea 210 socket->receive(message, message_length);
Nim65s 0:d4dfed20c6ea 211 message[message_length] = 0;
Nim65s 0:d4dfed20c6ea 212
Nim65s 0:d4dfed20c6ea 213 waiting_new_packet = true;
Nim65s 0:d4dfed20c6ea 214
Nim65s 8:438958bb9df3 215 call_callback(topic, message);
Nim65s 0:d4dfed20c6ea 216 }
Nim65s 0:d4dfed20c6ea 217
Nim65s 0:d4dfed20c6ea 218 int niMQTT::puback() {
Nim65s 0:d4dfed20c6ea 219 if (debug) printf("*puback\r\n");
Nim65s 0:d4dfed20c6ea 220 char fixed_header[] = { PUBACK, 2, message_id / 256, message_id % 256 };
Nim65s 0:d4dfed20c6ea 221 return send(fixed_header, 4);
Nim65s 0:d4dfed20c6ea 222 }
Nim65s 0:d4dfed20c6ea 223
Nim65s 0:d4dfed20c6ea 224 int niMQTT::puback_received() {
Nim65s 0:d4dfed20c6ea 225 waiting_new_packet = true;
Nim65s 0:d4dfed20c6ea 226 return 0; // TODO
Nim65s 0:d4dfed20c6ea 227 }
Nim65s 0:d4dfed20c6ea 228
Nim65s 0:d4dfed20c6ea 229 int niMQTT::sub(char *topic, bool unsub) {
Nim65s 0:d4dfed20c6ea 230 if (debug) printf("*sub\r\n");
Nim65s 0:d4dfed20c6ea 231 char command = (unsub) ? UNSUBSCRIBE : SUBSCRIBE;
Nim65s 0:d4dfed20c6ea 232 int topic_length = strlen(topic);
Nim65s 0:d4dfed20c6ea 233
Nim65s 0:d4dfed20c6ea 234 int remaining_length = topic_length + 5;
Nim65s 0:d4dfed20c6ea 235 int remaining_length_2 = remaining_length_length(remaining_length);
Nim65s 0:d4dfed20c6ea 236 int packet_length = 1 + remaining_length + remaining_length_2;
Nim65s 0:d4dfed20c6ea 237
Nim65s 0:d4dfed20c6ea 238 char header = command | LEAST_ONCE;
Nim65s 0:d4dfed20c6ea 239 char packet[packet_length];
Nim65s 0:d4dfed20c6ea 240 // header
Nim65s 0:d4dfed20c6ea 241 memcpy(packet, &header, 1);
Nim65s 0:d4dfed20c6ea 242 get_remaining_length(remaining_length, packet);
Nim65s 0:d4dfed20c6ea 243
Nim65s 0:d4dfed20c6ea 244 // variable header: message identifier
Nim65s 0:d4dfed20c6ea 245 message_id++;
Nim65s 0:d4dfed20c6ea 246 char variable_header[] = { message_id / 256, message_id % 256 };
Nim65s 0:d4dfed20c6ea 247 memcpy(packet + 1 + remaining_length_2, variable_header, 2);
Nim65s 0:d4dfed20c6ea 248
Nim65s 0:d4dfed20c6ea 249 // payload: topic name & requested QoS
Nim65s 0:d4dfed20c6ea 250 char topic_size[2] = { topic_length / 256, topic_length % 256 };
Nim65s 0:d4dfed20c6ea 251 char requested_qos = MOST_ONCE;
Nim65s 0:d4dfed20c6ea 252 memcpy(packet + 3 + remaining_length_2, topic_size, 2);
Nim65s 0:d4dfed20c6ea 253 memcpy(packet + 5 + remaining_length_2, topic, topic_length);
Nim65s 0:d4dfed20c6ea 254 memcpy(packet + 5 + remaining_length_2 + topic_length, &requested_qos, 1);
Nim65s 0:d4dfed20c6ea 255
Nim65s 0:d4dfed20c6ea 256 waiting_suback++;
Nim65s 0:d4dfed20c6ea 257
Nim65s 0:d4dfed20c6ea 258 return send(packet, packet_length);
Nim65s 0:d4dfed20c6ea 259 }
Nim65s 0:d4dfed20c6ea 260
Nim65s 0:d4dfed20c6ea 261 int niMQTT::suback(bool unsub) {
Nim65s 0:d4dfed20c6ea 262 if (debug) printf("SUBACK received\r\n");
Nim65s 0:d4dfed20c6ea 263 if (waiting_suback > 0) waiting_suback--;
Nim65s 4:afbc7b066cff 264 else {
Nim65s 4:afbc7b066cff 265 printf("SUBACK UNEXPECTED !\r\n");
Nim65s 4:afbc7b066cff 266 reconnect();
Nim65s 4:afbc7b066cff 267 return -2;
Nim65s 4:afbc7b066cff 268 }
Nim65s 0:d4dfed20c6ea 269
Nim65s 5:3d21020a2826 270 //char command = (unsub) ? UNSUBACK : SUBACK;
Nim65s 0:d4dfed20c6ea 271
Nim65s 0:d4dfed20c6ea 272 int remaining_length = decode_remaining_length();
Nim65s 0:d4dfed20c6ea 273
Nim65s 0:d4dfed20c6ea 274 // Variable Header
Nim65s 0:d4dfed20c6ea 275 char var_resp[remaining_length];
Nim65s 0:d4dfed20c6ea 276 socket->receive(var_resp, remaining_length);
Nim65s 0:d4dfed20c6ea 277 waiting_new_packet = true;
Nim65s 0:d4dfed20c6ea 278 if (debug) {
Nim65s 0:d4dfed20c6ea 279 printf("suback: ");
Nim65s 0:d4dfed20c6ea 280 for (int j=0; j<remaining_length; j++) printf("0x%x ", var_resp[j]);
Nim65s 0:d4dfed20c6ea 281 printf("\r\n");
Nim65s 0:d4dfed20c6ea 282 }
Nim65s 0:d4dfed20c6ea 283
Nim65s 0:d4dfed20c6ea 284 if (var_resp[0] * 256 + var_resp[1] != message_id) {
Nim65s 0:d4dfed20c6ea 285 printf("wrong message identifer in (UN)SUBACK, get %i instead of %i...\r\n", var_resp[0] * 256 + var_resp[1], message_id);
Nim65s 0:d4dfed20c6ea 286 }
Nim65s 0:d4dfed20c6ea 287
Nim65s 0:d4dfed20c6ea 288 // here we should do things about the QoS if /unsuback, but let's say it's 0.
Nim65s 0:d4dfed20c6ea 289
Nim65s 0:d4dfed20c6ea 290 return (var_resp[0] * 256 + var_resp[1] == message_id);
Nim65s 0:d4dfed20c6ea 291 }
Nim65s 0:d4dfed20c6ea 292
Nim65s 0:d4dfed20c6ea 293 int niMQTT::pingreq () {
Nim65s 0:d4dfed20c6ea 294 if (debug) printf("*pingreq\r\n");
Nim65s 0:d4dfed20c6ea 295 char fixed_header[] = { PINGREQ, 0 };
Nim65s 0:d4dfed20c6ea 296 waiting_pingresp++;
Nim65s 0:d4dfed20c6ea 297 return send(fixed_header, 2);
Nim65s 0:d4dfed20c6ea 298 }
Nim65s 0:d4dfed20c6ea 299
Nim65s 0:d4dfed20c6ea 300 int niMQTT::pingresp() {
Nim65s 0:d4dfed20c6ea 301 if (debug) printf("PINGRESP Received\r\n");
Nim65s 0:d4dfed20c6ea 302 if (waiting_pingresp > 0) waiting_pingresp--;
Nim65s 4:afbc7b066cff 303 else {
Nim65s 4:afbc7b066cff 304 printf("PINGRESP Unexpected !\r\n");
Nim65s 4:afbc7b066cff 305 reconnect();
Nim65s 4:afbc7b066cff 306 return -2;
Nim65s 4:afbc7b066cff 307 }
Nim65s 0:d4dfed20c6ea 308
Nim65s 0:d4dfed20c6ea 309 char resp;
Nim65s 0:d4dfed20c6ea 310 socket->receive(&resp, 1);
Nim65s 0:d4dfed20c6ea 311 waiting_new_packet = true;
Nim65s 0:d4dfed20c6ea 312
Nim65s 4:afbc7b066cff 313 if (resp != 0) {
Nim65s 4:afbc7b066cff 314 printf("Wrong second byte of PINGRESP, get 0x%x instead of 0x0\r\n", resp);
Nim65s 4:afbc7b066cff 315 reconnect();
Nim65s 4:afbc7b066cff 316 return -2;
Nim65s 4:afbc7b066cff 317 }
Nim65s 0:d4dfed20c6ea 318
Nim65s 0:d4dfed20c6ea 319 return (resp == 0);
Nim65s 0:d4dfed20c6ea 320 }
Nim65s 0:d4dfed20c6ea 321
Nim65s 0:d4dfed20c6ea 322 int niMQTT::disconnect() {
Nim65s 0:d4dfed20c6ea 323 if (debug) printf("*disconnect\r\n");
Nim65s 0:d4dfed20c6ea 324 char fixed_header[] = { DISCONNECT, 0 };
Nim65s 0:d4dfed20c6ea 325 return send(fixed_header, 2);
Nim65s 0:d4dfed20c6ea 326 }
Nim65s 0:d4dfed20c6ea 327
Nim65s 0:d4dfed20c6ea 328 niMQTT::~niMQTT() {
Nim65s 0:d4dfed20c6ea 329 if (debug) printf("*~niMQTT()\r\n");
Nim65s 0:d4dfed20c6ea 330 connected = false;
Nim65s 0:d4dfed20c6ea 331 disconnect();
Nim65s 0:d4dfed20c6ea 332 socket->close();
Nim65s 0:d4dfed20c6ea 333 delete socket;
Nim65s 0:d4dfed20c6ea 334 }
Nim65s 0:d4dfed20c6ea 335
Nim65s 4:afbc7b066cff 336 void niMQTT::reconnect() {
Nim65s 4:afbc7b066cff 337 if (debug) printf("Reconnecting...\r\n");
Nim65s 4:afbc7b066cff 338 disconnect();
Nim65s 4:afbc7b066cff 339 socket->close();
Nim65s 4:afbc7b066cff 340
Nim65s 4:afbc7b066cff 341 do printf("Socket connection...\r\n"); while (socket->connect(server, port) < 0);
Nim65s 4:afbc7b066cff 342 socket->set_blocking(true, KEEP_ALIVE*500); // KEEP_ALIVE / 2 in seconds
Nim65s 4:afbc7b066cff 343
Nim65s 4:afbc7b066cff 344 connect();
Nim65s 4:afbc7b066cff 345 }
Nim65s 4:afbc7b066cff 346
Palantir 9:3be69efa4402 347 void niMQTT::ping_thread_starter(void const *p) {
Nim65s 0:d4dfed20c6ea 348 niMQTT *instance = (niMQTT*)p;
Palantir 9:3be69efa4402 349 instance->ping_thread_worker();
Palantir 9:3be69efa4402 350 }
Palantir 9:3be69efa4402 351
Palantir 9:3be69efa4402 352 void niMQTT::recv_thread_starter(void const *p) {
Palantir 9:3be69efa4402 353 niMQTT *instance = (niMQTT*)p;
Palantir 9:3be69efa4402 354 instance->recv_thread_worker();
Nim65s 0:d4dfed20c6ea 355 }
Nim65s 0:d4dfed20c6ea 356
Palantir 9:3be69efa4402 357 void niMQTT::ping_thread_worker() {
Palantir 9:3be69efa4402 358 if (debug) printf("*ping_thread_worker\r\n");
Palantir 9:3be69efa4402 359 ping_thread.signal_wait(START_THREAD);
Nim65s 0:d4dfed20c6ea 360 while (connected) {
Palantir 9:3be69efa4402 361 if (debug) printf("New loop in ping thread worker\r\n");
Nim65s 0:d4dfed20c6ea 362 Thread::wait(KEEP_ALIVE*100); // KEEP_ALIVE / 10 in seconds
Nim65s 0:d4dfed20c6ea 363 if (!packet_sent) pingreq();
Nim65s 0:d4dfed20c6ea 364 packet_sent = false;
Nim65s 0:d4dfed20c6ea 365 }
Nim65s 0:d4dfed20c6ea 366 }
Nim65s 0:d4dfed20c6ea 367
Palantir 9:3be69efa4402 368 void niMQTT::recv_thread_worker() {
Palantir 9:3be69efa4402 369 if (debug) printf("*recv_thread_worker\r\n");
Palantir 9:3be69efa4402 370 recv_thread.signal_wait(START_THREAD);
Palantir 9:3be69efa4402 371 while (connected) {
Palantir 9:3be69efa4402 372 if (debug) printf("New loop in recv thread worker\r\n");
Palantir 9:3be69efa4402 373 recv();
Palantir 9:3be69efa4402 374 packet_sent = false;
Palantir 9:3be69efa4402 375 }
Palantir 9:3be69efa4402 376 }
Palantir 9:3be69efa4402 377
Nim65s 0:d4dfed20c6ea 378 void niMQTT::get_remaining_length(int remaining_length, char *packet) {
Nim65s 0:d4dfed20c6ea 379 int X = remaining_length;
Nim65s 0:d4dfed20c6ea 380 int n = 1;
Nim65s 0:d4dfed20c6ea 381 char digit;
Nim65s 0:d4dfed20c6ea 382 do {
Nim65s 0:d4dfed20c6ea 383 digit = X % 0x80;
Nim65s 0:d4dfed20c6ea 384 X /= 0x80;
Nim65s 0:d4dfed20c6ea 385 if (X > 0) digit |= 0x80;
Nim65s 0:d4dfed20c6ea 386 memcpy(packet + n, &digit, 1);
Nim65s 0:d4dfed20c6ea 387 n++;
Nim65s 0:d4dfed20c6ea 388 } while (X > 0);
Nim65s 0:d4dfed20c6ea 389 }
Nim65s 0:d4dfed20c6ea 390
Nim65s 0:d4dfed20c6ea 391 int niMQTT::decode_remaining_length() {
Nim65s 0:d4dfed20c6ea 392 int multiplier = 1;
Nim65s 0:d4dfed20c6ea 393 int value = 0;
Nim65s 0:d4dfed20c6ea 394 char digit = 0;
Nim65s 0:d4dfed20c6ea 395 do {
Nim65s 0:d4dfed20c6ea 396 while (socket->receive(&digit, 1) < 0) wait(0.1);
Nim65s 0:d4dfed20c6ea 397 value += (digit & 127) * multiplier;
Nim65s 0:d4dfed20c6ea 398 multiplier *= 128;
Nim65s 0:d4dfed20c6ea 399 } while ((digit & 0x80) != 0);
Nim65s 0:d4dfed20c6ea 400 return value;
Nim65s 0:d4dfed20c6ea 401 }
Nim65s 0:d4dfed20c6ea 402
Nim65s 0:d4dfed20c6ea 403 int niMQTT::remaining_length_length(int remaining_length) {
Nim65s 0:d4dfed20c6ea 404 int X = remaining_length;
Nim65s 0:d4dfed20c6ea 405 int rll = 0;
Nim65s 0:d4dfed20c6ea 406 do {
Nim65s 0:d4dfed20c6ea 407 rll++;
Nim65s 0:d4dfed20c6ea 408 X /= 0x80;
Nim65s 0:d4dfed20c6ea 409 } while (X > 0);
Nim65s 0:d4dfed20c6ea 410 return rll;
Nim65s 0:d4dfed20c6ea 411 }
Nim65s 8:438958bb9df3 412
Nim65s 8:438958bb9df3 413 void niMQTT::call_callback(const char *topic, const char *message) {
Nim65s 8:438958bb9df3 414 callback(topic, message);
Nim65s 8:438958bb9df3 415 }