An API for using MQTT over multiple transports
Dependencies: FP MQTTPacket
Dependents: Cellular_HelloMQTT IoTStarterKit GSwifiInterface_HelloMQTT IBMIoTClientEthernetExample ... more
This library is part of the EclipseTM Paho project; specifically the embedded client.
The goals of this API are:
- to be independent of any system library: hence templates parameters for networking, timer and threading classes
- not to rely on heap storage, only automatic (I think this is a good thing)
- to limit memory use, for instance by defining the size of the buffers and arrays used at object creation time
linux_main.cpp@9:01b8cc7d94cc, 2014-04-09 (annotated)
- Committer:
- icraggs
- Date:
- Wed Apr 09 23:21:54 2014 +0000
- Revision:
- 9:01b8cc7d94cc
- Parent:
- 8:c46930bd6c82
Fix function pointer definitions
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
icraggs | 8:c46930bd6c82 | 1 | |
icraggs | 8:c46930bd6c82 | 2 | #if defined(LINUX) |
Ian Craggs |
7:f9d690fb6dad | 3 | |
Ian Craggs |
7:f9d690fb6dad | 4 | #include <sys/types.h> |
Ian Craggs |
7:f9d690fb6dad | 5 | #include <sys/socket.h> |
Ian Craggs |
7:f9d690fb6dad | 6 | #include <sys/param.h> |
Ian Craggs |
7:f9d690fb6dad | 7 | #include <sys/time.h> |
Ian Craggs |
7:f9d690fb6dad | 8 | #include <sys/select.h> |
Ian Craggs |
7:f9d690fb6dad | 9 | #include <netinet/in.h> |
Ian Craggs |
7:f9d690fb6dad | 10 | #include <netinet/tcp.h> |
Ian Craggs |
7:f9d690fb6dad | 11 | #include <arpa/inet.h> |
Ian Craggs |
7:f9d690fb6dad | 12 | #include <netdb.h> |
Ian Craggs |
7:f9d690fb6dad | 13 | #include <stdio.h> |
Ian Craggs |
7:f9d690fb6dad | 14 | #include <unistd.h> |
Ian Craggs |
7:f9d690fb6dad | 15 | #include <errno.h> |
Ian Craggs |
7:f9d690fb6dad | 16 | #include <fcntl.h> |
Ian Craggs |
7:f9d690fb6dad | 17 | |
Ian Craggs |
7:f9d690fb6dad | 18 | #include <stdlib.h> |
Ian Craggs |
7:f9d690fb6dad | 19 | #include <string.h> |
Ian Craggs |
7:f9d690fb6dad | 20 | #include <signal.h> |
Ian Craggs |
7:f9d690fb6dad | 21 | |
Ian Craggs |
7:f9d690fb6dad | 22 | #include <pthread.h> |
Ian Craggs |
7:f9d690fb6dad | 23 | |
Ian Craggs |
7:f9d690fb6dad | 24 | #include "MQTTClient.h" |
Ian Craggs |
7:f9d690fb6dad | 25 | #include "MQTTClient.cpp" |
Ian Craggs |
7:f9d690fb6dad | 26 | #include "FP.cpp" |
Ian Craggs |
7:f9d690fb6dad | 27 | |
Ian Craggs |
7:f9d690fb6dad | 28 | #define DEFAULT_STACK_SIZE -1 |
Ian Craggs |
7:f9d690fb6dad | 29 | |
Ian Craggs |
7:f9d690fb6dad | 30 | class Thread { |
Ian Craggs |
7:f9d690fb6dad | 31 | public: |
Ian Craggs |
7:f9d690fb6dad | 32 | |
Ian Craggs |
7:f9d690fb6dad | 33 | Thread(void* (*fn)(void *parameter), void *parameter=NULL, |
Ian Craggs |
7:f9d690fb6dad | 34 | int priority=0, |
Ian Craggs |
7:f9d690fb6dad | 35 | uint32_t stack_size=DEFAULT_STACK_SIZE, |
Ian Craggs |
7:f9d690fb6dad | 36 | unsigned char *stack_pointer=NULL) |
Ian Craggs |
7:f9d690fb6dad | 37 | { |
Ian Craggs |
7:f9d690fb6dad | 38 | thread = 0; |
Ian Craggs |
7:f9d690fb6dad | 39 | pthread_attr_t attr; |
Ian Craggs |
7:f9d690fb6dad | 40 | |
Ian Craggs |
7:f9d690fb6dad | 41 | pthread_attr_init(&attr); |
Ian Craggs |
7:f9d690fb6dad | 42 | pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); |
Ian Craggs |
7:f9d690fb6dad | 43 | if (pthread_create(&thread, &attr, fn, parameter) != 0) |
Ian Craggs |
7:f9d690fb6dad | 44 | thread = 0; |
Ian Craggs |
7:f9d690fb6dad | 45 | pthread_attr_destroy(&attr); |
Ian Craggs |
7:f9d690fb6dad | 46 | } |
Ian Craggs |
7:f9d690fb6dad | 47 | |
Ian Craggs |
7:f9d690fb6dad | 48 | private: |
Ian Craggs |
7:f9d690fb6dad | 49 | pthread_t thread; |
Ian Craggs |
7:f9d690fb6dad | 50 | }; |
Ian Craggs |
7:f9d690fb6dad | 51 | |
Ian Craggs |
7:f9d690fb6dad | 52 | |
Ian Craggs |
7:f9d690fb6dad | 53 | class IPStack |
Ian Craggs |
7:f9d690fb6dad | 54 | { |
Ian Craggs |
7:f9d690fb6dad | 55 | public: |
Ian Craggs |
7:f9d690fb6dad | 56 | IPStack() |
Ian Craggs |
7:f9d690fb6dad | 57 | { |
Ian Craggs |
7:f9d690fb6dad | 58 | |
Ian Craggs |
7:f9d690fb6dad | 59 | } |
Ian Craggs |
7:f9d690fb6dad | 60 | |
Ian Craggs |
7:f9d690fb6dad | 61 | int Socket_error(const char* aString) |
Ian Craggs |
7:f9d690fb6dad | 62 | { |
Ian Craggs |
7:f9d690fb6dad | 63 | |
Ian Craggs |
7:f9d690fb6dad | 64 | if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK) |
Ian Craggs |
7:f9d690fb6dad | 65 | { |
Ian Craggs |
7:f9d690fb6dad | 66 | if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET)) |
Ian Craggs |
7:f9d690fb6dad | 67 | printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock); |
Ian Craggs |
7:f9d690fb6dad | 68 | } |
Ian Craggs |
7:f9d690fb6dad | 69 | return errno; |
Ian Craggs |
7:f9d690fb6dad | 70 | } |
Ian Craggs |
7:f9d690fb6dad | 71 | |
Ian Craggs |
7:f9d690fb6dad | 72 | int connect(const char* hostname, int port) |
Ian Craggs |
7:f9d690fb6dad | 73 | { |
Ian Craggs |
7:f9d690fb6dad | 74 | int type = SOCK_STREAM; |
Ian Craggs |
7:f9d690fb6dad | 75 | struct sockaddr_in address; |
Ian Craggs |
7:f9d690fb6dad | 76 | int rc = -1; |
Ian Craggs |
7:f9d690fb6dad | 77 | sa_family_t family = AF_INET; |
Ian Craggs |
7:f9d690fb6dad | 78 | struct addrinfo *result = NULL; |
Ian Craggs |
7:f9d690fb6dad | 79 | struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL}; |
Ian Craggs |
7:f9d690fb6dad | 80 | |
Ian Craggs |
7:f9d690fb6dad | 81 | if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0) |
Ian Craggs |
7:f9d690fb6dad | 82 | { |
Ian Craggs |
7:f9d690fb6dad | 83 | struct addrinfo* res = result; |
Ian Craggs |
7:f9d690fb6dad | 84 | |
Ian Craggs |
7:f9d690fb6dad | 85 | /* prefer ip4 addresses */ |
Ian Craggs |
7:f9d690fb6dad | 86 | while (res) |
Ian Craggs |
7:f9d690fb6dad | 87 | { |
Ian Craggs |
7:f9d690fb6dad | 88 | if (res->ai_family == AF_INET) |
Ian Craggs |
7:f9d690fb6dad | 89 | { |
Ian Craggs |
7:f9d690fb6dad | 90 | result = res; |
Ian Craggs |
7:f9d690fb6dad | 91 | break; |
Ian Craggs |
7:f9d690fb6dad | 92 | } |
Ian Craggs |
7:f9d690fb6dad | 93 | res = res->ai_next; |
Ian Craggs |
7:f9d690fb6dad | 94 | } |
Ian Craggs |
7:f9d690fb6dad | 95 | |
Ian Craggs |
7:f9d690fb6dad | 96 | if (result->ai_family == AF_INET) |
Ian Craggs |
7:f9d690fb6dad | 97 | { |
Ian Craggs |
7:f9d690fb6dad | 98 | address.sin_port = htons(port); |
Ian Craggs |
7:f9d690fb6dad | 99 | address.sin_family = family = AF_INET; |
Ian Craggs |
7:f9d690fb6dad | 100 | address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr; |
Ian Craggs |
7:f9d690fb6dad | 101 | } |
Ian Craggs |
7:f9d690fb6dad | 102 | else |
Ian Craggs |
7:f9d690fb6dad | 103 | rc = -1; |
Ian Craggs |
7:f9d690fb6dad | 104 | |
Ian Craggs |
7:f9d690fb6dad | 105 | freeaddrinfo(result); |
Ian Craggs |
7:f9d690fb6dad | 106 | } |
Ian Craggs |
7:f9d690fb6dad | 107 | |
Ian Craggs |
7:f9d690fb6dad | 108 | if (rc == 0) |
Ian Craggs |
7:f9d690fb6dad | 109 | { |
Ian Craggs |
7:f9d690fb6dad | 110 | mysock = socket(family, type, 0); |
Ian Craggs |
7:f9d690fb6dad | 111 | if (mysock != -1) |
Ian Craggs |
7:f9d690fb6dad | 112 | { |
Ian Craggs |
7:f9d690fb6dad | 113 | int opt = 1; |
Ian Craggs |
7:f9d690fb6dad | 114 | |
Ian Craggs |
7:f9d690fb6dad | 115 | //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0) |
Ian Craggs |
7:f9d690fb6dad | 116 | // printf("Could not set SO_NOSIGPIPE for socket %d", mysock); |
Ian Craggs |
7:f9d690fb6dad | 117 | |
Ian Craggs |
7:f9d690fb6dad | 118 | rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address)); |
Ian Craggs |
7:f9d690fb6dad | 119 | } |
Ian Craggs |
7:f9d690fb6dad | 120 | } |
Ian Craggs |
7:f9d690fb6dad | 121 | |
Ian Craggs |
7:f9d690fb6dad | 122 | return rc; |
Ian Craggs |
7:f9d690fb6dad | 123 | } |
Ian Craggs |
7:f9d690fb6dad | 124 | |
Ian Craggs |
7:f9d690fb6dad | 125 | int read(char* buffer, int len, int timeout) |
Ian Craggs |
7:f9d690fb6dad | 126 | { |
Ian Craggs |
7:f9d690fb6dad | 127 | printf("reading %d bytes\n", len); |
Ian Craggs |
7:f9d690fb6dad | 128 | int rc = ::recv(mysock, buffer, (size_t)len, 0); |
Ian Craggs |
7:f9d690fb6dad | 129 | if (rc == -1) |
Ian Craggs |
7:f9d690fb6dad | 130 | Socket_error("read"); |
Ian Craggs |
7:f9d690fb6dad | 131 | printf("read %d bytes\n", rc); |
Ian Craggs |
7:f9d690fb6dad | 132 | return rc; |
Ian Craggs |
7:f9d690fb6dad | 133 | } |
Ian Craggs |
7:f9d690fb6dad | 134 | |
Ian Craggs |
7:f9d690fb6dad | 135 | int write(char* buffer, int len, int timeout) |
Ian Craggs |
7:f9d690fb6dad | 136 | { |
Ian Craggs |
7:f9d690fb6dad | 137 | return ::write(mysock, buffer, len); |
Ian Craggs |
7:f9d690fb6dad | 138 | } |
Ian Craggs |
7:f9d690fb6dad | 139 | |
Ian Craggs |
7:f9d690fb6dad | 140 | private: |
Ian Craggs |
7:f9d690fb6dad | 141 | |
Ian Craggs |
7:f9d690fb6dad | 142 | int mysock; |
Ian Craggs |
7:f9d690fb6dad | 143 | |
Ian Craggs |
7:f9d690fb6dad | 144 | }; |
Ian Craggs |
7:f9d690fb6dad | 145 | |
Ian Craggs |
7:f9d690fb6dad | 146 | |
Ian Craggs |
7:f9d690fb6dad | 147 | class Timer |
Ian Craggs |
7:f9d690fb6dad | 148 | { |
Ian Craggs |
7:f9d690fb6dad | 149 | public: |
Ian Craggs |
7:f9d690fb6dad | 150 | |
Ian Craggs |
7:f9d690fb6dad | 151 | Timer() |
Ian Craggs |
7:f9d690fb6dad | 152 | { |
Ian Craggs |
7:f9d690fb6dad | 153 | reset(); |
Ian Craggs |
7:f9d690fb6dad | 154 | } |
Ian Craggs |
7:f9d690fb6dad | 155 | |
Ian Craggs |
7:f9d690fb6dad | 156 | void start() |
Ian Craggs |
7:f9d690fb6dad | 157 | { |
Ian Craggs |
7:f9d690fb6dad | 158 | if (running) |
Ian Craggs |
7:f9d690fb6dad | 159 | return; |
Ian Craggs |
7:f9d690fb6dad | 160 | if (!timerisset(&stop_time)) |
Ian Craggs |
7:f9d690fb6dad | 161 | gettimeofday(&start_time, NULL); |
Ian Craggs |
7:f9d690fb6dad | 162 | else |
Ian Craggs |
7:f9d690fb6dad | 163 | { |
Ian Craggs |
7:f9d690fb6dad | 164 | struct timeval now, res; |
Ian Craggs |
7:f9d690fb6dad | 165 | |
Ian Craggs |
7:f9d690fb6dad | 166 | gettimeofday(&now, NULL); |
Ian Craggs |
7:f9d690fb6dad | 167 | |
Ian Craggs |
7:f9d690fb6dad | 168 | timersub(&now, &stop_time, &res); // interval to be added to start time |
Ian Craggs |
7:f9d690fb6dad | 169 | timeradd(&start_time, &res, &stop_time); |
Ian Craggs |
7:f9d690fb6dad | 170 | stop_time = start_time; |
Ian Craggs |
7:f9d690fb6dad | 171 | timerclear(&stop_time); |
Ian Craggs |
7:f9d690fb6dad | 172 | } |
Ian Craggs |
7:f9d690fb6dad | 173 | running = true; |
Ian Craggs |
7:f9d690fb6dad | 174 | } |
Ian Craggs |
7:f9d690fb6dad | 175 | |
Ian Craggs |
7:f9d690fb6dad | 176 | void stop() |
Ian Craggs |
7:f9d690fb6dad | 177 | { |
Ian Craggs |
7:f9d690fb6dad | 178 | if (running) |
Ian Craggs |
7:f9d690fb6dad | 179 | { |
Ian Craggs |
7:f9d690fb6dad | 180 | gettimeofday(&stop_time, NULL); |
Ian Craggs |
7:f9d690fb6dad | 181 | running = false; |
Ian Craggs |
7:f9d690fb6dad | 182 | } |
Ian Craggs |
7:f9d690fb6dad | 183 | } |
Ian Craggs |
7:f9d690fb6dad | 184 | |
Ian Craggs |
7:f9d690fb6dad | 185 | void reset() |
Ian Craggs |
7:f9d690fb6dad | 186 | { |
Ian Craggs |
7:f9d690fb6dad | 187 | timerclear(&start_time); |
Ian Craggs |
7:f9d690fb6dad | 188 | timerclear(&stop_time); |
Ian Craggs |
7:f9d690fb6dad | 189 | running = false; |
Ian Craggs |
7:f9d690fb6dad | 190 | } |
Ian Craggs |
7:f9d690fb6dad | 191 | |
Ian Craggs |
7:f9d690fb6dad | 192 | int read_ms() // get the time passed in milli-seconds |
Ian Craggs |
7:f9d690fb6dad | 193 | { |
Ian Craggs |
7:f9d690fb6dad | 194 | struct timeval now, res; |
Ian Craggs |
7:f9d690fb6dad | 195 | |
Ian Craggs |
7:f9d690fb6dad | 196 | gettimeofday(&now, NULL); |
Ian Craggs |
7:f9d690fb6dad | 197 | timersub(&now, &start_time, &res); |
Ian Craggs |
7:f9d690fb6dad | 198 | return (res.tv_sec)*1000 + (res.tv_usec)/1000; |
Ian Craggs |
7:f9d690fb6dad | 199 | } |
Ian Craggs |
7:f9d690fb6dad | 200 | |
Ian Craggs |
7:f9d690fb6dad | 201 | private: |
Ian Craggs |
7:f9d690fb6dad | 202 | |
Ian Craggs |
7:f9d690fb6dad | 203 | struct timeval start_time, stop_time; |
Ian Craggs |
7:f9d690fb6dad | 204 | bool running; |
Ian Craggs |
7:f9d690fb6dad | 205 | |
Ian Craggs |
7:f9d690fb6dad | 206 | }; |
Ian Craggs |
7:f9d690fb6dad | 207 | |
Ian Craggs |
7:f9d690fb6dad | 208 | |
Ian Craggs |
7:f9d690fb6dad | 209 | void messageArrived(MQTT::Message* message) |
Ian Craggs |
7:f9d690fb6dad | 210 | { |
Ian Craggs |
7:f9d690fb6dad | 211 | } |
Ian Craggs |
7:f9d690fb6dad | 212 | |
Ian Craggs |
7:f9d690fb6dad | 213 | |
Ian Craggs |
7:f9d690fb6dad | 214 | int main(int argc, char* argv[]) |
Ian Craggs |
7:f9d690fb6dad | 215 | { |
Ian Craggs |
7:f9d690fb6dad | 216 | IPStack ipstack = IPStack(); |
Ian Craggs |
7:f9d690fb6dad | 217 | Timer t; |
Ian Craggs |
7:f9d690fb6dad | 218 | FP<void, MQTT::Message*> messageArrivedPointer; |
Ian Craggs |
7:f9d690fb6dad | 219 | |
Ian Craggs |
7:f9d690fb6dad | 220 | messageArrivedPointer.attach(messageArrived); |
Ian Craggs |
7:f9d690fb6dad | 221 | |
Ian Craggs |
7:f9d690fb6dad | 222 | int rc = ipstack.connect("127.0.0.1", 1883); |
Ian Craggs |
7:f9d690fb6dad | 223 | printf("rc from TCP connect is %d\n", rc); |
Ian Craggs |
7:f9d690fb6dad | 224 | |
Ian Craggs |
7:f9d690fb6dad | 225 | MQTT::Client<IPStack, Timer, Thread> client = MQTT::Client<IPStack, Timer, Thread>(&ipstack, &t); |
Ian Craggs |
7:f9d690fb6dad | 226 | |
Ian Craggs |
7:f9d690fb6dad | 227 | printf("constructed\n"); |
Ian Craggs |
7:f9d690fb6dad | 228 | |
Ian Craggs |
7:f9d690fb6dad | 229 | rc = client.connect(); |
Ian Craggs |
7:f9d690fb6dad | 230 | printf("rc from connect is %d\n", rc); |
Ian Craggs |
7:f9d690fb6dad | 231 | |
Ian Craggs |
7:f9d690fb6dad | 232 | rc = client.subscribe("topic", MQTT::QOS2, messageArrivedPointer); |
Ian Craggs |
7:f9d690fb6dad | 233 | sleep(1); |
Ian Craggs |
7:f9d690fb6dad | 234 | } |
icraggs | 8:c46930bd6c82 | 235 | |
icraggs | 8:c46930bd6c82 | 236 | #endif |