result of success : 1 to 0, max msg size: 127B to 16383B, fixed existing bugs

Fork of MQTTClient by wakwak koba

Committer:
wakwak_koba
Date:
Mon Jan 04 07:50:26 2016 +0000
Revision:
11:2a8c86bfdeca
Parent:
10:6fa55ad359f0
fixed comment

Who changed what in which revision?

UserRevisionLine numberNew contents of line
SomeRandomBloke 4:9e25b78e0352 1 /*
SomeRandomBloke 0:260fb10c0755 2 MQTTClient.cpp
SomeRandomBloke 0:260fb10c0755 3 Based on MQTTClient from http://ceit.uq.edu.au/content/mqttclient-mbed-version-20
SomeRandomBloke 0:260fb10c0755 4 A simple MQTT client for mbed, version 2.0
SomeRandomBloke 0:260fb10c0755 5 By Yilun FAN, @CEIT, @JAN 2011
SomeRandomBloke 0:260fb10c0755 6
SomeRandomBloke 0:260fb10c0755 7 Bug fixes and additions by Andrew Lindsay (andrew [at] thiseldo [dot] co [dot] uk)
SomeRandomBloke 0:260fb10c0755 8
SomeRandomBloke 0:260fb10c0755 9 Permission is hereby granted, free of charge, to any person obtaining a copy
SomeRandomBloke 0:260fb10c0755 10 of this software and associated documentation files (the "Software"), to deal
SomeRandomBloke 0:260fb10c0755 11 in the Software without restriction, including without limitation the rights
SomeRandomBloke 0:260fb10c0755 12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
SomeRandomBloke 0:260fb10c0755 13 copies of the Software, and to permit persons to whom the Software is
SomeRandomBloke 0:260fb10c0755 14 furnished to do so, subject to the following conditions:
SomeRandomBloke 0:260fb10c0755 15
SomeRandomBloke 0:260fb10c0755 16 The above copyright notice and this permission notice shall be included in
SomeRandomBloke 0:260fb10c0755 17 all copies or substantial portions of the Software.
SomeRandomBloke 0:260fb10c0755 18
SomeRandomBloke 0:260fb10c0755 19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
SomeRandomBloke 0:260fb10c0755 20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
SomeRandomBloke 0:260fb10c0755 21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
SomeRandomBloke 0:260fb10c0755 22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
SomeRandomBloke 0:260fb10c0755 23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
SomeRandomBloke 0:260fb10c0755 24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
SomeRandomBloke 0:260fb10c0755 25 THE SOFTWARE.
SomeRandomBloke 0:260fb10c0755 26 */
SomeRandomBloke 0:260fb10c0755 27
SomeRandomBloke 0:260fb10c0755 28 #include "MQTTClient.h"
SomeRandomBloke 0:260fb10c0755 29
SomeRandomBloke 1:a50b6c866a3b 30 /** Default Constructor
SomeRandomBloke 1:a50b6c866a3b 31 */
SomeRandomBloke 1:a50b6c866a3b 32 MQTTClient::MQTTClient() {}
SomeRandomBloke 1:a50b6c866a3b 33
SomeRandomBloke 1:a50b6c866a3b 34 /** Default Destructor
SomeRandomBloke 1:a50b6c866a3b 35 */
SomeRandomBloke 1:a50b6c866a3b 36 MQTTClient::~MQTTClient() {}
SomeRandomBloke 1:a50b6c866a3b 37
SomeRandomBloke 3:ac326c5b065c 38 /** Alternative Constructor with parameters
SomeRandomBloke 9:a0e39cea763a 39 *
SomeRandomBloke 9:a0e39cea763a 40 * Allow object to be constructed with minimum parameters.
SomeRandomBloke 1:a50b6c866a3b 41 *
SomeRandomBloke 1:a50b6c866a3b 42 * @param server The IP address of the server to connect to
SomeRandomBloke 1:a50b6c866a3b 43 * @param port The TCP/IP port on the server to connect to
SomeRandomBloke 1:a50b6c866a3b 44 * @param callback Callback function to handle subscription to topics
SomeRandomBloke 1:a50b6c866a3b 45 */
SomeRandomBloke 0:260fb10c0755 46 MQTTClient::MQTTClient(IpAddr server, int port, void (*callback)(char*, char*)) {
SomeRandomBloke 0:260fb10c0755 47 this->port = port;
SomeRandomBloke 0:260fb10c0755 48 callback_server = callback;
wakwak_koba 10:6fa55ad359f0 49
wakwak_koba 10:6fa55ad359f0 50 host.setPort(port);
wakwak_koba 10:6fa55ad359f0 51 host.setIp(server);
wakwak_koba 10:6fa55ad359f0 52 host.setName("localhost");
wakwak_koba 10:6fa55ad359f0 53
SomeRandomBloke 9:a0e39cea763a 54 this->userName = NULL;
SomeRandomBloke 9:a0e39cea763a 55 this->password = NULL;
SomeRandomBloke 0:260fb10c0755 56 connected = false;
SomeRandomBloke 0:260fb10c0755 57 sessionOpened = false;
wakwak_koba 10:6fa55ad359f0 58
wakwak_koba 10:6fa55ad359f0 59 pTCPSocket = NULL;
SomeRandomBloke 0:260fb10c0755 60 timer.start();
SomeRandomBloke 0:260fb10c0755 61 }
SomeRandomBloke 0:260fb10c0755 62
wakwak_koba 11:2a8c86bfdeca 63 /** Alternative Constructor with parameters
wakwak_koba 11:2a8c86bfdeca 64 *
wakwak_koba 11:2a8c86bfdeca 65 * Allow object to be constructed with minimum parameters.
wakwak_koba 11:2a8c86bfdeca 66 *
wakwak_koba 11:2a8c86bfdeca 67 * @param server The IP address of the server to connect to
wakwak_koba 11:2a8c86bfdeca 68 * @param port The TCP/IP port on the server to connect to
wakwak_koba 11:2a8c86bfdeca 69 * @param id The client name shown on MQTT server.
wakwak_koba 11:2a8c86bfdeca 70 * @param userName Pointer to username string, zero terminated. Null for not used
wakwak_koba 11:2a8c86bfdeca 71 * @param password Pointer to password string, zero terminated. Null for not used
wakwak_koba 11:2a8c86bfdeca 72 * @param callback Callback function to handle subscription to topics
wakwak_koba 11:2a8c86bfdeca 73 */
wakwak_koba 10:6fa55ad359f0 74 MQTTClient::MQTTClient(IpAddr server, int port, char *id, char *userName, char *password, void (*callback)(char*, char*)) {
SomeRandomBloke 0:260fb10c0755 75 this->port = port;
SomeRandomBloke 0:260fb10c0755 76 callback_server = callback;
wakwak_koba 10:6fa55ad359f0 77
wakwak_koba 10:6fa55ad359f0 78 host.setPort(port);
wakwak_koba 10:6fa55ad359f0 79 host.setIp(server);
wakwak_koba 10:6fa55ad359f0 80 host.setName("localhost");
wakwak_koba 10:6fa55ad359f0 81
wakwak_koba 10:6fa55ad359f0 82 this->userName = userName;
wakwak_koba 10:6fa55ad359f0 83 this->password = password;
wakwak_koba 10:6fa55ad359f0 84 clientId = id;
SomeRandomBloke 0:260fb10c0755 85 connected = false;
SomeRandomBloke 0:260fb10c0755 86 sessionOpened = false;
wakwak_koba 10:6fa55ad359f0 87
wakwak_koba 10:6fa55ad359f0 88 pTCPSocket = NULL;
SomeRandomBloke 0:260fb10c0755 89 timer.start();
SomeRandomBloke 0:260fb10c0755 90 }
SomeRandomBloke 0:260fb10c0755 91
SomeRandomBloke 1:a50b6c866a3b 92 /** Send a message of specified size
SomeRandomBloke 9:a0e39cea763a 93 *
SomeRandomBloke 1:a50b6c866a3b 94 * @param msg message string
SomeRandomBloke 1:a50b6c866a3b 95 * @param size Size of the message to send
wakwak_koba 10:6fa55ad359f0 96 * @returns value to indicate message sent successfully or not, -1 for error, 0 for success.
SomeRandomBloke 1:a50b6c866a3b 97 */
SomeRandomBloke 0:260fb10c0755 98 int MQTTClient::send_data(const char* msg, int size) {
wakwak_koba 10:6fa55ad359f0 99 writtable = false;
wakwak_koba 10:6fa55ad359f0 100
wakwak_koba 10:6fa55ad359f0 101 /* printf("send_data length %d\n", size);
wakwak_koba 10:6fa55ad359f0 102 for (int i = 0; i < size; i++) {
wakwak_koba 10:6fa55ad359f0 103 printf("%x, ", msg[i]);
wakwak_koba 10:6fa55ad359f0 104 }
wakwak_koba 10:6fa55ad359f0 105 printf("\r\n"); */
wakwak_koba 10:6fa55ad359f0 106
SomeRandomBloke 0:260fb10c0755 107 int transLen = pTCPSocket->send(msg, size);
wakwak_koba 10:6fa55ad359f0 108 pool();
wakwak_koba 10:6fa55ad359f0 109
SomeRandomBloke 0:260fb10c0755 110 /*Check send length matches the message length*/
SomeRandomBloke 0:260fb10c0755 111 if (transLen != size) {
SomeRandomBloke 0:260fb10c0755 112 for (int i = 0; i < size; i++) {
SomeRandomBloke 0:260fb10c0755 113 printf("%x, ", msg[i]);
SomeRandomBloke 0:260fb10c0755 114 }
SomeRandomBloke 0:260fb10c0755 115 printf("Error on send.\r\n");
SomeRandomBloke 0:260fb10c0755 116 return -1;
SomeRandomBloke 0:260fb10c0755 117 }
wakwak_koba 10:6fa55ad359f0 118
wakwak_koba 10:6fa55ad359f0 119 return 0;
SomeRandomBloke 0:260fb10c0755 120 }
SomeRandomBloke 0:260fb10c0755 121
SomeRandomBloke 1:a50b6c866a3b 122 /** Start a MQTT session, build CONNECT packet
SomeRandomBloke 9:a0e39cea763a 123 *
SomeRandomBloke 1:a50b6c866a3b 124 * @param id The client name shown on MQTT server.
wakwak_koba 10:6fa55ad359f0 125 * @returns -1 for error, 0 for success
SomeRandomBloke 1:a50b6c866a3b 126 */
SomeRandomBloke 0:260fb10c0755 127 int MQTTClient::open_session(char* id) {
SomeRandomBloke 0:260fb10c0755 128 /*variable header*/
wakwak_koba 10:6fa55ad359f0 129 char var_header[] = {0x00,0x06,0x4d,0x51,0x49,0x73,0x64,0x70,0x03,0x02,0x00,keepalive/500,0x00,strlen(id)};
SomeRandomBloke 0:260fb10c0755 130
SomeRandomBloke 0:260fb10c0755 131 /*fixed header: 2 bytes, big endian*/
SomeRandomBloke 0:260fb10c0755 132 char fixed_header[] = {MQTTCONNECT,12+strlen(id)+2};
SomeRandomBloke 0:260fb10c0755 133 short usernameLen = strlen( userName );
SomeRandomBloke 0:260fb10c0755 134 short passwordLen = strlen( password );
SomeRandomBloke 0:260fb10c0755 135 var_header[9] |= (usernameLen > 0 ? 0x80 : 0x00 );
SomeRandomBloke 0:260fb10c0755 136 var_header[9] |= (passwordLen > 0 ? 0x40 : 0x00 );
SomeRandomBloke 0:260fb10c0755 137
wakwak_koba 10:6fa55ad359f0 138 printf("fixed %d, var %d id %d, username %d, password %d\n",sizeof(fixed_header), sizeof(var_header), strlen(id), usernameLen, passwordLen );
wakwak_koba 10:6fa55ad359f0 139 char packet[sizeof(fixed_header) + sizeof(var_header) + strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) + passwordLen + (passwordLen > 0 ? 2 : 0) ];
SomeRandomBloke 0:260fb10c0755 140
SomeRandomBloke 0:260fb10c0755 141 memset(packet,0,sizeof(packet));
SomeRandomBloke 0:260fb10c0755 142 memcpy(packet,fixed_header,sizeof(fixed_header));
SomeRandomBloke 0:260fb10c0755 143 memcpy(packet+sizeof(fixed_header),var_header,sizeof(var_header));
SomeRandomBloke 0:260fb10c0755 144 memcpy(packet+sizeof(fixed_header)+sizeof(var_header), id, strlen(id));
SomeRandomBloke 0:260fb10c0755 145 if ( usernameLen > 0 ) {
SomeRandomBloke 0:260fb10c0755 146 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)] = 0x00;
SomeRandomBloke 0:260fb10c0755 147 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id)+1] = usernameLen;
SomeRandomBloke 0:260fb10c0755 148 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+2, userName, usernameLen);
SomeRandomBloke 0:260fb10c0755 149 packet[1] += (usernameLen + 2);
SomeRandomBloke 0:260fb10c0755 150 }
SomeRandomBloke 0:260fb10c0755 151 if ( passwordLen > 0 ) {
SomeRandomBloke 0:260fb10c0755 152 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0) ] = 0x00;
SomeRandomBloke 0:260fb10c0755 153 packet[sizeof(fixed_header)+sizeof(var_header)+strlen(id) + usernameLen + (usernameLen > 0 ? 2 : 0)+1] = passwordLen;
SomeRandomBloke 0:260fb10c0755 154 memcpy(packet+sizeof(fixed_header)+sizeof(var_header)+strlen(id)+ usernameLen + (usernameLen > 0 ? 2 : 0)+2, password, passwordLen);
SomeRandomBloke 0:260fb10c0755 155 packet[1] += (passwordLen + 2);
SomeRandomBloke 0:260fb10c0755 156 }
SomeRandomBloke 0:260fb10c0755 157
wakwak_koba 10:6fa55ad359f0 158 /* printf("Packet length %d\n", sizeof(packet));
SomeRandomBloke 0:260fb10c0755 159 for (int i = 0; i < sizeof(packet); i++) {
SomeRandomBloke 0:260fb10c0755 160 printf("%x, ", packet[i]);
SomeRandomBloke 0:260fb10c0755 161 }
wakwak_koba 10:6fa55ad359f0 162 printf("\r\n");*/
wakwak_koba 10:6fa55ad359f0 163
wakwak_koba 10:6fa55ad359f0 164 if (send_data(packet, sizeof(packet))) {
SomeRandomBloke 0:260fb10c0755 165 return -1;
SomeRandomBloke 0:260fb10c0755 166 }
wakwak_koba 10:6fa55ad359f0 167 printf("Sent\n");
wakwak_koba 10:6fa55ad359f0 168 return 0;
SomeRandomBloke 0:260fb10c0755 169 }
SomeRandomBloke 0:260fb10c0755 170
SomeRandomBloke 1:a50b6c866a3b 171 /** Open TCP port, connect to server on given IP address.
SomeRandomBloke 9:a0e39cea763a 172 *
wakwak_koba 10:6fa55ad359f0 173 * @returns -1: If connect to server failed. -2: Failed to open session on server. 0: Connection accessed.
wakwak_koba 10:6fa55ad359f0 174 */
wakwak_koba 10:6fa55ad359f0 175 int MQTTClient::connect() {
wakwak_koba 10:6fa55ad359f0 176 return connect(clientId);
wakwak_koba 10:6fa55ad359f0 177 }
wakwak_koba 10:6fa55ad359f0 178
wakwak_koba 10:6fa55ad359f0 179 /** Open TCP port, connect to server on given IP address.
wakwak_koba 10:6fa55ad359f0 180 *
SomeRandomBloke 1:a50b6c866a3b 181 * @param id The client name shown on MQTT server.
wakwak_koba 10:6fa55ad359f0 182 * @returns -1: If connect to server failed. -2: Failed to open session on server. 0: Connection accessed.
SomeRandomBloke 1:a50b6c866a3b 183 */
SomeRandomBloke 0:260fb10c0755 184 int MQTTClient::connect(char* id) {
SomeRandomBloke 0:260fb10c0755 185 /*Initial TCP socket*/
wakwak_koba 10:6fa55ad359f0 186 if(!pTCPSocket) {
wakwak_koba 10:6fa55ad359f0 187 pTCPSocket = new TCPSocket;
wakwak_koba 10:6fa55ad359f0 188 pTCPSocket->setOnEvent(this, &MQTTClient::onTCPSocketEvent);
wakwak_koba 10:6fa55ad359f0 189 }
SomeRandomBloke 0:260fb10c0755 190
SomeRandomBloke 0:260fb10c0755 191 /*Trying to connect to host*/
SomeRandomBloke 0:260fb10c0755 192 printf("Trying to connect to host..\r\n\r\n");
SomeRandomBloke 0:260fb10c0755 193 TCPSocketErr err = pTCPSocket->connect(host);
SomeRandomBloke 0:260fb10c0755 194
SomeRandomBloke 0:260fb10c0755 195 Net::poll();
SomeRandomBloke 0:260fb10c0755 196 if (err) {
SomeRandomBloke 0:260fb10c0755 197 printf("Error connecting to host [%d]\r\n", (int) err);
SomeRandomBloke 0:260fb10c0755 198 return -1;
SomeRandomBloke 0:260fb10c0755 199 }
SomeRandomBloke 9:a0e39cea763a 200 printf("Connect to host..\r\n\r\n");
SomeRandomBloke 0:260fb10c0755 201
SomeRandomBloke 0:260fb10c0755 202 /*Wait TCP connection with server to be established*/
SomeRandomBloke 0:260fb10c0755 203 int i = 0;
SomeRandomBloke 0:260fb10c0755 204 while (!connected) {
SomeRandomBloke 0:260fb10c0755 205 Net::poll();
SomeRandomBloke 0:260fb10c0755 206 wait(1);
SomeRandomBloke 0:260fb10c0755 207 i++;
SomeRandomBloke 0:260fb10c0755 208 printf("Wait for connections %d..\r\n", i);
SomeRandomBloke 0:260fb10c0755 209 if (i == 35) {//If wait too long, give up.
SomeRandomBloke 0:260fb10c0755 210 return -1;
SomeRandomBloke 0:260fb10c0755 211 }
SomeRandomBloke 0:260fb10c0755 212 }
SomeRandomBloke 0:260fb10c0755 213
SomeRandomBloke 0:260fb10c0755 214 /*Send open session message to server*/
wakwak_koba 10:6fa55ad359f0 215 if(open_session(id)) {
wakwak_koba 10:6fa55ad359f0 216 printf("Error open session [%s]\r\n", id);
wakwak_koba 10:6fa55ad359f0 217 return -1;
wakwak_koba 10:6fa55ad359f0 218 }
SomeRandomBloke 0:260fb10c0755 219
SomeRandomBloke 0:260fb10c0755 220 /*Wait server notice of open sesion*/
SomeRandomBloke 7:c12d2bfe40e2 221 i = 0;
SomeRandomBloke 0:260fb10c0755 222 while (!sessionOpened) {
SomeRandomBloke 0:260fb10c0755 223 Net::poll();
SomeRandomBloke 0:260fb10c0755 224 wait(1);
SomeRandomBloke 9:a0e39cea763a 225 if (!connected || ++i >40) {
SomeRandomBloke 0:260fb10c0755 226 break;
SomeRandomBloke 0:260fb10c0755 227 }
SomeRandomBloke 9:a0e39cea763a 228 printf("Wait for session %d..\r\n", i);
SomeRandomBloke 0:260fb10c0755 229 }
wakwak_koba 10:6fa55ad359f0 230 // if (!connected) { wakwak_koba
wakwak_koba 10:6fa55ad359f0 231 if (!connected || !sessionOpened) {
SomeRandomBloke 0:260fb10c0755 232 return -2;
SomeRandomBloke 0:260fb10c0755 233 }
SomeRandomBloke 0:260fb10c0755 234 lastActivity = timer.read_ms();
wakwak_koba 10:6fa55ad359f0 235 return 0;
SomeRandomBloke 0:260fb10c0755 236 }
SomeRandomBloke 0:260fb10c0755 237
SomeRandomBloke 1:a50b6c866a3b 238 /** Publish a message on a topic.
SomeRandomBloke 1:a50b6c866a3b 239 *
SomeRandomBloke 1:a50b6c866a3b 240 * @param pub_topic The topic name the massage will be publish on.
SomeRandomBloke 1:a50b6c866a3b 241 * @param msg The massage to be published.
wakwak_koba 11:2a8c86bfdeca 242 * @param retain Retain flag.
wakwak_koba 10:6fa55ad359f0 243 * @returns -1: Failed to publish message. 0: Publish sucessed.
SomeRandomBloke 1:a50b6c866a3b 244 */
wakwak_koba 10:6fa55ad359f0 245 int MQTTClient::publish(char* pub_topic, char* msg, bool retain) {
wakwak_koba 10:6fa55ad359f0 246 uint8_t var_header_pub[strlen(pub_topic)+2];
SomeRandomBloke 0:260fb10c0755 247 strcpy((char *)&var_header_pub[2], pub_topic);
SomeRandomBloke 0:260fb10c0755 248 var_header_pub[0] = 0;
SomeRandomBloke 0:260fb10c0755 249 var_header_pub[1] = strlen(pub_topic);
wakwak_koba 10:6fa55ad359f0 250
wakwak_koba 10:6fa55ad359f0 251 // uint8_t fixed_header_pub[] = {MQTTPUBLISH,sizeof(var_header_pub)+strlen(msg)};
wakwak_koba 10:6fa55ad359f0 252 int var_len = sizeof(var_header_pub)+strlen(msg);
wakwak_koba 10:6fa55ad359f0 253 uint8_t fixed_header_pub[1 + (var_len < 128 ? 1 : 2)];
wakwak_koba 10:6fa55ad359f0 254 fixed_header_pub[0] = MQTTPUBLISH | (retain ? 0x01 : 0x00);
SomeRandomBloke 0:260fb10c0755 255
wakwak_koba 10:6fa55ad359f0 256 if(var_len < 128)
wakwak_koba 10:6fa55ad359f0 257 fixed_header_pub[1] = var_len;
wakwak_koba 10:6fa55ad359f0 258 else {
wakwak_koba 10:6fa55ad359f0 259 fixed_header_pub[1] = (var_len % 0x80) | 0x80;
wakwak_koba 10:6fa55ad359f0 260 fixed_header_pub[2] = (var_len / 0x80);
wakwak_koba 10:6fa55ad359f0 261 }
wakwak_koba 10:6fa55ad359f0 262
SomeRandomBloke 0:260fb10c0755 263 uint8_t packet_pub[sizeof(fixed_header_pub)+sizeof(var_header_pub)+strlen(msg)];
SomeRandomBloke 0:260fb10c0755 264 memset(packet_pub,0,sizeof(packet_pub));
SomeRandomBloke 0:260fb10c0755 265 memcpy(packet_pub,fixed_header_pub,sizeof(fixed_header_pub));
SomeRandomBloke 0:260fb10c0755 266 memcpy(packet_pub+sizeof(fixed_header_pub),var_header_pub,sizeof(var_header_pub));
SomeRandomBloke 0:260fb10c0755 267 memcpy(packet_pub+sizeof(fixed_header_pub)+sizeof(var_header_pub),msg,strlen(msg));
SomeRandomBloke 0:260fb10c0755 268
wakwak_koba 10:6fa55ad359f0 269 if (send_data((char*)packet_pub, sizeof(packet_pub))) {
SomeRandomBloke 0:260fb10c0755 270 return -1;
SomeRandomBloke 0:260fb10c0755 271 }
wakwak_koba 10:6fa55ad359f0 272
wakwak_koba 10:6fa55ad359f0 273 return 0;
SomeRandomBloke 0:260fb10c0755 274 }
SomeRandomBloke 0:260fb10c0755 275
SomeRandomBloke 1:a50b6c866a3b 276 /** Disconnect from server
SomeRandomBloke 1:a50b6c866a3b 277 */
SomeRandomBloke 0:260fb10c0755 278 void MQTTClient::disconnect() {
SomeRandomBloke 0:260fb10c0755 279 char packet_224[] = {0xe0, 0x00};
SomeRandomBloke 0:260fb10c0755 280 send_data((char*)packet_224, 2);
SomeRandomBloke 0:260fb10c0755 281
wakwak_koba 10:6fa55ad359f0 282 pTCPSocket->close();
wakwak_koba 10:6fa55ad359f0 283 delete pTCPSocket;
wakwak_koba 10:6fa55ad359f0 284 pTCPSocket = NULL;
SomeRandomBloke 0:260fb10c0755 285 connected = false;
SomeRandomBloke 0:260fb10c0755 286 }
SomeRandomBloke 0:260fb10c0755 287
SomeRandomBloke 1:a50b6c866a3b 288 /** Read data from receive packet
SomeRandomBloke 9:a0e39cea763a 289 * Determine what needs to be done with packet.
SomeRandomBloke 1:a50b6c866a3b 290 */
SomeRandomBloke 0:260fb10c0755 291 void MQTTClient::read_data() {
SomeRandomBloke 0:260fb10c0755 292 char buffer[1024];
SomeRandomBloke 0:260fb10c0755 293 int len = 0, readLen;
SomeRandomBloke 0:260fb10c0755 294
SomeRandomBloke 0:260fb10c0755 295 while ((readLen = pTCPSocket->recv(buffer, 1024)) != 0) {
SomeRandomBloke 0:260fb10c0755 296 len += readLen;
SomeRandomBloke 0:260fb10c0755 297 }
SomeRandomBloke 0:260fb10c0755 298
SomeRandomBloke 0:260fb10c0755 299 buffer[len] = '\0';
SomeRandomBloke 0:260fb10c0755 300
SomeRandomBloke 0:260fb10c0755 301
wakwak_koba 10:6fa55ad359f0 302 /* printf("Read length: %d %d\r\n", len, readLen);
SomeRandomBloke 0:260fb10c0755 303
SomeRandomBloke 0:260fb10c0755 304 for (int i = 0; i < len; i++) {
SomeRandomBloke 0:260fb10c0755 305 printf("%2X ", buffer[i]);
SomeRandomBloke 0:260fb10c0755 306 }
wakwak_koba 10:6fa55ad359f0 307 printf("\r\n"); */
SomeRandomBloke 0:260fb10c0755 308
SomeRandomBloke 0:260fb10c0755 309 char type = buffer[0]>>4;
SomeRandomBloke 0:260fb10c0755 310 if ( type == 2 ) { // CONNACK
SomeRandomBloke 0:260fb10c0755 311 printf("CONNACK\n");
SomeRandomBloke 0:260fb10c0755 312
SomeRandomBloke 0:260fb10c0755 313 } else if (type == 3) { // PUBLISH
SomeRandomBloke 0:260fb10c0755 314 if (callback_server) {
SomeRandomBloke 0:260fb10c0755 315 short index = 1;
SomeRandomBloke 0:260fb10c0755 316 short multiplier = 1;
SomeRandomBloke 9:a0e39cea763a 317 short value = 0;
SomeRandomBloke 0:260fb10c0755 318 uint8_t digit;
SomeRandomBloke 0:260fb10c0755 319 do {
SomeRandomBloke 9:a0e39cea763a 320 digit = buffer[index++];
SomeRandomBloke 9:a0e39cea763a 321 value += (digit & 127) * multiplier;
SomeRandomBloke 0:260fb10c0755 322 multiplier *= 128;
SomeRandomBloke 0:260fb10c0755 323 } while ((digit & 128) != 0);
SomeRandomBloke 0:260fb10c0755 324 printf( "variable length %d, index %d\n", value, index );
SomeRandomBloke 9:a0e39cea763a 325
SomeRandomBloke 0:260fb10c0755 326 // uint8_t tl = (buffer[2]<<3)+buffer[3];
SomeRandomBloke 0:260fb10c0755 327 uint8_t tl = (buffer[index]<<3)+buffer[index+1];
SomeRandomBloke 0:260fb10c0755 328 printf("Topic len %d\n",tl);
SomeRandomBloke 0:260fb10c0755 329 char topic[tl+1];
SomeRandomBloke 0:260fb10c0755 330 for (int i=0; i<tl; i++) {
SomeRandomBloke 0:260fb10c0755 331 topic[i] = buffer[index+2+i];
SomeRandomBloke 0:260fb10c0755 332 }
SomeRandomBloke 0:260fb10c0755 333 topic[tl] = 0;
SomeRandomBloke 0:260fb10c0755 334 // ignore msgID - only support QoS 0 subs
SomeRandomBloke 0:260fb10c0755 335 char *payload = buffer+index+2+tl;
SomeRandomBloke 0:260fb10c0755 336 callback_server(topic,(char*)payload);
SomeRandomBloke 0:260fb10c0755 337 }
SomeRandomBloke 0:260fb10c0755 338 } else if (type == 12) { // PINGREG -- Ask for alive
SomeRandomBloke 0:260fb10c0755 339 char packet_208[] = {0xd0, 0x00};
SomeRandomBloke 0:260fb10c0755 340 send_data((char*)packet_208, 2);
SomeRandomBloke 0:260fb10c0755 341 lastActivity = timer.read_ms();
SomeRandomBloke 0:260fb10c0755 342 }
SomeRandomBloke 0:260fb10c0755 343 }
SomeRandomBloke 0:260fb10c0755 344
SomeRandomBloke 1:a50b6c866a3b 345 /** Check for session opened
SomeRandomBloke 1:a50b6c866a3b 346 */
SomeRandomBloke 0:260fb10c0755 347 void MQTTClient::read_open_session() {
SomeRandomBloke 0:260fb10c0755 348 char buffer[32];
SomeRandomBloke 0:260fb10c0755 349 int len = 0, readLen;
SomeRandomBloke 0:260fb10c0755 350
SomeRandomBloke 0:260fb10c0755 351 while ((readLen = pTCPSocket->recv(buffer, 32)) != 0) {
SomeRandomBloke 0:260fb10c0755 352 len += readLen;
SomeRandomBloke 0:260fb10c0755 353 }
SomeRandomBloke 0:260fb10c0755 354
SomeRandomBloke 0:260fb10c0755 355 if (len == 4 && buffer[3] == 0) {
SomeRandomBloke 0:260fb10c0755 356 printf("Session opened\r\n");
SomeRandomBloke 0:260fb10c0755 357 sessionOpened = true;
SomeRandomBloke 0:260fb10c0755 358 }
SomeRandomBloke 0:260fb10c0755 359 }
SomeRandomBloke 0:260fb10c0755 360
SomeRandomBloke 1:a50b6c866a3b 361 /** Subscribe to a topic
SomeRandomBloke 9:a0e39cea763a 362 *
SomeRandomBloke 1:a50b6c866a3b 363 * @param topic The topic name to be subscribed.
wakwak_koba 10:6fa55ad359f0 364 * @returns -1: Failed to subscribe to topic. 0: Subscribe sucessed.
SomeRandomBloke 1:a50b6c866a3b 365 */
SomeRandomBloke 0:260fb10c0755 366 int MQTTClient::subscribe(char* topic) {
SomeRandomBloke 0:260fb10c0755 367
SomeRandomBloke 0:260fb10c0755 368 if (connected) {
SomeRandomBloke 0:260fb10c0755 369 uint8_t var_header_topic[] = {0,10};
SomeRandomBloke 0:260fb10c0755 370 uint8_t fixed_header_topic[] = {MQTTSUBSCRIBE,sizeof(var_header_topic)+strlen(topic)+3};
SomeRandomBloke 0:260fb10c0755 371
SomeRandomBloke 0:260fb10c0755 372 //utf topic
SomeRandomBloke 0:260fb10c0755 373 uint8_t utf_topic[strlen(topic)+3];
SomeRandomBloke 0:260fb10c0755 374 strcpy((char *)&utf_topic[2], topic);
SomeRandomBloke 0:260fb10c0755 375
SomeRandomBloke 0:260fb10c0755 376 utf_topic[0] = 0;
SomeRandomBloke 0:260fb10c0755 377 utf_topic[1] = strlen(topic);
SomeRandomBloke 0:260fb10c0755 378 utf_topic[sizeof(utf_topic)-1] = 0;
SomeRandomBloke 0:260fb10c0755 379
SomeRandomBloke 0:260fb10c0755 380 char packet_topic[sizeof(var_header_topic)+sizeof(fixed_header_topic)+strlen(topic)+3];
SomeRandomBloke 0:260fb10c0755 381 memset(packet_topic,0,sizeof(packet_topic));
SomeRandomBloke 0:260fb10c0755 382 memcpy(packet_topic,fixed_header_topic,sizeof(fixed_header_topic));
SomeRandomBloke 0:260fb10c0755 383 memcpy(packet_topic+sizeof(fixed_header_topic),var_header_topic,sizeof(var_header_topic));
SomeRandomBloke 0:260fb10c0755 384 memcpy(packet_topic+sizeof(fixed_header_topic)+sizeof(var_header_topic),utf_topic,sizeof(utf_topic));
SomeRandomBloke 0:260fb10c0755 385
wakwak_koba 10:6fa55ad359f0 386 if (send_data(packet_topic, sizeof(packet_topic))) {
SomeRandomBloke 0:260fb10c0755 387 return -1;
SomeRandomBloke 0:260fb10c0755 388 }
wakwak_koba 10:6fa55ad359f0 389 return 0;
SomeRandomBloke 0:260fb10c0755 390 }
SomeRandomBloke 0:260fb10c0755 391 return -1;
SomeRandomBloke 0:260fb10c0755 392 }
SomeRandomBloke 0:260fb10c0755 393
SomeRandomBloke 1:a50b6c866a3b 394 /** Send heartbeat/keep-alive message
SomeRandomBloke 9:a0e39cea763a 395 *
SomeRandomBloke 1:a50b6c866a3b 396 */
SomeRandomBloke 0:260fb10c0755 397 void MQTTClient::live() {
SomeRandomBloke 0:260fb10c0755 398 if (connected) {
SomeRandomBloke 0:260fb10c0755 399 int t = timer.read_ms();
wakwak_koba 10:6fa55ad359f0 400 if (t - lastActivity > keepalive) {
SomeRandomBloke 0:260fb10c0755 401 //Send 192 0 to broker
wakwak_koba 10:6fa55ad359f0 402 //printf("Send 192\r\n");
SomeRandomBloke 0:260fb10c0755 403 char packet_192[] = {0xc0, 0x00};
SomeRandomBloke 0:260fb10c0755 404 send_data((char*)packet_192, 2);
SomeRandomBloke 0:260fb10c0755 405 lastActivity = t;
SomeRandomBloke 0:260fb10c0755 406 }
SomeRandomBloke 0:260fb10c0755 407 }
SomeRandomBloke 0:260fb10c0755 408 }
SomeRandomBloke 0:260fb10c0755 409
wakwak_koba 10:6fa55ad359f0 410 void MQTTClient::pool() {
wakwak_koba 10:6fa55ad359f0 411 Net::poll();
wakwak_koba 10:6fa55ad359f0 412 }
wakwak_koba 10:6fa55ad359f0 413
SomeRandomBloke 1:a50b6c866a3b 414 /** TCP Socket event handling
SomeRandomBloke 1:a50b6c866a3b 415 *
SomeRandomBloke 1:a50b6c866a3b 416 * @param e Event object
SomeRandomBloke 1:a50b6c866a3b 417 */
SomeRandomBloke 0:260fb10c0755 418 void MQTTClient::onTCPSocketEvent(TCPSocketEvent e) {
SomeRandomBloke 0:260fb10c0755 419 switch (e) {
SomeRandomBloke 0:260fb10c0755 420 case TCPSOCKET_ACCEPT:
SomeRandomBloke 0:260fb10c0755 421 printf("New TCPSocketEvent: TCPSOCKET_ACCEPT\r\n");
SomeRandomBloke 0:260fb10c0755 422 break;
SomeRandomBloke 0:260fb10c0755 423 case TCPSOCKET_CONNECTED:
SomeRandomBloke 0:260fb10c0755 424 printf("New TCPSocketEvent: TCPSOCKET_CONNECTED\r\n");
SomeRandomBloke 0:260fb10c0755 425 connected = true;
SomeRandomBloke 0:260fb10c0755 426 break;
SomeRandomBloke 0:260fb10c0755 427 case TCPSOCKET_WRITEABLE:
wakwak_koba 10:6fa55ad359f0 428 // printf("New TCPSocketEvent: TCPSOCKET_WRITEABLE\r\n");
wakwak_koba 10:6fa55ad359f0 429 writtable = true;
SomeRandomBloke 0:260fb10c0755 430 break;
SomeRandomBloke 0:260fb10c0755 431 case TCPSOCKET_READABLE:
SomeRandomBloke 0:260fb10c0755 432 printf("New TCPSocketEvent: TCPSOCKET_READABLE\r\n");
SomeRandomBloke 0:260fb10c0755 433 if (!sessionOpened) {
SomeRandomBloke 0:260fb10c0755 434 read_open_session();
SomeRandomBloke 0:260fb10c0755 435 }
SomeRandomBloke 0:260fb10c0755 436 read_data();
SomeRandomBloke 0:260fb10c0755 437 break;
SomeRandomBloke 0:260fb10c0755 438 case TCPSOCKET_CONTIMEOUT:
SomeRandomBloke 0:260fb10c0755 439 printf("New TCPSocketEvent: TCPSOCKET_CONTIMEOUT\r\n");
SomeRandomBloke 0:260fb10c0755 440 break;
SomeRandomBloke 0:260fb10c0755 441 case TCPSOCKET_CONRST:
SomeRandomBloke 0:260fb10c0755 442 printf("New TCPSocketEvent: TCPSOCKET_CONRST\r\n");
SomeRandomBloke 0:260fb10c0755 443 break;
SomeRandomBloke 0:260fb10c0755 444 case TCPSOCKET_CONABRT:
SomeRandomBloke 0:260fb10c0755 445 printf("New TCPSocketEvent: TCPSOCKET_CONABRT\r\n");
SomeRandomBloke 0:260fb10c0755 446 break;
SomeRandomBloke 0:260fb10c0755 447 case TCPSOCKET_ERROR:
SomeRandomBloke 0:260fb10c0755 448 printf("New TCPSocketEvent: TCPSOCKET_ERROR\r\n");
SomeRandomBloke 0:260fb10c0755 449 break;
SomeRandomBloke 0:260fb10c0755 450 case TCPSOCKET_DISCONNECTED:
SomeRandomBloke 0:260fb10c0755 451 printf("New TCPSocketEvent: TCPSOCKET_DISCONNECTED\r\n");
SomeRandomBloke 0:260fb10c0755 452 pTCPSocket->close();
SomeRandomBloke 0:260fb10c0755 453 connected = false;
SomeRandomBloke 0:260fb10c0755 454 break;
SomeRandomBloke 0:260fb10c0755 455 }
SomeRandomBloke 0:260fb10c0755 456 }