An API for using MQTT over multiple transports

Dependencies:   FP MQTTPacket

Dependents:   Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more

This library is part of the Eclipse™ Paho project; specifically the embedded client.

The goals of this API are:

  1. to be independent of any system library: hence template parameters for networking, timer and threading classes
  2. not to rely on heap storage, only automatic (I think this is a good thing)
  3. to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
Committer:
icraggs
Date:
Wed Apr 09 23:21:54 2014 +0000
Revision:
9:01b8cc7d94cc
Parent:
8:c46930bd6c82
Fix function pointer definitions

Who changed what in which revision?

User | Revision | Line number | New contents of line
icraggs 8:c46930bd6c82 1
icraggs 8:c46930bd6c82 2 #if defined(LINUX)
Ian Craggs 7:f9d690fb6dad 3
Ian Craggs 7:f9d690fb6dad 4 #include <sys/types.h>
Ian Craggs 7:f9d690fb6dad 5 #include <sys/socket.h>
Ian Craggs 7:f9d690fb6dad 6 #include <sys/param.h>
Ian Craggs 7:f9d690fb6dad 7 #include <sys/time.h>
Ian Craggs 7:f9d690fb6dad 8 #include <sys/select.h>
Ian Craggs 7:f9d690fb6dad 9 #include <netinet/in.h>
Ian Craggs 7:f9d690fb6dad 10 #include <netinet/tcp.h>
Ian Craggs 7:f9d690fb6dad 11 #include <arpa/inet.h>
Ian Craggs 7:f9d690fb6dad 12 #include <netdb.h>
Ian Craggs 7:f9d690fb6dad 13 #include <stdio.h>
Ian Craggs 7:f9d690fb6dad 14 #include <unistd.h>
Ian Craggs 7:f9d690fb6dad 15 #include <errno.h>
Ian Craggs 7:f9d690fb6dad 16 #include <fcntl.h>
Ian Craggs 7:f9d690fb6dad 17
Ian Craggs 7:f9d690fb6dad 18 #include <stdlib.h>
Ian Craggs 7:f9d690fb6dad 19 #include <string.h>
Ian Craggs 7:f9d690fb6dad 20 #include <signal.h>
Ian Craggs 7:f9d690fb6dad 21
Ian Craggs 7:f9d690fb6dad 22 #include <pthread.h>
Ian Craggs 7:f9d690fb6dad 23
Ian Craggs 7:f9d690fb6dad 24 #include "MQTTClient.h"
Ian Craggs 7:f9d690fb6dad 25 #include "MQTTClient.cpp"
Ian Craggs 7:f9d690fb6dad 26 #include "FP.cpp"
Ian Craggs 7:f9d690fb6dad 27
Ian Craggs 7:f9d690fb6dad 28 #define DEFAULT_STACK_SIZE -1
Ian Craggs 7:f9d690fb6dad 29
Ian Craggs 7:f9d690fb6dad 30 class Thread {
Ian Craggs 7:f9d690fb6dad 31 public:
Ian Craggs 7:f9d690fb6dad 32
Ian Craggs 7:f9d690fb6dad 33 Thread(void* (*fn)(void *parameter), void *parameter=NULL,
Ian Craggs 7:f9d690fb6dad 34 int priority=0,
Ian Craggs 7:f9d690fb6dad 35 uint32_t stack_size=DEFAULT_STACK_SIZE,
Ian Craggs 7:f9d690fb6dad 36 unsigned char *stack_pointer=NULL)
Ian Craggs 7:f9d690fb6dad 37 {
Ian Craggs 7:f9d690fb6dad 38 thread = 0;
Ian Craggs 7:f9d690fb6dad 39 pthread_attr_t attr;
Ian Craggs 7:f9d690fb6dad 40
Ian Craggs 7:f9d690fb6dad 41 pthread_attr_init(&attr);
Ian Craggs 7:f9d690fb6dad 42 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
Ian Craggs 7:f9d690fb6dad 43 if (pthread_create(&thread, &attr, fn, parameter) != 0)
Ian Craggs 7:f9d690fb6dad 44 thread = 0;
Ian Craggs 7:f9d690fb6dad 45 pthread_attr_destroy(&attr);
Ian Craggs 7:f9d690fb6dad 46 }
Ian Craggs 7:f9d690fb6dad 47
Ian Craggs 7:f9d690fb6dad 48 private:
Ian Craggs 7:f9d690fb6dad 49 pthread_t thread;
Ian Craggs 7:f9d690fb6dad 50 };
Ian Craggs 7:f9d690fb6dad 51
Ian Craggs 7:f9d690fb6dad 52
/**
 * TCP transport built on BSD sockets, supplied to MQTT::Client as its
 * networking template argument.
 *
 * Owns a single socket descriptor (mysock); -1 means "no socket".
 */
