Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Dependents:   MQTT_G_SENSOR

This program and the MQTT libraries it uses are part of the EclipseTM Paho project; specifically the embedded client.

This example and API are working, but are still in progress. Please give us your feedback.

HelloMQTT is an example of using the MQTT API. The MQTT API is portable across network interface stacks. MQTT is designed to be used with TCP/IP, but any transport with similar characteristics should be suitable.

HelloMQTT uses the NetworkInterface APIs in mbed OS 5 to show how this works. The MQTT library contains an MQTTNetwork.h header, which is a wrapper around the mbed networking interface. To switch between connectivity methods (the default is Ethernet) the easy-connect library is provided in this example application. You can change the connectivity method in mbed_app.json.

Adding new connectivity methods to the program is trivial, as long as they implement the mbed OS 5 NetworkStack API.

Committer:
icraggs
Date:
Wed Apr 30 13:16:09 2014 +0000
Revision:
2:638c854c0695
Parent:
1:a1d5c7a6acbc
Child:
3:7a6a899de7cc
Update to the latest level of code

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 1:a1d5c7a6acbc 1 /*******************************************************************************
icraggs 1:a1d5c7a6acbc 2 * Copyright (c) 2014 IBM Corp.
icraggs 1:a1d5c7a6acbc 3 *
icraggs 1:a1d5c7a6acbc 4 * All rights reserved. This program and the accompanying materials
icraggs 1:a1d5c7a6acbc 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 1:a1d5c7a6acbc 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 1:a1d5c7a6acbc 7 *
icraggs 1:a1d5c7a6acbc 8 * The Eclipse Public License is available at
icraggs 1:a1d5c7a6acbc 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 1:a1d5c7a6acbc 10 * and the Eclipse Distribution License is available at
icraggs 1:a1d5c7a6acbc 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 1:a1d5c7a6acbc 12 *
icraggs 1:a1d5c7a6acbc 13 * Contributors:
icraggs 1:a1d5c7a6acbc 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 1:a1d5c7a6acbc 15 *******************************************************************************/
icraggs 2:638c854c0695 16
icraggs 2:638c854c0695 17 /**
icraggs 2:638c854c0695 18 This is a sample program to illustrate the use of the MQTT Client library
icraggs 2:638c854c0695 19 on the mbed platform. The Client class requires two classes which mediate
icraggs 2:638c854c0695 20 access to system interfaces for networking and timing. As long as these two
icraggs 2:638c854c0695 21 classes provide the required public programming interfaces, it does not matter
icraggs 2:638c854c0695 22 what facilities they use underneath. In this program, they use the mbed
icraggs 2:638c854c0695 23 system libraries.
icraggs 2:638c854c0695 24
icraggs 2:638c854c0695 25 */
icraggs 1:a1d5c7a6acbc 26
icraggs 0:0cae29831d01 27 #include "mbed.h"
icraggs 0:0cae29831d01 28 #include "EthernetInterface.h"
icraggs 2:638c854c0695 29
icraggs 0:0cae29831d01 30 #include "C12832_lcd.h"
icraggs 0:0cae29831d01 31 C12832_LCD lcd;
icraggs 0:0cae29831d01 32
icraggs 2:638c854c0695 33 #include "FP.cpp"
icraggs 2:638c854c0695 34 #include "MQTTClient.h"
icraggs 2:638c854c0695 35
icraggs 2:638c854c0695 36
icraggs 2:638c854c0695 37
icraggs 2:638c854c0695 38 class IPStack
icraggs 0:0cae29831d01 39 {
icraggs 2:638c854c0695 40 public:
icraggs 2:638c854c0695 41 IPStack()
icraggs 2:638c854c0695 42 {
icraggs 2:638c854c0695 43 eth.init(); // Use DHCP
icraggs 2:638c854c0695 44 eth.connect();
icraggs 2:638c854c0695 45 mysock.set_blocking(false, 1000); // 1 second Timeout
icraggs 2:638c854c0695 46 }
icraggs 2:638c854c0695 47
icraggs 2:638c854c0695 48 int connect(char* hostname, int port)
icraggs 2:638c854c0695 49 {
icraggs 2:638c854c0695 50 return mysock.connect(hostname, port);
icraggs 2:638c854c0695 51 }
icraggs 2:638c854c0695 52
icraggs 2:638c854c0695 53 int read(char* buffer, int len, int timeout)
icraggs 2:638c854c0695 54 {
icraggs 2:638c854c0695 55 mysock.set_blocking(false, timeout);
icraggs 2:638c854c0695 56 return mysock.receive(buffer, len);
icraggs 2:638c854c0695 57 }
icraggs 2:638c854c0695 58
icraggs 2:638c854c0695 59 int write(char* buffer, int len, int timeout)
icraggs 2:638c854c0695 60 {
icraggs 2:638c854c0695 61 mysock.set_blocking(false, timeout);
icraggs 2:638c854c0695 62 return mysock.send(buffer, len);
icraggs 2:638c854c0695 63 }
icraggs 2:638c854c0695 64
icraggs 2:638c854c0695 65 int disconnect()
icraggs 2:638c854c0695 66 {
icraggs 2:638c854c0695 67 return mysock.close();
icraggs 2:638c854c0695 68 }
icraggs 2:638c854c0695 69
icraggs 2:638c854c0695 70 private:
icraggs 2:638c854c0695 71
icraggs 2:638c854c0695 72 EthernetInterface eth;
icraggs 0:0cae29831d01 73 TCPSocketConnection mysock;
icraggs 2:638c854c0695 74
icraggs 2:638c854c0695 75 };
icraggs 2:638c854c0695 76
icraggs 2:638c854c0695 77
icraggs 2:638c854c0695 78 class Countdown
icraggs 2:638c854c0695 79 {
icraggs 2:638c854c0695 80 public:
icraggs 2:638c854c0695 81 Countdown()
icraggs 2:638c854c0695 82 {
icraggs 2:638c854c0695 83 t = Timer();
icraggs 2:638c854c0695 84 }
icraggs 2:638c854c0695 85
icraggs 2:638c854c0695 86 Countdown(int ms)
icraggs 2:638c854c0695 87 {
icraggs 2:638c854c0695 88 t = Timer();
icraggs 2:638c854c0695 89 countdown_ms(ms);
icraggs 2:638c854c0695 90 }
icraggs 2:638c854c0695 91
icraggs 0:0cae29831d01 92
icraggs 2:638c854c0695 93 bool expired()
icraggs 2:638c854c0695 94 {
icraggs 2:638c854c0695 95 return t.read_ms() >= interval_end_ms;
icraggs 2:638c854c0695 96 }
icraggs 2:638c854c0695 97
icraggs 2:638c854c0695 98 void countdown_ms(int ms)
icraggs 2:638c854c0695 99 {
icraggs 2:638c854c0695 100 t.stop();
icraggs 2:638c854c0695 101 interval_end_ms = ms;
icraggs 2:638c854c0695 102 t.reset();
icraggs 2:638c854c0695 103 t.start();
icraggs 2:638c854c0695 104 }
icraggs 2:638c854c0695 105
icraggs 2:638c854c0695 106 void countdown(int seconds)
icraggs 2:638c854c0695 107 {
icraggs 2:638c854c0695 108 countdown_ms(seconds * 1000);
icraggs 2:638c854c0695 109 }
icraggs 2:638c854c0695 110
icraggs 2:638c854c0695 111 int left_ms()
icraggs 2:638c854c0695 112 {
icraggs 2:638c854c0695 113 return interval_end_ms - t.read_ms();
icraggs 2:638c854c0695 114 }
icraggs 2:638c854c0695 115
icraggs 2:638c854c0695 116 private:
icraggs 2:638c854c0695 117 Timer t;
icraggs 2:638c854c0695 118 int interval_end_ms;
icraggs 2:638c854c0695 119 };
icraggs 0:0cae29831d01 120
icraggs 2:638c854c0695 121 int arrivedcount = 0;
icraggs 2:638c854c0695 122
icraggs 2:638c854c0695 123 void messageArrived(MQTT::Message* message)
icraggs 2:638c854c0695 124 {
icraggs 2:638c854c0695 125 lcd.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\n", message->qos, message->retained, message->dup, message->id);
icraggs 2:638c854c0695 126 lcd.printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
icraggs 2:638c854c0695 127 ++arrivedcount;
icraggs 2:638c854c0695 128 }
icraggs 0:0cae29831d01 129
icraggs 2:638c854c0695 130 int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info)
icraggs 2:638c854c0695 131 {
icraggs 2:638c854c0695 132 char* hostname = "m2m.eclipse.org";
icraggs 2:638c854c0695 133 int port = 1883;
icraggs 2:638c854c0695 134 lcd.printf("Connecting to %s:%d\n", hostname, port);
icraggs 2:638c854c0695 135 int rc = info->network->connect(hostname, port);
icraggs 2:638c854c0695 136 lcd.printf("rc from TCP connect is %d\n", rc);
icraggs 2:638c854c0695 137
icraggs 2:638c854c0695 138 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 2:638c854c0695 139 data.MQTTVersion = 3;
icraggs 2:638c854c0695 140 data.clientID.cstring = "mbed-icraggs";
icraggs 2:638c854c0695 141 rc = info->client->connect(&data);
icraggs 2:638c854c0695 142 lcd.printf("rc from MQTT connect is %d\n", rc);
icraggs 2:638c854c0695 143
icraggs 2:638c854c0695 144 return rc;
icraggs 2:638c854c0695 145 }
icraggs 2:638c854c0695 146
icraggs 2:638c854c0695 147
icraggs 2:638c854c0695 148 int main(int argc, char* argv[])
icraggs 2:638c854c0695 149 {
icraggs 2:638c854c0695 150 IPStack ipstack = IPStack();
icraggs 2:638c854c0695 151 float version = 0.3;
icraggs 2:638c854c0695 152 char* topic = "mbed-sample";
icraggs 2:638c854c0695 153
icraggs 2:638c854c0695 154 lcd.printf("Version is %f\n", version);
icraggs 2:638c854c0695 155
icraggs 2:638c854c0695 156 MQTT::Client<IPStack, Countdown>client = MQTT::Client<IPStack, Countdown>(&ipstack);
icraggs 0:0cae29831d01 157
icraggs 2:638c854c0695 158 MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack};
icraggs 2:638c854c0695 159 int rc = connect(&info);
icraggs 2:638c854c0695 160
icraggs 2:638c854c0695 161 client.setConnectionLostHandler(connect);
icraggs 2:638c854c0695 162
icraggs 2:638c854c0695 163 rc = client.subscribe(topic, MQTT::QOS1, messageArrived);
icraggs 2:638c854c0695 164 if (rc != 0)
icraggs 2:638c854c0695 165 lcd.printf("rc from MQTT subscribe is %d\n", rc);
icraggs 2:638c854c0695 166
icraggs 2:638c854c0695 167 MQTT::Message message;
icraggs 0:0cae29831d01 168
icraggs 2:638c854c0695 169 // QoS 0
icraggs 2:638c854c0695 170 char buf[100];
icraggs 2:638c854c0695 171 sprintf(buf, "Hello World! QoS 0 message from app version %f\n", version);
icraggs 2:638c854c0695 172 message.qos = MQTT::QOS0;
icraggs 2:638c854c0695 173 message.retained = false;
icraggs 2:638c854c0695 174 message.dup = false;
icraggs 2:638c854c0695 175 message.payload = (void*)buf;
icraggs 2:638c854c0695 176 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 177 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 178 while (arrivedcount == 0)
icraggs 2:638c854c0695 179 client.yield(100);
icraggs 2:638c854c0695 180
icraggs 2:638c854c0695 181 // QoS 1
icraggs 2:638c854c0695 182 sprintf(buf, "Hello World! QoS 1 message from app version %f\n", version);
icraggs 2:638c854c0695 183 message.qos = MQTT::QOS1;
icraggs 2:638c854c0695 184 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 185 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 186 while (arrivedcount == 1)
icraggs 2:638c854c0695 187 client.yield(100);
icraggs 2:638c854c0695 188
icraggs 2:638c854c0695 189 // QoS 2
icraggs 2:638c854c0695 190 sprintf(buf, "Hello World! QoS 2 message from app version %f\n", version);
icraggs 2:638c854c0695 191 message.qos = MQTT::QOS2;
icraggs 2:638c854c0695 192 message.payloadlen = strlen(buf)+1;
icraggs 2:638c854c0695 193 rc = client.publish(topic, &message);
icraggs 2:638c854c0695 194 while (arrivedcount == 2)
icraggs 2:638c854c0695 195 client.yield(100);
icraggs 2:638c854c0695 196
icraggs 2:638c854c0695 197 rc = client.unsubscribe(topic);
icraggs 2:638c854c0695 198 if (rc != 0)
icraggs 2:638c854c0695 199 lcd.printf("rc from unsubscribe was %d\n", rc);
icraggs 2:638c854c0695 200
icraggs 2:638c854c0695 201 rc = client.disconnect();
icraggs 2:638c854c0695 202 if (rc != 0)
icraggs 2:638c854c0695 203 lcd.printf("rc from disconnect was %d\n", rc);
icraggs 2:638c854c0695 204
icraggs 2:638c854c0695 205 ipstack.disconnect();
icraggs 2:638c854c0695 206
icraggs 2:638c854c0695 207 lcd.printf("Finishing with %d messages received\n", arrivedcount);
icraggs 2:638c854c0695 208
icraggs 0:0cae29831d01 209 return 0;
icraggs 0:0cae29831d01 210 }