An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Committer:
Ian Craggs
Date:
Thu Nov 02 12:12:41 2017 +0000
Revision:
59:9cff7b6bbd01
Parent:
54:ff9e5c4b52d0
MQTTPacket library merge

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 31:a51dd239b78e 1 #if !defined(MQTTSOCKET_H)
icraggs 31:a51dd239b78e 2 #define MQTTSOCKET_H
icraggs 31:a51dd239b78e 3
icraggs 43:21da1f744243 4 #include "MQTTmbed.h"
icraggs 54:ff9e5c4b52d0 5 #include <EthernetInterface.h>
icraggs 54:ff9e5c4b52d0 6 #include <Timer.h>
icraggs 31:a51dd239b78e 7
icraggs 31:a51dd239b78e 8 class MQTTSocket
icraggs 31:a51dd239b78e 9 {
Jan Jongboom 49:08571008b958 10 public:
icraggs 54:ff9e5c4b52d0 11 MQTTSocket(EthernetInterface *anet)
icraggs 54:ff9e5c4b52d0 12 {
icraggs 54:ff9e5c4b52d0 13 net = anet;
icraggs 54:ff9e5c4b52d0 14 open = false;
icraggs 54:ff9e5c4b52d0 15 }
icraggs 54:ff9e5c4b52d0 16
icraggs 31:a51dd239b78e 17 int connect(char* hostname, int port, int timeout=1000)
icraggs 31:a51dd239b78e 18 {
icraggs 54:ff9e5c4b52d0 19 if (open)
icraggs 54:ff9e5c4b52d0 20 disconnect();
icraggs 54:ff9e5c4b52d0 21 nsapi_error_t rc = mysock.open(net);
icraggs 54:ff9e5c4b52d0 22 open = true;
icraggs 54:ff9e5c4b52d0 23 mysock.set_blocking(true);
icraggs 54:ff9e5c4b52d0 24 mysock.set_timeout((unsigned int)timeout);
icraggs 54:ff9e5c4b52d0 25 rc = mysock.connect(hostname, port);
icraggs 54:ff9e5c4b52d0 26 mysock.set_blocking(false); // blocking timeouts seem not to work
icraggs 54:ff9e5c4b52d0 27 return rc;
icraggs 31:a51dd239b78e 28 }
icraggs 31:a51dd239b78e 29
icraggs 54:ff9e5c4b52d0 30 // common read/write routine, avoiding blocking timeouts
icraggs 54:ff9e5c4b52d0 31 int common(unsigned char* buffer, int len, int timeout, bool read)
icraggs 54:ff9e5c4b52d0 32 {
icraggs 54:ff9e5c4b52d0 33 timer.start();
icraggs 54:ff9e5c4b52d0 34 mysock.set_blocking(false); // blocking timeouts seem not to work
icraggs 54:ff9e5c4b52d0 35 int bytes = 0;
icraggs 54:ff9e5c4b52d0 36 bool first = true;
icraggs 54:ff9e5c4b52d0 37 do
icraggs 54:ff9e5c4b52d0 38 {
icraggs 54:ff9e5c4b52d0 39 if (first)
icraggs 54:ff9e5c4b52d0 40 first = false;
icraggs 54:ff9e5c4b52d0 41 else
icraggs 54:ff9e5c4b52d0 42 wait_ms(timeout < 100 ? timeout : 100);
icraggs 54:ff9e5c4b52d0 43 int rc;
icraggs 54:ff9e5c4b52d0 44 if (read)
icraggs 54:ff9e5c4b52d0 45 rc = mysock.recv((char*)buffer, len);
icraggs 54:ff9e5c4b52d0 46 else
icraggs 54:ff9e5c4b52d0 47 rc = mysock.send((char*)buffer, len);
icraggs 54:ff9e5c4b52d0 48 if (rc < 0)
icraggs 54:ff9e5c4b52d0 49 {
icraggs 54:ff9e5c4b52d0 50 if (rc != NSAPI_ERROR_WOULD_BLOCK)
icraggs 54:ff9e5c4b52d0 51 {
icraggs 54:ff9e5c4b52d0 52 bytes = -1;
icraggs 54:ff9e5c4b52d0 53 break;
icraggs 54:ff9e5c4b52d0 54 }
icraggs 54:ff9e5c4b52d0 55 }
icraggs 54:ff9e5c4b52d0 56 else
icraggs 54:ff9e5c4b52d0 57 bytes += rc;
icraggs 54:ff9e5c4b52d0 58 }
icraggs 54:ff9e5c4b52d0 59 while (bytes < len && timer.read_ms() < timeout);
icraggs 54:ff9e5c4b52d0 60 timer.stop();
icraggs 54:ff9e5c4b52d0 61 return bytes;
icraggs 54:ff9e5c4b52d0 62 }
icraggs 54:ff9e5c4b52d0 63
icraggs 54:ff9e5c4b52d0 64 /* returns the number of bytes read, which could be 0.
icraggs 54:ff9e5c4b52d0 65 -1 if there was an error on the socket
icraggs 54:ff9e5c4b52d0 66 */
icraggs 36:2f1ada427e56 67 int read(unsigned char* buffer, int len, int timeout)
icraggs 31:a51dd239b78e 68 {
icraggs 54:ff9e5c4b52d0 69 return common(buffer, len, timeout, true);
icraggs 31:a51dd239b78e 70 }
Jan Jongboom 49:08571008b958 71
icraggs 36:2f1ada427e56 72 int write(unsigned char* buffer, int len, int timeout)
icraggs 31:a51dd239b78e 73 {
icraggs 54:ff9e5c4b52d0 74 return common(buffer, len, timeout, false);
icraggs 31:a51dd239b78e 75 }
Jan Jongboom 49:08571008b958 76
icraggs 31:a51dd239b78e 77 int disconnect()
icraggs 31:a51dd239b78e 78 {
icraggs 54:ff9e5c4b52d0 79 open = false;
icraggs 31:a51dd239b78e 80 return mysock.close();
icraggs 31:a51dd239b78e 81 }
Jan Jongboom 49:08571008b958 82
icraggs 54:ff9e5c4b52d0 83 /*bool is_connected()
Jan Jongboom 49:08571008b958 84 {
Jan Jongboom 49:08571008b958 85 return mysock.is_connected();
icraggs 54:ff9e5c4b52d0 86 }*/
Jan Jongboom 49:08571008b958 87
icraggs 31:a51dd239b78e 88 private:
icraggs 31:a51dd239b78e 89
icraggs 54:ff9e5c4b52d0 90 bool open;
icraggs 54:ff9e5c4b52d0 91 TCPSocket mysock;
icraggs 54:ff9e5c4b52d0 92 EthernetInterface *net;
icraggs 54:ff9e5c4b52d0 93 Timer timer;
Jan Jongboom 49:08571008b958 94
icraggs 31:a51dd239b78e 95 };
icraggs 31:a51dd239b78e 96
icraggs 31:a51dd239b78e 97 #endif