M2X MQTT Client for ARM MBED

Dependents:   WNCInterface_M2XMQTTdemo

Committer:
citrusbyte
Date:
Thu Sep 15 10:17:00 2016 +0000
Revision:
1:c4d41ff3c58e
Parent:
0:5a4798128c36
Update library to latest version

Who changed what in which revision?

UserRevisionLine numberNew contents of line
citrusbyte 0:5a4798128c36 1 #include "mbed.h"
citrusbyte 0:5a4798128c36 2 #define USER_AGENT "User-Agent: M2X Mbed MQTT Client/" M2X_VERSION
citrusbyte 0:5a4798128c36 3
citrusbyte 0:5a4798128c36 4 #ifdef DEBUG
citrusbyte 0:5a4798128c36 5 #define DBG(fmt_, data_) printf((fmt_), (data_))
citrusbyte 0:5a4798128c36 6 #define DBGLN(fmt_, data_) printf((fmt_), (data_)); printf("\n")
citrusbyte 0:5a4798128c36 7 #define DBGLNEND printf("\n")
citrusbyte 0:5a4798128c36 8 #endif /* DEBUG */
citrusbyte 0:5a4798128c36 9
citrusbyte 0:5a4798128c36 10 #define F(str) str
citrusbyte 0:5a4798128c36 11
citrusbyte 0:5a4798128c36 12 class M2XTimer {
citrusbyte 0:5a4798128c36 13 public:
citrusbyte 0:5a4798128c36 14 void start() { _timer.start(); }
citrusbyte 0:5a4798128c36 15
citrusbyte 0:5a4798128c36 16 unsigned long read_ms() {
citrusbyte 0:5a4798128c36 17 // In case of a timestamp overflow, we reset the server timestamp recorded.
citrusbyte 0:5a4798128c36 18 // Notice that unlike Arduino, mbed would overflow every 30 minutes,
citrusbyte 0:5a4798128c36 19 // so if 2 calls to this API are more than 30 minutes apart, we are
citrusbyte 0:5a4798128c36 20 // likely to run into troubles. This is a limitation of the current
citrusbyte 0:5a4798128c36 21 // mbed platform. However, we argue that it might be a rare case that
citrusbyte 0:5a4798128c36 22 // 2 calls to this are 30 minutes apart. In most cases, we would call
citrusbyte 0:5a4798128c36 23 // this API every few seconds or minutes, this won't be a huge problem.
citrusbyte 0:5a4798128c36 24 // However, if you have a use case that would require 2 intervening
citrusbyte 0:5a4798128c36 25 // calls to this be 30 minutes apart, you might want to leverage a RTC
citrusbyte 0:5a4798128c36 26 // clock instead of the simple ticker here, or call TimeService::reset()
citrusbyte 0:5a4798128c36 27 // before making an API call to sync current time.
citrusbyte 0:5a4798128c36 28 return _timer.read_ms();
citrusbyte 0:5a4798128c36 29 }
citrusbyte 0:5a4798128c36 30 private:
citrusbyte 0:5a4798128c36 31 Timer _timer;
citrusbyte 0:5a4798128c36 32 };
citrusbyte 0:5a4798128c36 33
citrusbyte 0:5a4798128c36 34 #include "TCPSocketConnection.h"
citrusbyte 0:5a4798128c36 35
citrusbyte 0:5a4798128c36 36 #include <stddef.h>
citrusbyte 0:5a4798128c36 37 #include <stdint.h>
citrusbyte 0:5a4798128c36 38 #include <stdio.h>
citrusbyte 0:5a4798128c36 39 #include <string.h>
citrusbyte 0:5a4798128c36 40
citrusbyte 0:5a4798128c36 41 #ifdef __cplusplus
citrusbyte 0:5a4798128c36 42 extern "C" {
citrusbyte 0:5a4798128c36 43 #endif
citrusbyte 0:5a4798128c36 44
citrusbyte 0:5a4798128c36 45 void delay(int ms)
citrusbyte 0:5a4798128c36 46 {
citrusbyte 0:5a4798128c36 47 wait_ms(ms);
citrusbyte 0:5a4798128c36 48 }
citrusbyte 0:5a4798128c36 49
citrusbyte 0:5a4798128c36 50 char* strdup(const char* s)
citrusbyte 0:5a4798128c36 51 {
citrusbyte 0:5a4798128c36 52 char* ret = (char*) malloc(strlen(s) + 1);
citrusbyte 0:5a4798128c36 53 if (ret == NULL) { return ret;}
citrusbyte 0:5a4798128c36 54 return strcpy(ret, s);
citrusbyte 0:5a4798128c36 55 }
citrusbyte 0:5a4798128c36 56
citrusbyte 0:5a4798128c36 57 #ifdef __cplusplus
citrusbyte 0:5a4798128c36 58 }
citrusbyte 0:5a4798128c36 59 #endif
citrusbyte 0:5a4798128c36 60
citrusbyte 0:5a4798128c36 61 class Print {
citrusbyte 0:5a4798128c36 62 public:
citrusbyte 0:5a4798128c36 63 size_t print(const char* s);
citrusbyte 0:5a4798128c36 64 size_t print(char c);
citrusbyte 0:5a4798128c36 65 size_t print(int n);
citrusbyte 0:5a4798128c36 66 size_t print(long n);
citrusbyte 0:5a4798128c36 67 size_t print(double n, int digits = 2);
citrusbyte 0:5a4798128c36 68
citrusbyte 0:5a4798128c36 69 size_t println(const char* s);
citrusbyte 0:5a4798128c36 70 size_t println(char c);
citrusbyte 0:5a4798128c36 71 size_t println(int n);
citrusbyte 0:5a4798128c36 72 size_t println(long n);
citrusbyte 0:5a4798128c36 73 size_t println(double n, int digits = 2);
citrusbyte 0:5a4798128c36 74 size_t println();
citrusbyte 0:5a4798128c36 75
citrusbyte 0:5a4798128c36 76 virtual size_t write(uint8_t c) = 0;
citrusbyte 0:5a4798128c36 77 virtual size_t write(const uint8_t* buf, size_t size);
citrusbyte 0:5a4798128c36 78 };
citrusbyte 0:5a4798128c36 79
citrusbyte 0:5a4798128c36 80 size_t Print::write(const uint8_t* buf, size_t size) {
citrusbyte 0:5a4798128c36 81 size_t ret = 0;
citrusbyte 0:5a4798128c36 82 while (size--) {
citrusbyte 0:5a4798128c36 83 ret += write(*buf++);
citrusbyte 0:5a4798128c36 84 }
citrusbyte 0:5a4798128c36 85 return ret;
citrusbyte 0:5a4798128c36 86 }
citrusbyte 0:5a4798128c36 87
citrusbyte 0:5a4798128c36 88 size_t Print::print(const char* s) {
citrusbyte 0:5a4798128c36 89 return write((const uint8_t*)s, strlen(s));
citrusbyte 0:5a4798128c36 90 }
citrusbyte 0:5a4798128c36 91
citrusbyte 0:5a4798128c36 92 size_t Print::print(char c) {
citrusbyte 0:5a4798128c36 93 return write(c);
citrusbyte 0:5a4798128c36 94 }
citrusbyte 0:5a4798128c36 95
citrusbyte 0:5a4798128c36 96 size_t Print::print(int n) {
citrusbyte 0:5a4798128c36 97 return print((long) n);
citrusbyte 0:5a4798128c36 98 }
citrusbyte 0:5a4798128c36 99
citrusbyte 0:5a4798128c36 100 size_t Print::print(long n) {
citrusbyte 0:5a4798128c36 101 char buf[8 * sizeof(long) + 1];
citrusbyte 0:5a4798128c36 102 snprintf(buf, sizeof(buf), "%ld", n);
citrusbyte 0:5a4798128c36 103 return print(buf);
citrusbyte 0:5a4798128c36 104 }
citrusbyte 0:5a4798128c36 105
citrusbyte 0:5a4798128c36 106 // Digits are ignored for now
citrusbyte 0:5a4798128c36 107 size_t Print::print(double n, int digits) {
citrusbyte 0:5a4798128c36 108 char buf[65];
citrusbyte 0:5a4798128c36 109 snprintf(buf, sizeof(buf), "%g", n);
citrusbyte 0:5a4798128c36 110 return print(buf);
citrusbyte 0:5a4798128c36 111 }
citrusbyte 0:5a4798128c36 112
citrusbyte 0:5a4798128c36 113 size_t Print::println(const char* s) {
citrusbyte 0:5a4798128c36 114 return print(s) + println();
citrusbyte 0:5a4798128c36 115 }
citrusbyte 0:5a4798128c36 116
citrusbyte 0:5a4798128c36 117 size_t Print::println(char c) {
citrusbyte 0:5a4798128c36 118 return print(c) + println();
citrusbyte 0:5a4798128c36 119 }
citrusbyte 0:5a4798128c36 120
citrusbyte 0:5a4798128c36 121 size_t Print::println(int n) {
citrusbyte 0:5a4798128c36 122 return print(n) + println();
citrusbyte 0:5a4798128c36 123 }
citrusbyte 0:5a4798128c36 124
citrusbyte 0:5a4798128c36 125 size_t Print::println(long n) {
citrusbyte 0:5a4798128c36 126 return print(n) + println();
citrusbyte 0:5a4798128c36 127 }
citrusbyte 0:5a4798128c36 128
citrusbyte 0:5a4798128c36 129 size_t Print::println(double n, int digits) {
citrusbyte 0:5a4798128c36 130 return print(n, digits) + println();
citrusbyte 0:5a4798128c36 131 }
citrusbyte 0:5a4798128c36 132
citrusbyte 0:5a4798128c36 133 size_t Print::println() {
citrusbyte 0:5a4798128c36 134 return print('\r') + print('\n');
citrusbyte 0:5a4798128c36 135 }
citrusbyte 0:5a4798128c36 136
citrusbyte 0:5a4798128c36 137 /*
citrusbyte 0:5a4798128c36 138 * TCP Client
citrusbyte 0:5a4798128c36 139 */
citrusbyte 0:5a4798128c36 140 class Client : public Print {
citrusbyte 0:5a4798128c36 141 public:
citrusbyte 0:5a4798128c36 142 Client();
citrusbyte 0:5a4798128c36 143 ~Client();
citrusbyte 0:5a4798128c36 144
citrusbyte 0:5a4798128c36 145 virtual int connect(const char *host, uint16_t port);
citrusbyte 0:5a4798128c36 146 virtual size_t write(uint8_t);
citrusbyte 0:5a4798128c36 147 virtual size_t write(const uint8_t *buf, size_t size);
citrusbyte 0:5a4798128c36 148 virtual int available();
citrusbyte 0:5a4798128c36 149 virtual int read();
citrusbyte 0:5a4798128c36 150 virtual void flush();
citrusbyte 0:5a4798128c36 151 virtual void stop();
citrusbyte 0:5a4798128c36 152 virtual uint8_t connected();
citrusbyte 0:5a4798128c36 153 private:
citrusbyte 0:5a4798128c36 154 virtual int read(uint8_t *buf, size_t size);
citrusbyte 0:5a4798128c36 155 void _fillin(void);
citrusbyte 0:5a4798128c36 156 uint8_t _inbuf[128];
citrusbyte 0:5a4798128c36 157 uint8_t _incnt;
citrusbyte 0:5a4798128c36 158 void _flushout(void);
citrusbyte 0:5a4798128c36 159 uint8_t _outbuf[128];
citrusbyte 0:5a4798128c36 160 uint8_t _outcnt;
citrusbyte 0:5a4798128c36 161 TCPSocketConnection _sock;
citrusbyte 0:5a4798128c36 162 };
citrusbyte 0:5a4798128c36 163
citrusbyte 0:5a4798128c36 164 Client::Client() : _incnt(0), _outcnt(0), _sock() {
citrusbyte 0:5a4798128c36 165 _sock.set_blocking(false, 1500);
citrusbyte 0:5a4798128c36 166 }
citrusbyte 0:5a4798128c36 167
citrusbyte 0:5a4798128c36 168 Client::~Client() {
citrusbyte 0:5a4798128c36 169 }
citrusbyte 0:5a4798128c36 170
citrusbyte 0:5a4798128c36 171 int Client::connect(const char *host, uint16_t port) {
citrusbyte 0:5a4798128c36 172 return _sock.connect(host, port) == 0;
citrusbyte 0:5a4798128c36 173 }
citrusbyte 0:5a4798128c36 174
citrusbyte 0:5a4798128c36 175 size_t Client::write(uint8_t b) {
citrusbyte 0:5a4798128c36 176 return write(&b, 1);
citrusbyte 0:5a4798128c36 177 }
citrusbyte 0:5a4798128c36 178
citrusbyte 0:5a4798128c36 179 size_t Client::write(const uint8_t *buf, size_t size) {
citrusbyte 0:5a4798128c36 180 size_t cnt = 0;
citrusbyte 0:5a4798128c36 181 while (size) {
citrusbyte 0:5a4798128c36 182 int tmp = sizeof(_outbuf) - _outcnt;
citrusbyte 0:5a4798128c36 183 if (tmp > size) tmp = size;
citrusbyte 0:5a4798128c36 184 memcpy(_outbuf + _outcnt, buf, tmp);
citrusbyte 0:5a4798128c36 185 _outcnt += tmp;
citrusbyte 0:5a4798128c36 186 buf += tmp;
citrusbyte 0:5a4798128c36 187 size -= tmp;
citrusbyte 0:5a4798128c36 188 cnt += tmp;
citrusbyte 0:5a4798128c36 189 // if no space flush it
citrusbyte 0:5a4798128c36 190 if (_outcnt == sizeof(_outbuf))
citrusbyte 0:5a4798128c36 191 _flushout();
citrusbyte 0:5a4798128c36 192 }
citrusbyte 0:5a4798128c36 193 return cnt;
citrusbyte 0:5a4798128c36 194 }
citrusbyte 0:5a4798128c36 195
citrusbyte 0:5a4798128c36 196 void Client::_flushout(void)
citrusbyte 0:5a4798128c36 197 {
citrusbyte 0:5a4798128c36 198 if (_outcnt > 0) {
citrusbyte 0:5a4798128c36 199 // NOTE: we know it's dangerous to cast from (const uint8_t *) to (char *),
citrusbyte 0:5a4798128c36 200 // but we are trying to maintain a stable interface between the Arduino
citrusbyte 0:5a4798128c36 201 // one and the mbed one. What's more, while TCPSocketConnection has no
citrusbyte 0:5a4798128c36 202 // intention of modifying the data here, it requires us to send a (char *)
citrusbyte 0:5a4798128c36 203 // typed data. So we belive it's safe to do the cast here.
citrusbyte 0:5a4798128c36 204 _sock.send_all(const_cast<char*>((const char*) _outbuf), _outcnt);
citrusbyte 0:5a4798128c36 205 _outcnt = 0;
citrusbyte 0:5a4798128c36 206 }
citrusbyte 0:5a4798128c36 207 }
citrusbyte 0:5a4798128c36 208
citrusbyte 0:5a4798128c36 209 void Client::_fillin(void)
citrusbyte 0:5a4798128c36 210 {
citrusbyte 0:5a4798128c36 211 int tmp = sizeof(_inbuf) - _incnt;
citrusbyte 0:5a4798128c36 212 if (tmp) {
citrusbyte 0:5a4798128c36 213 tmp = _sock.receive_all((char*)_inbuf + _incnt, tmp);
citrusbyte 0:5a4798128c36 214 if (tmp > 0)
citrusbyte 0:5a4798128c36 215 _incnt += tmp;
citrusbyte 0:5a4798128c36 216 }
citrusbyte 0:5a4798128c36 217 }
citrusbyte 0:5a4798128c36 218
citrusbyte 0:5a4798128c36 219 void Client::flush() {
citrusbyte 0:5a4798128c36 220 _flushout();
citrusbyte 0:5a4798128c36 221 }
citrusbyte 0:5a4798128c36 222
citrusbyte 0:5a4798128c36 223 int Client::available() {
citrusbyte 0:5a4798128c36 224 if (_incnt == 0) {
citrusbyte 0:5a4798128c36 225 _flushout();
citrusbyte 0:5a4798128c36 226 _fillin();
citrusbyte 0:5a4798128c36 227 }
citrusbyte 0:5a4798128c36 228 return (_incnt > 0) ? 1 : 0;
citrusbyte 0:5a4798128c36 229 }
citrusbyte 0:5a4798128c36 230
citrusbyte 0:5a4798128c36 231 int Client::read() {
citrusbyte 0:5a4798128c36 232 uint8_t ch;
citrusbyte 0:5a4798128c36 233 return (read(&ch, 1) == 1) ? ch : -1;
citrusbyte 0:5a4798128c36 234 }
citrusbyte 0:5a4798128c36 235
citrusbyte 0:5a4798128c36 236 int Client::read(uint8_t *buf, size_t size) {
citrusbyte 0:5a4798128c36 237 int cnt = 0;
citrusbyte 0:5a4798128c36 238 while (size) {
citrusbyte 0:5a4798128c36 239 // need more
citrusbyte 0:5a4798128c36 240 if (size > _incnt) {
citrusbyte 0:5a4798128c36 241 _flushout();
citrusbyte 0:5a4798128c36 242 _fillin();
citrusbyte 0:5a4798128c36 243 }
citrusbyte 0:5a4798128c36 244 if (_incnt > 0) {
citrusbyte 0:5a4798128c36 245 int tmp = _incnt;
citrusbyte 0:5a4798128c36 246 if (tmp > size) tmp = size;
citrusbyte 0:5a4798128c36 247 memcpy(buf, _inbuf, tmp);
citrusbyte 0:5a4798128c36 248 if (tmp != _incnt)
citrusbyte 0:5a4798128c36 249 memmove(_inbuf, _inbuf + tmp, _incnt - tmp);
citrusbyte 0:5a4798128c36 250 _incnt -= tmp;
citrusbyte 0:5a4798128c36 251 size -= tmp;
citrusbyte 0:5a4798128c36 252 buf += tmp;
citrusbyte 0:5a4798128c36 253 cnt += tmp;
citrusbyte 0:5a4798128c36 254 } else // no data
citrusbyte 0:5a4798128c36 255 break;
citrusbyte 0:5a4798128c36 256 }
citrusbyte 0:5a4798128c36 257 return cnt;
citrusbyte 0:5a4798128c36 258 }
citrusbyte 0:5a4798128c36 259
citrusbyte 0:5a4798128c36 260 void Client::stop() {
citrusbyte 0:5a4798128c36 261 _sock.close();
citrusbyte 0:5a4798128c36 262 }
citrusbyte 0:5a4798128c36 263
citrusbyte 0:5a4798128c36 264 uint8_t Client::connected() {
citrusbyte 0:5a4798128c36 265 return _sock.is_connected() ? 1 : 0;
citrusbyte 0:5a4798128c36 266 }