class IPStack
{
public:
    IPStack() : mysock(-1)
    {
    }

    /**
     * Report the current errno for a failed socket operation.
     *
     * Transient conditions (EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK) are not
     * logged, nor are ENOTCONN/ECONNRESET raised by a "shutdown" operation.
     *
     * @param aString name of the operation that failed, used in the log line
     * @return the errno value observed on entry
     */
    int Socket_error(const char* aString)
    {
        // Capture errno first: printf/strerror are allowed to modify it.
        const int err = errno;

        if (err != EINTR && err != EAGAIN && err != EINPROGRESS && err != EWOULDBLOCK)
        {
            if (strcmp(aString, "shutdown") != 0 || (err != ENOTCONN && err != ECONNRESET))
                printf("Socket error %s in %s for socket %d\n", strerror(err), aString, mysock);
        }
        return err;
    }

    /**
     * Resolve hostname and open a TCP connection to the first IPv4 address found.
     *
     * @param hostname host name or dotted-quad address
     * @param port     TCP port, host byte order
     * @return 0 on success; the getaddrinfo() error code if resolution fails;
     *         -1 if no IPv4 address is available, or if socket()/connect() fails
     *         (errno then describes the failure)
     */
    int connect(const char* hostname, int port)
    {
        struct sockaddr_in address;
        struct addrinfo hints;
        struct addrinfo* result = NULL;

        // Zero both structures so padding (sin_zero) and unset hint fields are
        // well defined regardless of the platform's member order.
        memset(&address, 0, sizeof(address));
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = IPPROTO_TCP;

        int rc = getaddrinfo(hostname, NULL, &hints, &result);
        if (rc != 0)
            return rc;

        /* only ip4 addresses are usable with the sockaddr_in below */
        rc = -1;
        for (const struct addrinfo* res = result; res != NULL; res = res->ai_next)
        {
            if (res->ai_family == AF_INET)
            {
                address.sin_family = AF_INET;
                address.sin_port = htons((unsigned short)port);
                address.sin_addr = ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
                rc = 0;
                break;
            }
        }

        // Always release from the list head so no node is leaked.
        freeaddrinfo(result);

        if (rc != 0)
            return rc;

        mysock = socket(AF_INET, SOCK_STREAM, 0);
        if (mysock == -1)
            return -1;      // previously reported as success

        rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
        if (rc != 0)
        {
            // Do not leak the descriptor; keep errno from connect() for the caller.
            const int saved_errno = errno;
            ::close(mysock);
            mysock = -1;
            errno = saved_errno;
        }

        return rc;
    }

    /**
     * Receive up to len bytes from the socket into buffer.
     *
     * @param buffer  destination buffer
     * @param len     maximum number of bytes to receive
     * @param timeout not applied by this implementation
     * @return value returned by recv(): bytes received, or -1 on error
     */
    int read(char* buffer, int len, int timeout)
    {
        // NOTE(review): timeout is ignored, so this call blocks until recv()
        // returns — confirm the intended unit before wiring it to SO_RCVTIMEO.
        (void)timeout;

        printf("reading %d bytes\n", len);
        int rc = (int)::recv(mysock, buffer, (size_t)len, 0);
        if (rc == -1)
            Socket_error("read");
        printf("read %d bytes\n", rc);
        return rc;
    }

    /**
     * Send len bytes from buffer on the socket.
     *
     * @param buffer  data to send
     * @param len     number of bytes to send
     * @param timeout not applied by this implementation
     * @return bytes written, or -1 on error
     */
    int write(char* buffer, int len, int timeout)
    {
        // NOTE(review): timeout is ignored — see read().
        (void)timeout;

#if defined(MSG_NOSIGNAL)
        // Report a closed peer as an error return instead of raising SIGPIPE.
        return (int)::send(mysock, buffer, (size_t)len, MSG_NOSIGNAL);
#else
        return (int)::write(mysock, buffer, len);
#endif
    }

private:

