Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Dependents:   MQTT_G_SENSOR

This program and the MQTT libraries it uses are part of the EclipseTM Paho project; specifically the embedded client.

This example and API are working, but are still in progress. Please give us your feedback.

HelloMQTT is an example of using the MQTT API. The MQTT API is portable across network interface stacks. MQTT is designed to be used with TCP/IP, but any transport with similar characteristics should be suitable.

HelloMQTT uses the NetworkInterface APIs in mbed OS 5 to show how this works. The MQTT library contains an MQTTNetwork.h header, which is a wrapper around the mbed networking interface. To switch between connectivity methods (the default is Ethernet) the easy-connect library is provided in this example application. You can change the connectivity method in mbed_app.json.

Adding new connectivity methods to the program is trivial, as long as they implement the mbed OS 5 NetworkStack API.

Committer:
icraggs
Date:
Tue May 06 09:45:22 2014 +0000
Revision:
3:7a6a899de7cc
Use latest MQTT library on both mbed and Linux

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 3:7a6a899de7cc 1 /*******************************************************************************
icraggs 3:7a6a899de7cc 2 * Copyright (c) 2014 IBM Corp.
icraggs 3:7a6a899de7cc 3 *
icraggs 3:7a6a899de7cc 4 * All rights reserved. This program and the accompanying materials
icraggs 3:7a6a899de7cc 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 3:7a6a899de7cc 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 3:7a6a899de7cc 7 *
icraggs 3:7a6a899de7cc 8 * The Eclipse Public License is available at
icraggs 3:7a6a899de7cc 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 3:7a6a899de7cc 10 * and the Eclipse Distribution License is available at
icraggs 3:7a6a899de7cc 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 3:7a6a899de7cc 12 *
icraggs 3:7a6a899de7cc 13 * Contributors:
icraggs 3:7a6a899de7cc 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 3:7a6a899de7cc 15 *******************************************************************************/
icraggs 3:7a6a899de7cc 16
icraggs 3:7a6a899de7cc 17 /**
icraggs 3:7a6a899de7cc 18 This is a sample program to illustrate the use of the MQTT Client library
icraggs 3:7a6a899de7cc 19 on Linux. The Client class requires two classes which mediate
icraggs 3:7a6a899de7cc 20 access to system interfaces for networking and timing. As long as these two
icraggs 3:7a6a899de7cc 21 classes provide the required public programming interfaces, it does not matter
icraggs 3:7a6a899de7cc 22 what facilities they use underneath. In this program, they use the Linux
icraggs 3:7a6a899de7cc 23 system libraries.
icraggs 3:7a6a899de7cc 24
icraggs 3:7a6a899de7cc 25 */
icraggs 3:7a6a899de7cc 26
icraggs 3:7a6a899de7cc 27 #if defined(LINUX)
icraggs 3:7a6a899de7cc 28
icraggs 3:7a6a899de7cc 29 #include <sys/types.h>
icraggs 3:7a6a899de7cc 30 #include <sys/socket.h>
icraggs 3:7a6a899de7cc 31 #include <sys/param.h>
icraggs 3:7a6a899de7cc 32 #include <sys/time.h>
icraggs 3:7a6a899de7cc 33 #include <sys/select.h>
icraggs 3:7a6a899de7cc 34 #include <netinet/in.h>
icraggs 3:7a6a899de7cc 35 #include <netinet/tcp.h>
icraggs 3:7a6a899de7cc 36 #include <arpa/inet.h>
icraggs 3:7a6a899de7cc 37 #include <netdb.h>
icraggs 3:7a6a899de7cc 38 #include <stdio.h>
icraggs 3:7a6a899de7cc 39 #include <unistd.h>
icraggs 3:7a6a899de7cc 40 #include <errno.h>
icraggs 3:7a6a899de7cc 41 #include <fcntl.h>
icraggs 3:7a6a899de7cc 42
icraggs 3:7a6a899de7cc 43 #include <stdlib.h>
icraggs 3:7a6a899de7cc 44 #include <string.h>
icraggs 3:7a6a899de7cc 45 #include <signal.h>
icraggs 3:7a6a899de7cc 46
icraggs 3:7a6a899de7cc 47 #include "MQTTClient.h"
icraggs 3:7a6a899de7cc 48 #include "FP.cpp"
icraggs 3:7a6a899de7cc 49
icraggs 3:7a6a899de7cc 50 #define DEFAULT_STACK_SIZE -1
icraggs 3:7a6a899de7cc 51
icraggs 3:7a6a899de7cc 52
icraggs 3:7a6a899de7cc 53 class IPStack
icraggs 3:7a6a899de7cc 54 {
icraggs 3:7a6a899de7cc 55 public:
icraggs 3:7a6a899de7cc 56 IPStack()
icraggs 3:7a6a899de7cc 57 {
icraggs 3:7a6a899de7cc 58
icraggs 3:7a6a899de7cc 59 }
icraggs 3:7a6a899de7cc 60
icraggs 3:7a6a899de7cc 61 int Socket_error(const char* aString)
icraggs 3:7a6a899de7cc 62 {
icraggs 3:7a6a899de7cc 63
icraggs 3:7a6a899de7cc 64 if (errno != EINTR && errno != EAGAIN && errno != EINPROGRESS && errno != EWOULDBLOCK)
icraggs 3:7a6a899de7cc 65 {
icraggs 3:7a6a899de7cc 66 if (strcmp(aString, "shutdown") != 0 || (errno != ENOTCONN && errno != ECONNRESET))
icraggs 3:7a6a899de7cc 67 printf("Socket error %s in %s for socket %d\n", strerror(errno), aString, mysock);
icraggs 3:7a6a899de7cc 68 }
icraggs 3:7a6a899de7cc 69 return errno;
icraggs 3:7a6a899de7cc 70 }
icraggs 3:7a6a899de7cc 71
icraggs 3:7a6a899de7cc 72 int connect(const char* hostname, int port)
icraggs 3:7a6a899de7cc 73 {
icraggs 3:7a6a899de7cc 74 int type = SOCK_STREAM;
icraggs 3:7a6a899de7cc 75 struct sockaddr_in address;
icraggs 3:7a6a899de7cc 76 int rc = -1;
icraggs 3:7a6a899de7cc 77 sa_family_t family = AF_INET;
icraggs 3:7a6a899de7cc 78 struct addrinfo *result = NULL;
icraggs 3:7a6a899de7cc 79 struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
icraggs 3:7a6a899de7cc 80
icraggs 3:7a6a899de7cc 81 if ((rc = getaddrinfo(hostname, NULL, &hints, &result)) == 0)
icraggs 3:7a6a899de7cc 82 {
icraggs 3:7a6a899de7cc 83 struct addrinfo* res = result;
icraggs 3:7a6a899de7cc 84
icraggs 3:7a6a899de7cc 85 /* prefer ip4 addresses */
icraggs 3:7a6a899de7cc 86 while (res)
icraggs 3:7a6a899de7cc 87 {
icraggs 3:7a6a899de7cc 88 if (res->ai_family == AF_INET)
icraggs 3:7a6a899de7cc 89 {
icraggs 3:7a6a899de7cc 90 result = res;
icraggs 3:7a6a899de7cc 91 break;
icraggs 3:7a6a899de7cc 92 }
icraggs 3:7a6a899de7cc 93 res = res->ai_next;
icraggs 3:7a6a899de7cc 94 }
icraggs 3:7a6a899de7cc 95
icraggs 3:7a6a899de7cc 96 if (result->ai_family == AF_INET)
icraggs 3:7a6a899de7cc 97 {
icraggs 3:7a6a899de7cc 98 address.sin_port = htons(port);
icraggs 3:7a6a899de7cc 99 address.sin_family = family = AF_INET;
icraggs 3:7a6a899de7cc 100 address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
icraggs 3:7a6a899de7cc 101 }
icraggs 3:7a6a899de7cc 102 else
icraggs 3:7a6a899de7cc 103 rc = -1;
icraggs 3:7a6a899de7cc 104
icraggs 3:7a6a899de7cc 105 freeaddrinfo(result);
icraggs 3:7a6a899de7cc 106 }
icraggs 3:7a6a899de7cc 107
icraggs 3:7a6a899de7cc 108 if (rc == 0)
icraggs 3:7a6a899de7cc 109 {
icraggs 3:7a6a899de7cc 110 mysock = socket(family, type, 0);
icraggs 3:7a6a899de7cc 111 if (mysock != -1)
icraggs 3:7a6a899de7cc 112 {
icraggs 3:7a6a899de7cc 113 int opt = 1;
icraggs 3:7a6a899de7cc 114
icraggs 3:7a6a899de7cc 115 //if (setsockopt(mysock, SOL_SOCKET, SO_NOSIGPIPE, (void*)&opt, sizeof(opt)) != 0)
icraggs 3:7a6a899de7cc 116 // printf("Could not set SO_NOSIGPIPE for socket %d", mysock);
icraggs 3:7a6a899de7cc 117
icraggs 3:7a6a899de7cc 118 rc = ::connect(mysock, (struct sockaddr*)&address, sizeof(address));
icraggs 3:7a6a899de7cc 119 }
icraggs 3:7a6a899de7cc 120 }
icraggs 3:7a6a899de7cc 121
icraggs 3:7a6a899de7cc 122 return rc;
icraggs 3:7a6a899de7cc 123 }
icraggs 3:7a6a899de7cc 124
icraggs 3:7a6a899de7cc 125 int read(char* buffer, int len, int timeout_ms)
icraggs 3:7a6a899de7cc 126 {
icraggs 3:7a6a899de7cc 127 struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
icraggs 3:7a6a899de7cc 128 if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
icraggs 3:7a6a899de7cc 129 {
icraggs 3:7a6a899de7cc 130 interval.tv_sec = 0;
icraggs 3:7a6a899de7cc 131 interval.tv_usec = 100;
icraggs 3:7a6a899de7cc 132 }
icraggs 3:7a6a899de7cc 133
icraggs 3:7a6a899de7cc 134 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
icraggs 3:7a6a899de7cc 135
icraggs 3:7a6a899de7cc 136 //printf("reading %d bytes\n", len);
icraggs 3:7a6a899de7cc 137 int rc = ::recv(mysock, buffer, (size_t)len, 0);
icraggs 3:7a6a899de7cc 138 if (rc == -1)
icraggs 3:7a6a899de7cc 139 Socket_error("read");
icraggs 3:7a6a899de7cc 140 //printf("read %d bytes\n", rc);
icraggs 3:7a6a899de7cc 141 return rc;
icraggs 3:7a6a899de7cc 142 }
icraggs 3:7a6a899de7cc 143
icraggs 3:7a6a899de7cc 144 int write(char* buffer, int len, int timeout)
icraggs 3:7a6a899de7cc 145 {
icraggs 3:7a6a899de7cc 146 struct timeval tv;
icraggs 3:7a6a899de7cc 147
icraggs 3:7a6a899de7cc 148 tv.tv_sec = 0; /* 30 Secs Timeout */
icraggs 3:7a6a899de7cc 149 tv.tv_usec = timeout * 1000; // Not init'ing this can cause strange errors
icraggs 3:7a6a899de7cc 150
icraggs 3:7a6a899de7cc 151 setsockopt(mysock, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv,sizeof(struct timeval));
icraggs 3:7a6a899de7cc 152 int rc = ::write(mysock, buffer, len);
icraggs 3:7a6a899de7cc 153 //printf("write rc %d\n", rc);
icraggs 3:7a6a899de7cc 154 return rc;
icraggs 3:7a6a899de7cc 155 }
icraggs 3:7a6a899de7cc 156
icraggs 3:7a6a899de7cc 157 int disconnect()
icraggs 3:7a6a899de7cc 158 {
icraggs 3:7a6a899de7cc 159 return ::close(mysock);
icraggs 3:7a6a899de7cc 160 }
icraggs 3:7a6a899de7cc 161
icraggs 3:7a6a899de7cc 162 private:
icraggs 3:7a6a899de7cc 163
icraggs 3:7a6a899de7cc 164 int mysock;
icraggs 3:7a6a899de7cc 165
icraggs 3:7a6a899de7cc 166 };
icraggs 3:7a6a899de7cc 167
icraggs 3:7a6a899de7cc 168
icraggs 3:7a6a899de7cc 169 class Countdown
icraggs 3:7a6a899de7cc 170 {
icraggs 3:7a6a899de7cc 171 public:
icraggs 3:7a6a899de7cc 172 Countdown()
icraggs 3:7a6a899de7cc 173 {
icraggs 3:7a6a899de7cc 174
icraggs 3:7a6a899de7cc 175 }
icraggs 3:7a6a899de7cc 176
icraggs 3:7a6a899de7cc 177 Countdown(int ms)
icraggs 3:7a6a899de7cc 178 {
icraggs 3:7a6a899de7cc 179 countdown_ms(ms);
icraggs 3:7a6a899de7cc 180 }
icraggs 3:7a6a899de7cc 181
icraggs 3:7a6a899de7cc 182
icraggs 3:7a6a899de7cc 183 bool expired()
icraggs 3:7a6a899de7cc 184 {
icraggs 3:7a6a899de7cc 185 struct timeval now, res;
icraggs 3:7a6a899de7cc 186 gettimeofday(&now, NULL);
icraggs 3:7a6a899de7cc 187 timersub(&end_time, &now, &res);
icraggs 3:7a6a899de7cc 188 //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
icraggs 3:7a6a899de7cc 189 //if (res.tv_sec > 0 || res.tv_usec > 0)
icraggs 3:7a6a899de7cc 190 // printf("expired %d %d\n", res.tv_sec, res.tv_usec);
icraggs 3:7a6a899de7cc 191 return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
icraggs 3:7a6a899de7cc 192 }
icraggs 3:7a6a899de7cc 193
icraggs 3:7a6a899de7cc 194
icraggs 3:7a6a899de7cc 195 void countdown_ms(int ms)
icraggs 3:7a6a899de7cc 196 {
icraggs 3:7a6a899de7cc 197 struct timeval now;
icraggs 3:7a6a899de7cc 198 gettimeofday(&now, NULL);
icraggs 3:7a6a899de7cc 199 struct timeval interval = {ms / 1000, (ms % 1000) * 1000};
icraggs 3:7a6a899de7cc 200 //printf("interval %d %d\n", interval.tv_sec, interval.tv_usec);
icraggs 3:7a6a899de7cc 201 timeradd(&now, &interval, &end_time);
icraggs 3:7a6a899de7cc 202 }
icraggs 3:7a6a899de7cc 203
icraggs 3:7a6a899de7cc 204
icraggs 3:7a6a899de7cc 205 void countdown(int seconds)
icraggs 3:7a6a899de7cc 206 {
icraggs 3:7a6a899de7cc 207 struct timeval now;
icraggs 3:7a6a899de7cc 208 gettimeofday(&now, NULL);
icraggs 3:7a6a899de7cc 209 struct timeval interval = {seconds, 0};
icraggs 3:7a6a899de7cc 210 timeradd(&now, &interval, &end_time);
icraggs 3:7a6a899de7cc 211 }
icraggs 3:7a6a899de7cc 212
icraggs 3:7a6a899de7cc 213
icraggs 3:7a6a899de7cc 214 int left_ms()
icraggs 3:7a6a899de7cc 215 {
icraggs 3:7a6a899de7cc 216 struct timeval now, res;
icraggs 3:7a6a899de7cc 217 gettimeofday(&now, NULL);
icraggs 3:7a6a899de7cc 218 timersub(&end_time, &now, &res);
icraggs 3:7a6a899de7cc 219 //printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
icraggs 3:7a6a899de7cc 220 return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;
icraggs 3:7a6a899de7cc 221 }
icraggs 3:7a6a899de7cc 222
icraggs 3:7a6a899de7cc 223 private:
icraggs 3:7a6a899de7cc 224
icraggs 3:7a6a899de7cc 225 struct timeval end_time;
icraggs 3:7a6a899de7cc 226 };
icraggs 3:7a6a899de7cc 227
icraggs 3:7a6a899de7cc 228
icraggs 3:7a6a899de7cc 229 int arrivedcount = 0;
icraggs 3:7a6a899de7cc 230
icraggs 3:7a6a899de7cc 231 void messageArrived(MQTT::Message* message)
icraggs 3:7a6a899de7cc 232 {
icraggs 3:7a6a899de7cc 233 printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n",
icraggs 3:7a6a899de7cc 234 ++arrivedcount, message->qos, message->retained, message->dup, message->id);
icraggs 3:7a6a899de7cc 235 printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
icraggs 3:7a6a899de7cc 236 }
icraggs 3:7a6a899de7cc 237
icraggs 3:7a6a899de7cc 238
icraggs 3:7a6a899de7cc 239 int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info)
icraggs 3:7a6a899de7cc 240 {
icraggs 3:7a6a899de7cc 241 const char* hostname = "localhost"; //"m2m.eclipse.org";
icraggs 3:7a6a899de7cc 242 int port = 1883;
icraggs 3:7a6a899de7cc 243 printf("Connecting to %s:%d\n", hostname, port);
icraggs 3:7a6a899de7cc 244 int rc = info->network->connect(hostname, port);
icraggs 3:7a6a899de7cc 245 if (rc != 0)
icraggs 3:7a6a899de7cc 246 printf("rc from TCP connect is %d\n", rc);
icraggs 3:7a6a899de7cc 247
icraggs 3:7a6a899de7cc 248 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 3:7a6a899de7cc 249 data.MQTTVersion = 3;
icraggs 3:7a6a899de7cc 250 data.clientID.cstring = (char*)"mbed-icraggs";
icraggs 3:7a6a899de7cc 251 rc = info->client->connect(&data);
icraggs 3:7a6a899de7cc 252 if (rc != 0)
icraggs 3:7a6a899de7cc 253 printf("rc from MQTT connect is %d\n", rc);
icraggs 3:7a6a899de7cc 254
icraggs 3:7a6a899de7cc 255 return rc;
icraggs 3:7a6a899de7cc 256 }
icraggs 3:7a6a899de7cc 257
icraggs 3:7a6a899de7cc 258
icraggs 3:7a6a899de7cc 259 int main(int argc, char* argv[])
icraggs 3:7a6a899de7cc 260 {
icraggs 3:7a6a899de7cc 261 IPStack ipstack = IPStack();
icraggs 3:7a6a899de7cc 262 float version = 0.3;
icraggs 3:7a6a899de7cc 263 const char* topic = "mbed-sample";
icraggs 3:7a6a899de7cc 264
icraggs 3:7a6a899de7cc 265 printf("Version is %f\n", version);
icraggs 3:7a6a899de7cc 266
icraggs 3:7a6a899de7cc 267 MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
icraggs 3:7a6a899de7cc 268
icraggs 3:7a6a899de7cc 269 client.setConnectionLostHandler(connect);
icraggs 3:7a6a899de7cc 270
icraggs 3:7a6a899de7cc 271 MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack};
icraggs 3:7a6a899de7cc 272 int rc = connect(&info);
icraggs 3:7a6a899de7cc 273
icraggs 3:7a6a899de7cc 274 rc = client.subscribe(topic, MQTT::QOS2, messageArrived);
icraggs 3:7a6a899de7cc 275 if (rc != 0)
icraggs 3:7a6a899de7cc 276 printf("rc from MQTT subscribe is %d\n", rc);
icraggs 3:7a6a899de7cc 277
icraggs 3:7a6a899de7cc 278 MQTT::Message message;
icraggs 3:7a6a899de7cc 279
icraggs 3:7a6a899de7cc 280 // QoS 0
icraggs 3:7a6a899de7cc 281 char buf[100];
icraggs 3:7a6a899de7cc 282 sprintf(buf, "Hello World! QoS 0 message from app version %f", version);
icraggs 3:7a6a899de7cc 283 message.qos = MQTT::QOS0;
icraggs 3:7a6a899de7cc 284 message.retained = false;
icraggs 3:7a6a899de7cc 285 message.dup = false;
icraggs 3:7a6a899de7cc 286 message.payload = (void*)buf;
icraggs 3:7a6a899de7cc 287 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 288 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 289 while (arrivedcount == 0)
icraggs 3:7a6a899de7cc 290 client.yield(100);
icraggs 3:7a6a899de7cc 291
icraggs 3:7a6a899de7cc 292 // QoS 1
icraggs 3:7a6a899de7cc 293 printf("Now QoS 1\n");
icraggs 3:7a6a899de7cc 294 sprintf(buf, "Hello World! QoS 1 message from app version %f", version);
icraggs 3:7a6a899de7cc 295 message.qos = MQTT::QOS1;
icraggs 3:7a6a899de7cc 296 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 297 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 298 while (arrivedcount == 1)
icraggs 3:7a6a899de7cc 299 client.yield(100);
icraggs 3:7a6a899de7cc 300
icraggs 3:7a6a899de7cc 301 // QoS 2
icraggs 3:7a6a899de7cc 302 sprintf(buf, "Hello World! QoS 2 message from app version %f", version);
icraggs 3:7a6a899de7cc 303 message.qos = MQTT::QOS2;
icraggs 3:7a6a899de7cc 304 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 305 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 306 while (arrivedcount == 2)
icraggs 3:7a6a899de7cc 307 client.yield(100);
icraggs 3:7a6a899de7cc 308
icraggs 3:7a6a899de7cc 309 rc = client.unsubscribe(topic);
icraggs 3:7a6a899de7cc 310 if (rc != 0)
icraggs 3:7a6a899de7cc 311 printf("rc from unsubscribe was %d\n", rc);
icraggs 3:7a6a899de7cc 312
icraggs 3:7a6a899de7cc 313 rc = client.disconnect();
icraggs 3:7a6a899de7cc 314 if (rc != 0)
icraggs 3:7a6a899de7cc 315 printf("rc from disconnect was %d\n", rc);
icraggs 3:7a6a899de7cc 316
icraggs 3:7a6a899de7cc 317 ipstack.disconnect();
icraggs 3:7a6a899de7cc 318
icraggs 3:7a6a899de7cc 319 printf("Finishing with %d messages received\n", arrivedcount);
icraggs 3:7a6a899de7cc 320
icraggs 3:7a6a899de7cc 321 return 0;
icraggs 3:7a6a899de7cc 322 }
icraggs 3:7a6a899de7cc 323
icraggs 3:7a6a899de7cc 324 #endif