An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the EclipseTM Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence templates parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Committer:
Ian Craggs
Date:
Wed Apr 09 11:37:28 2014 +0100
Revision:
7:f9d690fb6dad
Child:
8:c46930bd6c82
Fixed some bugs

Who changed what in which revision?

UserRevisionLine numberNew contents of line
Ian Craggs 7:f9d690fb6dad 1
Ian Craggs 7:f9d690fb6dad 2 #include <sys/types.h>
Ian Craggs 7:f9d690fb6dad 3 #include <sys/socket.h>
Ian Craggs 7:f9d690fb6dad 4 #include <sys/param.h>
Ian Craggs 7:f9d690fb6dad 5 #include <sys/time.h>
Ian Craggs 7:f9d690fb6dad 6 #include <sys/select.h>
Ian Craggs 7:f9d690fb6dad 7 #include <netinet/in.h>
Ian Craggs 7:f9d690fb6dad 8 #include <netinet/tcp.h>
Ian Craggs 7:f9d690fb6dad 9 #include <arpa/inet.h>
Ian Craggs 7:f9d690fb6dad 10 #include <netdb.h>
Ian Craggs 7:f9d690fb6dad 11 #include <stdio.h>
Ian Craggs 7:f9d690fb6dad 12 #include <unistd.h>
Ian Craggs 7:f9d690fb6dad 13 #include <errno.h>
Ian Craggs 7:f9d690fb6dad 14 #include <fcntl.h>
Ian Craggs 7:f9d690fb6dad 15
Ian Craggs 7:f9d690fb6dad 16 #include <stdlib.h>
Ian Craggs 7:f9d690fb6dad 17 #include <string.h>
Ian Craggs 7:f9d690fb6dad 18 #include <signal.h>
Ian Craggs 7:f9d690fb6dad 19
Ian Craggs 7:f9d690fb6dad 20 #include <pthread.h>
Ian Craggs 7:f9d690fb6dad 21
Ian Craggs 7:f9d690fb6dad 22 #include "MQTTClient.h"
Ian Craggs 7:f9d690fb6dad 23 #include "MQTTClient.cpp"
Ian Craggs 7:f9d690fb6dad 24 #include "FP.cpp"
Ian Craggs 7:f9d690fb6dad 25
Ian Craggs 7:f9d690fb6dad 26 #define DEFAULT_STACK_SIZE -1
Ian Craggs 7:f9d690fb6dad 27
Ian Craggs 7:f9d690fb6dad 28 class Thread {
Ian Craggs 7:f9d690fb6dad 29 public:
Ian Craggs 7:f9d690fb6dad 30
Ian Craggs 7:f9d690fb6dad 31 Thread(void* (*fn)(void *parameter), void *parameter=NULL,
Ian Craggs 7:f9d690fb6dad 32 int priority=0,
Ian Craggs 7:f9d690fb6dad 33 uint32_t stack_size=DEFAULT_STACK_SIZE,
Ian Craggs 7:f9d690fb6dad 34 unsigned char *stack_pointer=NULL)
Ian Craggs 7:f9d690fb6dad 35 {
Ian Craggs 7:f9d690fb6dad 36 thread = 0;
Ian Craggs 7:f9d690fb6dad 37 pthread_attr_t attr;
Ian Craggs 7:f9d690fb6dad 38
Ian Craggs 7:f9d690fb6dad 39 pthread_attr_init(&attr);
Ian Craggs 7:f9d690fb6dad 40 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
Ian Craggs 7:f9d690fb6dad 41 if (pthread_create(&thread, &attr, fn, parameter) != 0)
Ian Craggs 7:f9d690fb6dad 42 thread = 0;
Ian Craggs 7:f9d690fb6dad 43 pthread_attr_destroy(&attr);
Ian Craggs 7:f9d690fb6dad 44 }
Ian Craggs 7:f9d690fb6dad 45
Ian Craggs 7:f9d690fb6dad 46 private:
Ian Craggs 7:f9d690fb6dad 47 pthread_t thread;
Ian Craggs 7:f9d690fb6dad 48 };
Ian Craggs 7:f9d690fb6dad 49
Ian Craggs 7:f9d690fb6dad 50
Ian Craggs 7:f9d690fb6dad 51 class IPStack
Ian Craggs 7:f9d690fb6dad 52 {
Ian Craggs 7:f9d690fb6dad 53 public:
Ian Craggs 7:f9d690fb6dad 54 IPStack()
Ian Craggs 7:f9d690fb6dad 55 {
Ian Craggs 7:f9d690fb6dad 56
Ian Craggs 7:f9d690fb6dad 57 }
Ian Craggs 7:f9d690fb6dad 58
Ian Craggs 7:f9d690fb6dad 59 int Socket_error(const char* aString)
Ian Craggs 7:f9d690fb6dad 60 {
Ian Craggs 7:f9d690fb6dad 61
Ian Craggs 7:f9d690fb6dad 62 if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
Ian Craggs 7:f9d690fb6dad 63 {
Ian Craggs 7:f9d690fb6dad 64 if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
Ian Craggs 7:f9d690fb6dad 65 printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
Ian Craggs 7:f9d690fb6dad 66 }
Ian Craggs 7:f9d690fb6dad 67 return errno;
Ian Craggs 7:f9d690fb6dad 68 }
Ian Craggs 7:f9d690fb6dad 69
Ian Craggs 7:f9d690fb6dad 70 int connect(const char* hostname, int port)
Ian Craggs 7:f9d690fb6dad 71 {
Ian Craggs 7:f9d690fb6dad 72 int type = SOCK_STREAM;
Ian Craggs 7:f9d690fb6dad 73 struct sockaddr_in address;
Ian Craggs 7:f9d690fb6dad 74 int rc = -1;
Ian Craggs 7:f9d690fb6dad 75 sa_family_t family = AF_INET;
Ian Craggs 7:f9d690fb6dad 76 struct addrinfo *result = NULL;
Ian Craggs 7:f9d690fb6dad 77 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
Ian Craggs 7:f9d690fb6dad 78
Ian Craggs 7:f9d690fb6dad 79 if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
Ian Craggs 7:f9d690fb6dad 80 {
Ian Craggs 7:f9d690fb6dad 81 struct addrinfo* res = result;
Ian Craggs 7:f9d690fb6dad 82
Ian Craggs 7:f9d690fb6dad 83 /* prefer ip4 addresses */
Ian Craggs 7:f9d690fb6dad 84 while (res)
Ian Craggs 7:f9d690fb6dad 85 {
Ian Craggs 7:f9d690fb6dad 86 if (res->ai_family == AF_INET)
Ian Craggs 7:f9d690fb6dad 87 {
Ian Craggs 7:f9d690fb6dad 88 result = res;
Ian Craggs 7:f9d690fb6dad 89 break;
Ian Craggs 7:f9d690fb6dad 90 }
Ian Craggs 7:f9d690fb6dad 91 res = res->ai_next;
Ian Craggs 7:f9d690fb6dad 92 }
Ian Craggs 7:f9d690fb6dad 93
Ian Craggs 7:f9d690fb6dad 94 if (result->ai_family == AF_INET)
Ian Craggs 7:f9d690fb6dad 95 {
Ian Craggs 7:f9d690fb6dad 96 address.sin_port = htons(port);
Ian Craggs 7:f9d690fb6dad 97 address.sin_family = family = AF_INET;
Ian Craggs 7:f9d690fb6dad 98 address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
Ian Craggs 7:f9d690fb6dad 99 }
Ian Craggs 7:f9d690fb6dad 100 else
Ian Craggs 7:f9d690fb6dad 101 rc = -1;
Ian Craggs 7:f9d690fb6dad 102
Ian Craggs 7:f9d690fb6dad 103 freeaddrinfo(result);
Ian Craggs 7:f9d690fb6dad 104 }
Ian Craggs 7:f9d690fb6dad 105
Ian Craggs 7:f9d690fb6dad 106 if (rc == 0)
Ian Craggs 7:f9d690fb6dad 107 {
Ian Craggs 7:f9d690fb6dad 108 mysock = socket(family, type, 0);
Ian Craggs 7:f9d690fb6dad 109 if (mysock != -1)
Ian Craggs 7:f9d690fb6dad 110 {
Ian Craggs 7:f9d690fb6dad 111 int opt = 1;
Ian Craggs 7:f9d690fb6dad 112
Ian Craggs 7:f9d690fb6dad 113 //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
Ian Craggs 7:f9d690fb6dad 114 // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
Ian Craggs 7:f9d690fb6dad 115
Ian Craggs 7:f9d690fb6dad 116 rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
Ian Craggs 7:f9d690fb6dad 117 }
Ian Craggs 7:f9d690fb6dad 118 }
Ian Craggs 7:f9d690fb6dad 119
Ian Craggs 7:f9d690fb6dad 120 return rc;
Ian Craggs 7:f9d690fb6dad 121 }
Ian Craggs 7:f9d690fb6dad 122
Ian Craggs 7:f9d690fb6dad 123 int read(char* buffer, int len, int timeout)
Ian Craggs 7:f9d690fb6dad 124 {
Ian Craggs 7:f9d690fb6dad 125 printf("reading %d bytes\n", len);
Ian Craggs 7:f9d690fb6dad 126 int rc = ::recv(mysock, buffer, (size_t)len, 0);
Ian Craggs 7:f9d690fb6dad 127 if (rc == -1)
Ian Craggs 7:f9d690fb6dad 128 Socket_error("read");
Ian Craggs 7:f9d690fb6dad 129 printf("read %d bytes\n", rc);
Ian Craggs 7:f9d690fb6dad 130 return rc;
Ian Craggs 7:f9d690fb6dad 131 }
Ian Craggs 7:f9d690fb6dad 132
Ian Craggs 7:f9d690fb6dad 133 int write(char* buffer, int len, int timeout)
Ian Craggs 7:f9d690fb6dad 134 {
Ian Craggs 7:f9d690fb6dad 135 return ::write(mysock, buffer, len);
Ian Craggs 7:f9d690fb6dad 136 }
Ian Craggs 7:f9d690fb6dad 137
Ian Craggs 7:f9d690fb6dad 138 private:
Ian Craggs 7:f9d690fb6dad 139
Ian Craggs 7:f9d690fb6dad 140 int mysock;
Ian Craggs 7:f9d690fb6dad 141
Ian Craggs 7:f9d690fb6dad 142 };
Ian Craggs 7:f9d690fb6dad 143
Ian Craggs 7:f9d690fb6dad 144
Ian Craggs 7:f9d690fb6dad 145 class Timer
Ian Craggs 7:f9d690fb6dad 146 {
Ian Craggs 7:f9d690fb6dad 147 public:
Ian Craggs 7:f9d690fb6dad 148
Ian Craggs 7:f9d690fb6dad 149 Timer()
Ian Craggs 7:f9d690fb6dad 150 {
Ian Craggs 7:f9d690fb6dad 151 reset();
Ian Craggs 7:f9d690fb6dad 152 }
Ian Craggs 7:f9d690fb6dad 153
Ian Craggs 7:f9d690fb6dad 154 void start()
Ian Craggs 7:f9d690fb6dad 155 {
Ian Craggs 7:f9d690fb6dad 156 if (running)
Ian Craggs 7:f9d690fb6dad 157 return;
Ian Craggs 7:f9d690fb6dad 158 if (!timerisset(&stop_time))
Ian Craggs 7:f9d690fb6dad 159 gettimeofday(&start_time, NULL);
Ian Craggs 7:f9d690fb6dad 160 else
Ian Craggs 7:f9d690fb6dad 161 {
Ian Craggs 7:f9d690fb6dad 162 struct timeval now, res;
Ian Craggs 7:f9d690fb6dad 163
Ian Craggs 7:f9d690fb6dad 164 gettimeofday(&now, NULL);
Ian Craggs 7:f9d690fb6dad 165
Ian Craggs 7:f9d690fb6dad 166 timersub(&now, &stop_time, &res); // interval to be added to start time
Ian Craggs 7:f9d690fb6dad 167 timeradd(&start_time, &res, &stop_time);
Ian Craggs 7:f9d690fb6dad 168 stop_time = start_time;
Ian Craggs 7:f9d690fb6dad 169 timerclear(&stop_time);
Ian Craggs 7:f9d690fb6dad 170 }
Ian Craggs 7:f9d690fb6dad 171 running = true;
Ian Craggs 7:f9d690fb6dad 172 }
Ian Craggs 7:f9d690fb6dad 173
Ian Craggs 7:f9d690fb6dad 174 void stop()
Ian Craggs 7:f9d690fb6dad 175 {
Ian Craggs 7:f9d690fb6dad 176 if (running)
Ian Craggs 7:f9d690fb6dad 177 {
Ian Craggs 7:f9d690fb6dad 178 gettimeofday(&stop_time, NULL);
Ian Craggs 7:f9d690fb6dad 179 running = false;
Ian Craggs 7:f9d690fb6dad 180 }
Ian Craggs 7:f9d690fb6dad 181 }
Ian Craggs 7:f9d690fb6dad 182
Ian Craggs 7:f9d690fb6dad 183 void reset()
Ian Craggs 7:f9d690fb6dad 184 {
Ian Craggs 7:f9d690fb6dad 185 timerclear(&start_time);
Ian Craggs 7:f9d690fb6dad 186 timerclear(&stop_time);
Ian Craggs 7:f9d690fb6dad 187 running = false;
Ian Craggs 7:f9d690fb6dad 188 }
Ian Craggs 7:f9d690fb6dad 189
Ian Craggs 7:f9d690fb6dad 190 int read_ms() // get the time passed in milli-seconds
Ian Craggs 7:f9d690fb6dad 191 {
Ian Craggs 7:f9d690fb6dad 192 struct timeval now, res;
Ian Craggs 7:f9d690fb6dad 193
Ian Craggs 7:f9d690fb6dad 194 gettimeofday(&now, NULL);
Ian Craggs 7:f9d690fb6dad 195 timersub(&now, &start_time, &res);
Ian Craggs 7:f9d690fb6dad 196 return (res.tv_sec)*1000 + (res.tv_usec)/1000;
Ian Craggs 7:f9d690fb6dad 197 }
Ian Craggs 7:f9d690fb6dad 198
Ian Craggs 7:f9d690fb6dad 199 private:
Ian Craggs 7:f9d690fb6dad 200
Ian Craggs 7:f9d690fb6dad 201 struct timeval start_time, stop_time;
Ian Craggs 7:f9d690fb6dad 202 bool running;
Ian Craggs 7:f9d690fb6dad 203
Ian Craggs 7:f9d690fb6dad 204 };
Ian Craggs 7:f9d690fb6dad 205
Ian Craggs 7:f9d690fb6dad 206
Ian Craggs 7:f9d690fb6dad 207 void messageArrived(MQTT::Message* message)
Ian Craggs 7:f9d690fb6dad 208 {
Ian Craggs 7:f9d690fb6dad 209 }
Ian Craggs 7:f9d690fb6dad 210
Ian Craggs 7:f9d690fb6dad 211
Ian Craggs 7:f9d690fb6dad 212 int main(int argc, char* argv[])
Ian Craggs 7:f9d690fb6dad 213 {
Ian Craggs 7:f9d690fb6dad 214 IPStack ipstack = IPStack();
Ian Craggs 7:f9d690fb6dad 215 Timer t;
Ian Craggs 7:f9d690fb6dad 216 FP<void, MQTT::Message*> messageArrivedPointer;
Ian Craggs 7:f9d690fb6dad 217
Ian Craggs 7:f9d690fb6dad 218 messageArrivedPointer.attach(messageArrived);
Ian Craggs 7:f9d690fb6dad 219
Ian Craggs 7:f9d690fb6dad 220 int rc = ipstack.connect("127.0.0.1", 1883);
Ian Craggs 7:f9d690fb6dad 221 printf("rc from TCP connect is %d\n", rc);
Ian Craggs 7:f9d690fb6dad 222
Ian Craggs 7:f9d690fb6dad 223 MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t);
Ian Craggs 7:f9d690fb6dad 224
Ian Craggs 7:f9d690fb6dad 225 printf("constructed\n");
Ian Craggs 7:f9d690fb6dad 226
Ian Craggs 7:f9d690fb6dad 227 rc = client.connect();
Ian Craggs 7:f9d690fb6dad 228 printf("rc from connect is %d\n", rc);
Ian Craggs 7:f9d690fb6dad 229
Ian Craggs 7:f9d690fb6dad 230 rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer);
Ian Craggs 7:f9d690fb6dad 231 sleep(1);
Ian Craggs 7:f9d690fb6dad 232 }