Sample MQTT program - simple send and receive

Dependencies:   C12832 MQTT

Dependents:   MQTT_G_SENSOR

This program and the MQTT libraries it uses are part of the EclipseTM Paho project; specifically the embedded client.

This example and API are working, but are still in progress. Please give us your feedback.

HelloMQTT is an example of using the MQTT API. The MQTT API is portable across network interface stacks. MQTT is designed to be used with TCP/IP, but any transport with similar characteristics should be suitable.

HelloMQTT uses the NetworkInterface APIs in mbed OS 5 to show how this works. The MQTT library contains an MQTTNetwork.h header, which is a wrapper around the mbed networking interface. To switch between connectivity methods (the default is Ethernet) the easy-connect library is provided in this example application. You can change the connectivity method in mbed_app.json.

Adding new connectivity methods to the program is trivial, as long as they implement the mbed OS 5 NetworkStack API.

Committer:
icraggs
Date:
Fri Aug 01 23:46:08 2014 +0000
Revision:
15:d8a31b66a85d
Parent:
8:a3e3113054a1
Latest libraries

Who changed what in which revision?

UserRevisionLine numberNew contents of line
icraggs 3:7a6a899de7cc 1 /*******************************************************************************
icraggs 3:7a6a899de7cc 2 * Copyright (c) 2014 IBM Corp.
icraggs 3:7a6a899de7cc 3 *
icraggs 3:7a6a899de7cc 4 * All rights reserved. This program and the accompanying materials
icraggs 3:7a6a899de7cc 5 * are made available under the terms of the Eclipse Public License v1.0
icraggs 3:7a6a899de7cc 6 * and Eclipse Distribution License v1.0 which accompany this distribution.
icraggs 3:7a6a899de7cc 7 *
icraggs 3:7a6a899de7cc 8 * The Eclipse Public License is available at
icraggs 3:7a6a899de7cc 9 * http://www.eclipse.org/legal/epl-v10.html
icraggs 3:7a6a899de7cc 10 * and the Eclipse Distribution License is available at
icraggs 3:7a6a899de7cc 11 * http://www.eclipse.org/org/documents/edl-v10.php.
icraggs 3:7a6a899de7cc 12 *
icraggs 3:7a6a899de7cc 13 * Contributors:
icraggs 3:7a6a899de7cc 14 * Ian Craggs - initial API and implementation and/or initial documentation
icraggs 3:7a6a899de7cc 15 *******************************************************************************/
icraggs 3:7a6a899de7cc 16
icraggs 3:7a6a899de7cc 17 /**
icraggs 3:7a6a899de7cc 18 This is a sample program to illustrate the use of the MQTT Client library
icraggs 3:7a6a899de7cc 19 on Linux. The Client class requires two classes which mediate
icraggs 3:7a6a899de7cc 20 access to system interfaces for networking and timing. As long as these two
icraggs 3:7a6a899de7cc 21 classes provide the required public programming interfaces, it does not matter
icraggs 3:7a6a899de7cc 22 what facilities they use underneath. In this program, they use the Linux
icraggs 3:7a6a899de7cc 23 system libraries.
icraggs 3:7a6a899de7cc 24
icraggs 3:7a6a899de7cc 25 */
icraggs 3:7a6a899de7cc 26
icraggs 3:7a6a899de7cc 27 #if defined(LINUX)
icraggs 3:7a6a899de7cc 28
icraggs 8:a3e3113054a1 29 #include "LinuxMQTT.h"
icraggs 8:a3e3113054a1 30 #include "LinuxIPStack.h"
icraggs 8:a3e3113054a1 31 #include "MQTTClient.h"
icraggs 8:a3e3113054a1 32
icraggs 3:7a6a899de7cc 33 #include <sys/types.h>
icraggs 3:7a6a899de7cc 34 #include <sys/socket.h>
icraggs 3:7a6a899de7cc 35 #include <sys/param.h>
icraggs 3:7a6a899de7cc 36 #include <sys/time.h>
icraggs 3:7a6a899de7cc 37 #include <sys/select.h>
icraggs 3:7a6a899de7cc 38 #include <netinet/in.h>
icraggs 3:7a6a899de7cc 39 #include <netinet/tcp.h>
icraggs 3:7a6a899de7cc 40 #include <arpa/inet.h>
icraggs 3:7a6a899de7cc 41 #include <netdb.h>
icraggs 3:7a6a899de7cc 42 #include <stdio.h>
icraggs 3:7a6a899de7cc 43 #include <unistd.h>
icraggs 3:7a6a899de7cc 44 #include <errno.h>
icraggs 3:7a6a899de7cc 45 #include <fcntl.h>
icraggs 3:7a6a899de7cc 46
icraggs 3:7a6a899de7cc 47 #include <stdlib.h>
icraggs 3:7a6a899de7cc 48 #include <string.h>
icraggs 3:7a6a899de7cc 49 #include <signal.h>
icraggs 3:7a6a899de7cc 50
icraggs 3:7a6a899de7cc 51 #define DEFAULT_STACK_SIZE -1
icraggs 3:7a6a899de7cc 52
icraggs 3:7a6a899de7cc 53
icraggs 3:7a6a899de7cc 54 int arrivedcount = 0;
icraggs 3:7a6a899de7cc 55
icraggs 3:7a6a899de7cc 56 void messageArrived(MQTT::Message* message)
icraggs 3:7a6a899de7cc 57 {
icraggs 3:7a6a899de7cc 58 printf("Message %d arrived: qos %d, retained %d, dup %d, packetid %d\n",
icraggs 3:7a6a899de7cc 59 ++arrivedcount, message->qos, message->retained, message->dup, message->id);
icraggs 3:7a6a899de7cc 60 printf("Payload %.*s\n", message->payloadlen, (char*)message->payload);
icraggs 3:7a6a899de7cc 61 }
icraggs 3:7a6a899de7cc 62
icraggs 3:7a6a899de7cc 63
icraggs 3:7a6a899de7cc 64 int connect(MQTT::Client<IPStack, Countdown>::connectionLostInfo* info)
icraggs 3:7a6a899de7cc 65 {
icraggs 3:7a6a899de7cc 66 const char* hostname = "localhost"; //"m2m.eclipse.org";
icraggs 3:7a6a899de7cc 67 int port = 1883;
icraggs 3:7a6a899de7cc 68 printf("Connecting to %s:%d\n", hostname, port);
icraggs 3:7a6a899de7cc 69 int rc = info->network->connect(hostname, port);
icraggs 3:7a6a899de7cc 70 if (rc != 0)
icraggs 3:7a6a899de7cc 71 printf("rc from TCP connect is %d\n", rc);
icraggs 3:7a6a899de7cc 72
icraggs 3:7a6a899de7cc 73 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
icraggs 3:7a6a899de7cc 74 data.MQTTVersion = 3;
icraggs 3:7a6a899de7cc 75 data.clientID.cstring = (char*)"mbed-icraggs";
icraggs 3:7a6a899de7cc 76 rc = info->client->connect(&data);
icraggs 3:7a6a899de7cc 77 if (rc != 0)
icraggs 3:7a6a899de7cc 78 printf("rc from MQTT connect is %d\n", rc);
icraggs 3:7a6a899de7cc 79
icraggs 3:7a6a899de7cc 80 return rc;
icraggs 3:7a6a899de7cc 81 }
icraggs 3:7a6a899de7cc 82
icraggs 3:7a6a899de7cc 83
icraggs 3:7a6a899de7cc 84 int main(int argc, char* argv[])
icraggs 3:7a6a899de7cc 85 {
icraggs 3:7a6a899de7cc 86 IPStack ipstack = IPStack();
icraggs 3:7a6a899de7cc 87 float version = 0.3;
icraggs 3:7a6a899de7cc 88 const char* topic = "mbed-sample";
icraggs 3:7a6a899de7cc 89
icraggs 3:7a6a899de7cc 90 printf("Version is %f\n", version);
icraggs 3:7a6a899de7cc 91
icraggs 3:7a6a899de7cc 92 MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);
icraggs 3:7a6a899de7cc 93
icraggs 3:7a6a899de7cc 94 client.setConnectionLostHandler(connect);
icraggs 3:7a6a899de7cc 95
icraggs 3:7a6a899de7cc 96 MQTT::Client<IPStack, Countdown>::connectionLostInfo info = {&client, &ipstack};
icraggs 3:7a6a899de7cc 97 int rc = connect(&info);
icraggs 3:7a6a899de7cc 98
icraggs 3:7a6a899de7cc 99 rc = client.subscribe(topic, MQTT::QOS2, messageArrived);
icraggs 3:7a6a899de7cc 100 if (rc != 0)
icraggs 3:7a6a899de7cc 101 printf("rc from MQTT subscribe is %d\n", rc);
icraggs 3:7a6a899de7cc 102
icraggs 3:7a6a899de7cc 103 MQTT::Message message;
icraggs 3:7a6a899de7cc 104
icraggs 3:7a6a899de7cc 105 // QoS 0
icraggs 3:7a6a899de7cc 106 char buf[100];
icraggs 3:7a6a899de7cc 107 sprintf(buf, "Hello World! QoS 0 message from app version %f", version);
icraggs 3:7a6a899de7cc 108 message.qos = MQTT::QOS0;
icraggs 3:7a6a899de7cc 109 message.retained = false;
icraggs 3:7a6a899de7cc 110 message.dup = false;
icraggs 3:7a6a899de7cc 111 message.payload = (void*)buf;
icraggs 3:7a6a899de7cc 112 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 113 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 114 while (arrivedcount == 0)
icraggs 3:7a6a899de7cc 115 client.yield(100);
icraggs 3:7a6a899de7cc 116
icraggs 3:7a6a899de7cc 117 // QoS 1
icraggs 3:7a6a899de7cc 118 printf("Now QoS 1\n");
icraggs 3:7a6a899de7cc 119 sprintf(buf, "Hello World! QoS 1 message from app version %f", version);
icraggs 3:7a6a899de7cc 120 message.qos = MQTT::QOS1;
icraggs 3:7a6a899de7cc 121 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 122 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 123 while (arrivedcount == 1)
icraggs 3:7a6a899de7cc 124 client.yield(100);
icraggs 3:7a6a899de7cc 125
icraggs 3:7a6a899de7cc 126 // QoS 2
icraggs 3:7a6a899de7cc 127 sprintf(buf, "Hello World! QoS 2 message from app version %f", version);
icraggs 3:7a6a899de7cc 128 message.qos = MQTT::QOS2;
icraggs 3:7a6a899de7cc 129 message.payloadlen = strlen(buf)+1;
icraggs 3:7a6a899de7cc 130 rc = client.publish(topic, &message);
icraggs 3:7a6a899de7cc 131 while (arrivedcount == 2)
icraggs 3:7a6a899de7cc 132 client.yield(100);
icraggs 3:7a6a899de7cc 133
icraggs 3:7a6a899de7cc 134 rc = client.unsubscribe(topic);
icraggs 3:7a6a899de7cc 135 if (rc != 0)
icraggs 3:7a6a899de7cc 136 printf("rc from unsubscribe was %d\n", rc);
icraggs 3:7a6a899de7cc 137
icraggs 3:7a6a899de7cc 138 rc = client.disconnect();
icraggs 3:7a6a899de7cc 139 if (rc != 0)
icraggs 3:7a6a899de7cc 140 printf("rc from disconnect was %d\n", rc);
icraggs 3:7a6a899de7cc 141
icraggs 3:7a6a899de7cc 142 ipstack.disconnect();
icraggs 3:7a6a899de7cc 143
icraggs 3:7a6a899de7cc 144 printf("Finishing with %d messages received\n", arrivedcount);
icraggs 3:7a6a899de7cc 145
icraggs 3:7a6a899de7cc 146 return 0;
icraggs 3:7a6a899de7cc 147 }
icraggs 3:7a6a899de7cc 148
icraggs 3:7a6a899de7cc 149 #endif