    int mysock;     // connected socket descriptor, or -1 when there is none

};
Ian Craggs 7:f9d690fb6dad 145
Ian Craggs 7:f9d690fb6dad 146
/**
 * Stopwatch built on gettimeofday(), supplied to MQTT::Client as its timer
 * template argument.
 *
 * start_time is the (possibly shifted) instant the measurement began;
 * stop_time is non-zero only while the timer is paused after stop().
 */
class Timer
{
public:

    Timer()
    {
        reset();
    }

    /**
     * Start the timer, or resume it after stop().
     * Has no effect if the timer is already running.
     */
    void start()
    {
        if (running)
            return;

        struct timeval now;
        gettimeofday(&now, NULL);

        if (!timerisset(&stop_time))
            start_time = now;
        else
        {
            struct timeval paused, shifted;

            timersub(&now, &stop_time, &paused);    // interval to be added to start time
            timeradd(&start_time, &paused, &shifted);
            // Move the start forward so the paused interval is not counted.
            // (The original assigned in the opposite direction and discarded
            // the adjusted value.)
            start_time = shifted;
            timerclear(&stop_time);
        }
        running = true;
    }

    /**
     * Pause the timer, remembering when it was stopped.
     * Has no effect if the timer is not running.
     */
    void stop()
    {
        if (running)
        {
            gettimeofday(&stop_time, NULL);
            running = false;
        }
    }

    /** Clear both time stamps and mark the timer as not running. */
    void reset()
    {
        timerclear(&start_time);
        timerclear(&stop_time);
        running = false;
    }

    /**
     * Get the time passed in milli-seconds.
     *
     * While running this is now - start_time. While paused by stop() the
     * value is frozen at stop_time - start_time, so it does not jump
     * backwards when the timer is resumed.
     *
     * @return elapsed milliseconds, truncated to int
     */
    int read_ms()
    {
        struct timeval end, elapsed;

        if (!running && timerisset(&stop_time))
            end = stop_time;
        else
        {
            // NOTE(review): before the first start() (or after reset())
            // start_time is zero, so this measures from the zero timeval,
            // as the original code did.
            gettimeofday(&end, NULL);
        }

        timersub(&end, &start_time, &elapsed);
        return (int)((elapsed.tv_sec)*1000 + (elapsed.tv_usec)/1000);
    }

private:

    struct timeval start_time, stop_time;
    bool running;

};
Ian Craggs 7:f9d690fb6dad 207
Ian Craggs 7:f9d690fb6dad 208
Ian Craggs 7:f9d690fb6dad 209 void messageArrived(MQTT::Message* message)
Ian Craggs 7:f9d690fb6dad 210 {
Ian Craggs 7:f9d690fb6dad 211 }
Ian Craggs 7:f9d690fb6dad 212
Ian Craggs 7:f9d690fb6dad 213
Ian Craggs 7:f9d690fb6dad 214 int main(int argc, char* argv[])
Ian Craggs 7:f9d690fb6dad 215 {
Ian Craggs 7:f9d690fb6dad 216 IPStack ipstack = IPStack();
Ian Craggs 7:f9d690fb6dad 217 Timer t;
Ian Craggs 7:f9d690fb6dad 218 FP<void, MQTT::Message*> messageArrivedPointer;
Ian Craggs 7:f9d690fb6dad 219
Ian Craggs 7:f9d690fb6dad 220 messageArrivedPointer.attach(messageArrived);
Ian Craggs 7:f9d690fb6dad 221
Ian Craggs 7:f9d690fb6dad 222 int rc = ipstack.connect("127.0.0.1", 1883);
Ian Craggs 7:f9d690fb6dad 223 printf("rc from TCP connect is %d\n", rc);
Ian Craggs 7:f9d690fb6dad 224
Ian Craggs 7:f9d690fb6dad 225 MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t);
Ian Craggs 7:f9d690fb6dad 226
Ian Craggs 7:f9d690fb6dad 227 printf("constructed\n");
Ian Craggs 7:f9d690fb6dad 228
Ian Craggs 7:f9d690fb6dad 229 rc = client.connect();
Ian Craggs 7:f9d690fb6dad 230 printf("rc from connect is %d\n", rc);
Ian Craggs 7:f9d690fb6dad 231
Ian Craggs 7:f9d690fb6dad 232 rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer);
Ian Craggs 7:f9d690fb6dad 233 sleep(1);
Ian Craggs 7:f9d690fb6dad 234 }
icraggs 8:c46930bd6c82 235
icraggs 8:c46930bd6c82 236 #endif