MQTT lib for WISEAgent
Fork of MQTT by
MQTTAsync.h
/*******************************************************************************
 * Copyright (c) 2014 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *    http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *******************************************************************************/

#if !defined(MQTTASYNC_H)
#define MQTTASYNC_H

#include "FP.h"
#include "MQTTPacket.h"
#include "stdio.h"


namespace MQTT
{


enum QoS { QOS0, QOS1, QOS2 };


struct Message
{
    enum QoS qos;
    bool retained;
    bool dup;
    unsigned short id;
    void *payload;
    size_t payloadlen;
};


class PacketId
{
public:
    PacketId();

    int getNext();

private:
    static const int MAX_PACKET_ID = 65535;
    int next;
};

typedef void (*messageHandler)(Message*);

typedef struct limits
{
    int MAX_MQTT_PACKET_SIZE;
    int MAX_MESSAGE_HANDLERS;       // each subscription requires a message handler
    int MAX_CONCURRENT_OPERATIONS;  // each command which runs concurrently can have a result handler, when we are in multi-threaded mode
    int command_timeout_ms;

    limits()
    {
        MAX_MQTT_PACKET_SIZE = ADV_MAX_PACKET_SIZE;
        MAX_MESSAGE_HANDLERS = 5;
        MAX_CONCURRENT_OPERATIONS = 1;  // 1 indicates single-threaded mode - set to >1 for multi-threaded mode
        command_timeout_ms = 30000;
    }
} Limits;


/**
 * @class Async
 * @brief non-blocking, threaded MQTT client API
 * @param Network a network class which supports send, receive
 * @param Timer a timer class with the methods: countdown, left_ms, expired
 */
template<class Network, class Timer, class Thread, class Mutex> class Async
{

public:

    struct Result
    {
        /* success or failure result data */
        Async<Network, Timer, Thread, Mutex>* client;
        int rc;
    };

    typedef void (*resultHandler)(Result*);

    Async(Network* network, const Limits limits = Limits());

    typedef struct
    {
        Async* client;
        Network* network;
    } connectionLostInfo;

    typedef int (*connectionLostHandlers)(connectionLostInfo*);

    /** Set the connection lost callback - called whenever the connection is lost and we should be connected
     *  @param clh - pointer to the callback function
     */
    void setConnectionLostHandler(connectionLostHandlers clh)
    {
        connectionLostHandler.attach(clh);
    }

    /** Set the default message handling callback - used for any message which does not match a subscription message handler
     *  @param mh - pointer to the callback function
     */
    void setDefaultMessageHandler(messageHandler mh)
    {
        defaultMessageHandler.attach(mh);
    }

    int connect(resultHandler fn, MQTTPacket_connectData* options = 0);

    template<class T>
    int connect(void (T::*method)(Result*), MQTTPacket_connectData* options = 0, T* item = 0);  // alternative to pass in pointer to member function

    int publish(resultHandler rh, const char* topic, Message* message);

    int subscribe(resultHandler rh, const char* topicFilter, enum QoS qos, messageHandler mh);

    int unsubscribe(resultHandler rh, const char* topicFilter);

    int disconnect(resultHandler rh);

private:

    void run(void const *argument);
    int cycle(int timeout);
    int waitfor(int packet_type, Timer& atimer);
    int keepalive();
    int findFreeOperation();

    int decodePacket(int* value, int timeout);
    int readPacket(int timeout);
    int sendPacket(int length, int timeout);
    int deliverMessage(MQTTString* topic, Message* message);

    Thread* thread;
    Network* ipstack;

    Limits limits;

    char* buf;
    char* readbuf;

    Timer ping_timer, connect_timer;
    unsigned int keepAliveInterval;
    bool ping_outstanding;

    PacketId packetid;

    typedef FP<void, Result*> resultHandlerFP;
    resultHandlerFP connectHandler;

    typedef FP<void, Message*> messageHandlerFP;
    struct MessageHandlers
    {
        const char* topic;
        messageHandlerFP fp;
    } *messageHandlers;  // Message handlers are indexed by subscription topic

    // how many concurrent operations should we allow? Each one will require a function pointer
    struct Operations
    {
        unsigned short id;
        resultHandlerFP fp;
        const char* topic;  // if this is a publish, store topic name in case republishing is required
        Message* message;   // for publish,
        Timer timer;        // to check if the command has timed out
    } *operations;          // result handlers are indexed by packet ids

    static void threadfn(void* arg);

    messageHandlerFP defaultMessageHandler;

    typedef FP<int, connectionLostInfo*> connectionLostFP;

    connectionLostFP connectionLostHandler;

};

}


template<class Network, class Timer, class Thread, class Mutex>
void MQTT::Async<Network, Timer, Thread, Mutex>::threadfn(void* arg)
{
    ((Async<Network, Timer, Thread, Mutex>*) arg)->run(NULL);
}


template<class Network, class Timer, class Thread, class Mutex>
MQTT::Async<Network, Timer, Thread, Mutex>::Async(Network* network, Limits limits) : limits(limits), packetid()
{
    this->thread = 0;
    this->ipstack = network;
    this->ping_timer = Timer();
    this->ping_outstanding = 0;

    // How to make these memory allocations portable? I was hoping to avoid the heap
    printf("size=%d\n", limits.MAX_MQTT_PACKET_SIZE);
    buf = new char[limits.MAX_MQTT_PACKET_SIZE];
    readbuf = new char[limits.MAX_MQTT_PACKET_SIZE];
    this->operations = new struct Operations[limits.MAX_CONCURRENT_OPERATIONS];
    for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
        operations[i].id = 0;
    this->messageHandlers = new struct MessageHandlers[limits.MAX_MESSAGE_HANDLERS];
    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
        messageHandlers[i].topic = 0;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::sendPacket(int length, int timeout)
{
    int sent = 0;

    while (sent < length)
        sent += ipstack->write(&buf[sent], length - sent, timeout);
    if (sent == length)
        ping_timer.countdown(this->keepAliveInterval);  // record the fact that we have successfully sent the packet
    return sent;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::decodePacket(int* value, int timeout)
{
    // decode the MQTT "remaining length" field: each byte carries 7 bits of the value,
    // and a set top bit means another length byte follows
    char c;
    int multiplier = 1;
    int len = 0;
    const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;

    *value = 0;
    do
    {
        int rc = MQTTPACKET_READ_ERROR;

        if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
        {
            rc = MQTTPACKET_READ_ERROR; /* bad data */
            goto exit;
        }
        rc = ipstack->read(&c, 1, timeout);
        if (rc != 1)
            goto exit;
        *value += (c & 127) * multiplier;
        multiplier *= 128;
    } while ((c & 128) != 0);
exit:
    return len;
}

/**
 * If any read fails in this method, then we should disconnect from the network, as on reconnect
 * the packets can be retried.
 * @param timeout the max time to wait for the packet read to complete, in milliseconds
 * @return the MQTT packet type, or -1 if none
 */
template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::readPacket(int timeout)
{
    int rc = -1;
    MQTTHeader header = {0};
    int len = 0;
    int rem_len = 0;

    /* 1. read the header byte.  This has the packet type in it */
    if (ipstack->read(readbuf, 1, timeout) != 1)
        goto exit;

    len = 1;
    /* 2. read the remaining length.  This is variable in itself */
    decodePacket(&rem_len, timeout);
    len += MQTTPacket_encode(readbuf + 1, rem_len); /* put the original remaining length back into the buffer */

    /* 3. read the rest of the buffer using a callback to supply the rest of the data */
    if (ipstack->read(readbuf + len, rem_len, timeout) != rem_len)
        goto exit;

    header.byte = readbuf[0];
    rc = header.bits.type;
exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::deliverMessage(MQTTString* topic, Message* message)
{
    int rc = -1;

    // we have to find the right message handler - indexed by topic
    for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
    {
        if (messageHandlers[i].topic != 0 && MQTTPacket_equals(topic, (char*)messageHandlers[i].topic))
        {
            messageHandlers[i].fp(message);
            rc = 0;
            break;
        }
    }

    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::cycle(int timeout)
{
    /* get one piece of work off the wire and one pass through */

    // read the socket, see what work is due
    int packet_type = readPacket(timeout);

    int len, rc;
    switch (packet_type)
    {
    case CONNACK:
        if (this->thread)
        {
            Result res = {this, 0};
            if (MQTTDeserialize_connack(&res.rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                ;
            connectHandler(&res);
            connectHandler.detach();  // only invoke the callback once
        }
        break;
    case PUBACK:
        if (this->thread)
            ;  // call resultHandler
        // fall through
    case SUBACK:
        break;
    case PUBLISH:
        MQTTString topicName;
        Message msg;
        rc = MQTTDeserialize_publish((int*)&msg.dup, (int*)&msg.qos, (int*)&msg.retained, (int*)&msg.id, &topicName,
                                     (char**)&msg.payload, (int*)&msg.payloadlen, readbuf, limits.MAX_MQTT_PACKET_SIZE);
        if (msg.qos == QOS0)
            deliverMessage(&topicName, &msg);
        break;
    case PUBREC:
        int type, dup, mypacketid;
        if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
            ;
        // must lock this access against the application thread, if we are multi-threaded
        len = MQTTSerialize_ack(buf, limits.MAX_MQTT_PACKET_SIZE, PUBREL, 0, mypacketid);
        rc = sendPacket(len, timeout);  // send the PUBREL packet
        if (rc != len)
            goto exit;  // there was a problem

        break;
    case PUBCOMP:
        break;
    case PINGRESP:
        ping_outstanding = false;
        break;
    }
    keepalive();
exit:
    return packet_type;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::keepalive()
{
    int rc = 0;

    if (keepAliveInterval == 0)
        goto exit;

    if (ping_timer.expired())
    {
        if (ping_outstanding)
            rc = -1;
        else
        {
            int len = MQTTSerialize_pingreq(buf, limits.MAX_MQTT_PACKET_SIZE);
            rc = sendPacket(len, 1000);  // send the ping packet
            if (rc != len)
                rc = -1;  // indicate there's a problem
            else
                ping_outstanding = true;
        }
    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
void MQTT::Async<Network, Timer, Thread, Mutex>::run(void const *argument)
{
    while (true)
        cycle(ping_timer.left_ms());
}

// only used in single-threaded mode where one command at a time is in process
template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::waitfor(int packet_type, Timer& atimer)
{
    int rc = -1;

    do
    {
        if (atimer.expired())
            break;  // we timed out
    }
    while ((rc = cycle(atimer.left_ms())) != packet_type);

    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::connect(resultHandler resultHandler, MQTTPacket_connectData* options)
{
    connect_timer.countdown(limits.command_timeout_ms);

    MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
    if (options == 0)
        options = &default_options;  // set default options if none were supplied

    this->keepAliveInterval = options->keepAliveInterval;
    ping_timer.countdown(this->keepAliveInterval);
    int len = MQTTSerialize_connect(buf, limits.MAX_MQTT_PACKET_SIZE, options);
    int rc = sendPacket(len, connect_timer.left_ms());  // send the connect packet
    if (rc != len)
        goto exit;  // there was a problem

    if (resultHandler == 0)  // wait until the connack is received
    {
        // this will be a blocking call, wait for the connack
        if (waitfor(CONNACK, connect_timer) == CONNACK)
        {
            int connack_rc = -1;
            if (MQTTDeserialize_connack(&connack_rc, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                rc = connack_rc;
        }
    }
    else
    {
        // set connect response callback function
        connectHandler.attach(resultHandler);

        // start background thread
        this->thread = new Thread((void (*)(void const *argument))&MQTT::Async<Network, Timer, Thread, Mutex>::threadfn, (void*)this);
    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::findFreeOperation()
{
    int found = -1;
    for (int i = 0; i < limits.MAX_CONCURRENT_OPERATIONS; ++i)
    {
        if (operations[i].id == 0)
        {
            found = i;
            break;
        }
    }
    return found;
}

template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::subscribe(resultHandler resultHandler, const char* topicFilter, enum QoS qos, messageHandler messageHandler)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};

    int len = MQTTSerialize_subscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic, (int*)&qos);
    int rc = sendPacket(len, atimer.left_ms());  // send the subscribe packet
    if (rc != len)
        goto exit;  // there was a problem

    /* wait for suback */
    if (resultHandler == 0)
    {
        // this will block
        if (waitfor(SUBACK, atimer) == SUBACK)
        {
            int count = 0, grantedQoS = -1, mypacketid;
            if (MQTTDeserialize_suback(&mypacketid, 1, &count, &grantedQoS, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                rc = grantedQoS;  // 0, 1, 2 or 0x80
            if (rc != 0x80)
            {
                for (int i = 0; i < limits.MAX_MESSAGE_HANDLERS; ++i)
                {
                    if (messageHandlers[i].topic == 0)
                    {
                        messageHandlers[i].topic = topicFilter;
                        messageHandlers[i].fp.attach(messageHandler);
                        rc = 0;
                        break;
                    }
                }
            }
        }
    }
    else
    {
        // set subscribe response callback function

    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::unsubscribe(resultHandler resultHandler, const char* topicFilter)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicFilter, 0, 0};

    int len = MQTTSerialize_unsubscribe(buf, limits.MAX_MQTT_PACKET_SIZE, 0, packetid.getNext(), 1, &topic);
    int rc = sendPacket(len, atimer.left_ms());  // send the unsubscribe packet
    if (rc != len)
        goto exit;  // there was a problem

    // set unsubscribe response callback function

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::publish(resultHandler resultHandler, const char* topicName, Message* message)
{
    int index = 0;
    if (this->thread)
        index = findFreeOperation();
    Timer& atimer = operations[index].timer;

    atimer.countdown(limits.command_timeout_ms);
    MQTTString topic = {(char*)topicName, 0, 0};

    if (message->qos == QOS1 || message->qos == QOS2)
        message->id = packetid.getNext();

    int len = MQTTSerialize_publish(buf, limits.MAX_MQTT_PACKET_SIZE, 0, message->qos, message->retained, message->id, topic, (char*)message->payload, message->payloadlen);
    int rc = sendPacket(len, atimer.left_ms());  // send the publish packet
    if (rc != len)
        goto exit;  // there was a problem

    /* wait for acks */
    if (resultHandler == 0)
    {
        if (message->qos == QOS1)
        {
            if (waitfor(PUBACK, atimer) == PUBACK)
            {
                int type, dup, mypacketid;
                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                    rc = 0;
            }
        }
        else if (message->qos == QOS2)
        {
            if (waitfor(PUBCOMP, atimer) == PUBCOMP)
            {
                int type, dup, mypacketid;
                if (MQTTDeserialize_ack(&type, &dup, &mypacketid, readbuf, limits.MAX_MQTT_PACKET_SIZE) == 1)
                    rc = 0;
            }
        }
    }
    else
    {
        // set publish response callback function

    }

exit:
    return rc;
}


template<class Network, class Timer, class Thread, class Mutex>
int MQTT::Async<Network, Timer, Thread, Mutex>::disconnect(resultHandler resultHandler)
{
    Timer timer = Timer(limits.command_timeout_ms);  // we might wait for incomplete incoming publishes to complete
    int len = MQTTSerialize_disconnect(buf, limits.MAX_MQTT_PACKET_SIZE);
    int rc = sendPacket(len, timer.left_ms());  // send the disconnect packet

    return (rc == len) ? 0 : -1;
}


#endif
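Usage sketch

The following is a minimal usage sketch of the Async API above; it is not part of the library. The IPStack, Countdown, Thread and Mutex names are placeholder classes standing in for whatever platform types supply the read/write, countdown/left_ms/expired, thread and mutex operations the template expects, and the MQTTPacket_connectData fields come from the bundled MQTTPacket headers. Passing 0 as the result handler makes each call block until its acknowledgement arrives; passing a callback to connect() instead attaches it and starts the background thread.

#include "MQTTAsync.h"

// handler for publications arriving on the subscribed topic filter
void messageArrived(MQTT::Message* message)
{
    printf("payload: %.*s\n", (int)message->payloadlen, (char*)message->payload);
}

// IPStack, Countdown, Thread and Mutex are assumed platform classes (illustrative names)
int example(IPStack& network)
{
    MQTT::Limits limits;
    limits.MAX_MQTT_PACKET_SIZE = 200;   // override the defaults where needed

    MQTT::Async<IPStack, Countdown, Thread, Mutex> client(&network, limits);

    MQTTPacket_connectData options = MQTTPacket_connectData_initializer;
    options.clientID.cstring = (char*)"wiseagent-example";
    options.keepAliveInterval = 60;

    // result handler == 0: block until the CONNACK arrives (or the command times out)
    int rc = client.connect(0, &options);
    if (rc != 0)
        return rc;

    // blocking subscribe; messageArrived is attached as the handler for this topic filter
    rc = client.subscribe(0, "sensors/+/temperature", MQTT::QOS1, messageArrived);

    char payload[] = "23.5";
    MQTT::Message msg;
    msg.qos = MQTT::QOS1;
    msg.retained = false;
    msg.dup = false;
    msg.payload = payload;
    msg.payloadlen = sizeof(payload) - 1;
    rc = client.publish(0, "sensors/node1/temperature", &msg);   // blocks until the PUBACK

    return client.disconnect(0);
}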