This example program uses HiveMQ Broker (http://www.mqtt-dashboard.com/index.html) to both publish and subscribe to topics.

Dependencies:   WNCInterface mbed-rtos mbed

See the README for details on this example program. NOTE: When started, the program can take up to 40 seconds before it will respond. This delay is the time required for the WNC Data Module to connect with the network.

Committer:
JMF
Date:
Wed Sep 21 17:32:51 2016 +0000
Revision:
0:73334e2a82be
Child:
1:368874dc3385
Initial Commit;

Who changed what in which revision?

User | Revision | Line number | New contents of line
JMF 0:73334e2a82be 1 //#define MQTT_DEBUG
JMF 0:73334e2a82be 2
JMF 0:73334e2a82be 3 #include "mbed.h"
JMF 0:73334e2a82be 4 #include "MQTTClient.h"
JMF 0:73334e2a82be 5 #include "MQTTFormat.h"
JMF 0:73334e2a82be 6
JMF 0:73334e2a82be 7 //#include "MQTTEthernet.h"
JMF 0:73334e2a82be 8 #include "MQTTWNCInterface.h"
JMF 0:73334e2a82be 9 #include "rtos.h"
JMF 0:73334e2a82be 10 #include "k64f.h"
JMF 0:73334e2a82be 11
JMF 0:73334e2a82be 12
JMF 0:73334e2a82be 13 // Connection options for the MQTT broker
JMF 0:73334e2a82be 14 #define BROKER "broker.hivemq.com" // MQTT broker host name; main() opens its TCP connection to this host
JMF 0:73334e2a82be 15 #define PORT 1883 // MQTT broker port number
JMF 0:73334e2a82be 16 #define CLIENTID "96430312d8f7" // K64F MAC address without colons. NOTE(review): not referenced in this file; main() takes the client ID from eth.getMACAddress()
JMF 0:73334e2a82be 17 #define USERNAME "" // not required for MQTT Dashboard public broker; only mentioned in commented-out code in main()
JMF 0:73334e2a82be 18 #define PASSWORD "" // not required for MQTT Dashboard public broker; only mentioned in commented-out code in main()
JMF 0:73334e2a82be 19 #define TOPIC "jmf/Test1" // MQTT topic that main() both subscribes to and publishes on
JMF 0:73334e2a82be 20
JMF 0:73334e2a82be 21 Queue<uint32_t, 6> messageQ; // switch ISRs post event codes (22 / 33) here; the main loop polls it
JMF 0:73334e2a82be 22
/** Pairs a numeric status code with its human-readable description. */
struct rcvd_errs{
    int err;         // numeric status code
    const char *er;  // description text; points at a string literal, so it is never written through
};

// Table of the status codes this program knows how to describe.
rcvd_errs response[] = {
    { 200, "200 OK - Request has succeeded." },
    { 201, "201 Created - Request has been fulfilled and a new resource created." },
    { 202, "202 Accepted - The request has been accepted for processing, but the processing will be completed asynchronously" },
    { 204, "204 No Content - The server has fulfilled the request but does not need to return an entity-body." },
    { 400, "400 Bad Request - Bad request (e.g. sending an array when the API is expecting a hash.)" },
    { 401, "401 Unauthorized - No valid API key provided." },
    { 403, "403 Forbidden - You tried to access a disabled device, or your API key is not allowed to access that resource, etc." },
    { 404, "404 Not Found - The requested item could not be found." },
    { 405, "405 Method Not Allowed - The HTTP method specified is not allowed." },
    { 415, "415 Unsupported Media Type - The requested media type is currently not supported." },
    { 422, "422 Unprocessable Entity - Can result from sending invalid fields." },
    { 429, "429 Too Many Requests - The user has sent too many requests in a given period of time." },
    { 500, "500 Server errors - Something went wrong in the M2X server" },
    { 502, "502 Server errors - Something went wrong in the M2X server" },
    { 503, "503 Server errors - Something went wrong in the M2X server" },
    { 504, "504 Server errors - Something went wrong in the M2X server" },
};
// Number of entries in response[]. Parenthesized so the macro expands safely
// inside larger expressions (e.g. "x % RCMAX").
#define RCMAX (sizeof(response)/sizeof(response[0]))

/**
 * Translate a status code into its descriptive text.
 *
 * @param rc  status code to look up in response[]
 * @return    pointer to the matching description, or to
 *            "Unknown error code..." when rc is not in the table.
 *            The returned text is read-only storage and must not be modified;
 *            the non-const return type is kept only for existing callers.
 */
char * response_str(int rc) {
    static const char unknown[] = "Unknown error code...";

    // The bound is tested before the element is read, so an unknown code
    // never indexes past the end of the table.
    for (unsigned int i = 0; i < RCMAX; i++) {
        if (response[i].err == rc)
            return const_cast<char *>(response[i].er);
    }
    return const_cast<char *>(unknown);
}
JMF 0:73334e2a82be 55
JMF 0:73334e2a82be 56 // LED color control function
JMF 0:73334e2a82be 57 void controlLED(color_t led_color) {
JMF 0:73334e2a82be 58 switch(led_color) {
JMF 0:73334e2a82be 59 case red :
JMF 0:73334e2a82be 60 greenLED = blueLED = 1;
JMF 0:73334e2a82be 61 redLED = 0.7;
JMF 0:73334e2a82be 62 break;
JMF 0:73334e2a82be 63 case green :
JMF 0:73334e2a82be 64 redLED = blueLED = 1;
JMF 0:73334e2a82be 65 greenLED = 0.7;
JMF 0:73334e2a82be 66 break;
JMF 0:73334e2a82be 67 case blue :
JMF 0:73334e2a82be 68 redLED = greenLED = 1;
JMF 0:73334e2a82be 69 blueLED = 0.7;
JMF 0:73334e2a82be 70 break;
JMF 0:73334e2a82be 71 case off :
JMF 0:73334e2a82be 72 redLED = greenLED = blueLED = 1;
JMF 0:73334e2a82be 73 break;
JMF 0:73334e2a82be 74 }
JMF 0:73334e2a82be 75 }
JMF 0:73334e2a82be 76
JMF 0:73334e2a82be 77 // Switch 2 interrupt handler
JMF 0:73334e2a82be 78 void sw2_ISR(void) {
JMF 0:73334e2a82be 79 messageQ.put((uint32_t*)22);
JMF 0:73334e2a82be 80 }
JMF 0:73334e2a82be 81
JMF 0:73334e2a82be 82 // Switch3 interrupt handler
JMF 0:73334e2a82be 83 void sw3_ISR(void) {
JMF 0:73334e2a82be 84 messageQ.put((uint32_t*)33);
JMF 0:73334e2a82be 85 }
JMF 0:73334e2a82be 86
JMF 0:73334e2a82be 87 // MQTT message arrived callback function
JMF 0:73334e2a82be 88 void messageArrived(MQTT::MessageData& md) {
JMF 0:73334e2a82be 89 MQTT::Message &message = md.message;
JMF 0:73334e2a82be 90 printf("Receiving MQTT message: %.*s\r\n", message.payloadlen, (char*)message.payload);
JMF 0:73334e2a82be 91
JMF 0:73334e2a82be 92 if (message.payloadlen == 3) {
JMF 0:73334e2a82be 93 if (strncmp((char*)message.payload, "red", 3) == 0)
JMF 0:73334e2a82be 94 controlLED(red);
JMF 0:73334e2a82be 95
JMF 0:73334e2a82be 96 else if(strncmp((char*)message.payload, "grn", 3) == 0)
JMF 0:73334e2a82be 97 controlLED(green);
JMF 0:73334e2a82be 98
JMF 0:73334e2a82be 99 else if(strncmp((char*)message.payload, "blu", 3) == 0)
JMF 0:73334e2a82be 100 controlLED(blue);
JMF 0:73334e2a82be 101
JMF 0:73334e2a82be 102 else if(strncmp((char*)message.payload, "off", 3) == 0)
JMF 0:73334e2a82be 103 controlLED(off);
JMF 0:73334e2a82be 104 }
JMF 0:73334e2a82be 105 }
JMF 0:73334e2a82be 106
JMF 0:73334e2a82be 107 int main() {
JMF 0:73334e2a82be 108 int rc, good = 0;
JMF 0:73334e2a82be 109 Timer tmr;
JMF 0:73334e2a82be 110 char* topic = TOPIC;
JMF 0:73334e2a82be 111 // turn off LED
JMF 0:73334e2a82be 112 controlLED(blue);
JMF 0:73334e2a82be 113
JMF 0:73334e2a82be 114 // set SW2 and SW3 to generate interrupt on falling edge
JMF 0:73334e2a82be 115 switch2.fall(&sw2_ISR);
JMF 0:73334e2a82be 116 switch3.fall(&sw3_ISR);
JMF 0:73334e2a82be 117
JMF 0:73334e2a82be 118 // initialize ethernet interface
JMF 0:73334e2a82be 119 MQTTwnc ipstack = MQTTwnc();
JMF 0:73334e2a82be 120
JMF 0:73334e2a82be 121 // get and display client network info
JMF 0:73334e2a82be 122 WNCInterface& eth = ipstack.getEth();
JMF 0:73334e2a82be 123
JMF 0:73334e2a82be 124 // construct the MQTT client
JMF 0:73334e2a82be 125 MQTT::Client<MQTTwnc, Countdown> client = MQTT::Client<MQTTwnc, Countdown>(ipstack);
JMF 0:73334e2a82be 126
JMF 0:73334e2a82be 127 char* hostname = BROKER;
JMF 0:73334e2a82be 128 int port = PORT;
JMF 0:73334e2a82be 129
JMF 0:73334e2a82be 130 printf("\r\n\r\nWelcome to the K64F MQTT Demo!\r\n");
JMF 0:73334e2a82be 131 printf("\r\nConnected to local network...\r\n");
JMF 0:73334e2a82be 132 printf("IP address is %s\r\n", eth.getIPAddress());
JMF 0:73334e2a82be 133 printf("MAC address is %s\r\n", eth.getMACAddress());
JMF 0:73334e2a82be 134 printf("Gateway address is %s\r\n", eth.getGateway());
JMF 0:73334e2a82be 135
JMF 0:73334e2a82be 136 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
JMF 0:73334e2a82be 137 int tries;
JMF 0:73334e2a82be 138
JMF 0:73334e2a82be 139 while( !good ) {
JMF 0:73334e2a82be 140 tries=0;
JMF 0:73334e2a82be 141 // connect to TCP socket and check return code
JMF 0:73334e2a82be 142 tmr.start();
JMF 0:73334e2a82be 143 rc = 1;
JMF 0:73334e2a82be 144 while( rc && tries < 3) {
JMF 0:73334e2a82be 145 printf("\r\n\r\nAttempting TCP connect to %s:%d: ", hostname, port);
JMF 0:73334e2a82be 146 rc = ipstack.connect(hostname, port);
JMF 0:73334e2a82be 147 if( rc ) {
JMF 0:73334e2a82be 148 printf("Failed!!\r\n");
JMF 0:73334e2a82be 149 while( tmr.read_ms() < 5000 ) ;
JMF 0:73334e2a82be 150 tries++;
JMF 0:73334e2a82be 151 tmr.reset();
JMF 0:73334e2a82be 152 }
JMF 0:73334e2a82be 153 else {
JMF 0:73334e2a82be 154 printf("Success!\r\n");
JMF 0:73334e2a82be 155 rc = 0;
JMF 0:73334e2a82be 156 }
JMF 0:73334e2a82be 157 }
JMF 0:73334e2a82be 158
JMF 0:73334e2a82be 159 data.MQTTVersion = 3;
JMF 0:73334e2a82be 160 data.clientID.cstring = eth.getMACAddress();
JMF 0:73334e2a82be 161 // data.username.cstring = USERNAME;
JMF 0:73334e2a82be 162 // data.password.cstring = PASSWORD;
JMF 0:73334e2a82be 163
JMF 0:73334e2a82be 164 // send MQTT connect packet and check return code
JMF 0:73334e2a82be 165 rc = 1;
JMF 0:73334e2a82be 166 tmr.reset();
JMF 0:73334e2a82be 167 while( !client.isConnected() && rc && tries < 3) {
JMF 0:73334e2a82be 168 printf("Attempting (%d) MQTT connect to %s:%d: ", tries, hostname, port);
JMF 0:73334e2a82be 169 rc = client.connect(data);
JMF 0:73334e2a82be 170 if( rc ) {
JMF 0:73334e2a82be 171 printf("Connection Failed!\r\n");
JMF 0:73334e2a82be 172 while( tmr.read_ms() < 5000 );
JMF 0:73334e2a82be 173 tmr.reset();
JMF 0:73334e2a82be 174 tries++;
JMF 0:73334e2a82be 175 }
JMF 0:73334e2a82be 176 else
JMF 0:73334e2a82be 177 printf("Connected!\r\n");
JMF 0:73334e2a82be 178 }
JMF 0:73334e2a82be 179
JMF 0:73334e2a82be 180 // subscribe to MQTT topic
JMF 0:73334e2a82be 181 tmr.reset();
JMF 0:73334e2a82be 182 rc = 1;
JMF 0:73334e2a82be 183 while( rc && client.isConnected() && tries < 3) {
JMF 0:73334e2a82be 184 printf("We are %s, Subscribing to MQTT topic %s (%d): ", client.isConnected()?"connected":"NOT CONNECTED",topic,tries);
JMF 0:73334e2a82be 185 rc = client.subscribe(topic, MQTT::QOS0, messageArrived);
JMF 0:73334e2a82be 186 if( rc ) {
JMF 0:73334e2a82be 187 printf("Subscribe request failed!\r\n");
JMF 0:73334e2a82be 188 while( tmr.read_ms() < 5000 );
JMF 0:73334e2a82be 189 tries++;
JMF 0:73334e2a82be 190 tmr.reset();
JMF 0:73334e2a82be 191 }
JMF 0:73334e2a82be 192 else {
JMF 0:73334e2a82be 193 good=1;
JMF 0:73334e2a82be 194 printf("Subscribe successful!\r\n");
JMF 0:73334e2a82be 195 }
JMF 0:73334e2a82be 196 }
JMF 0:73334e2a82be 197 }
JMF 0:73334e2a82be 198
JMF 0:73334e2a82be 199 MQTT::Message message;
JMF 0:73334e2a82be 200 char buf[100];
JMF 0:73334e2a82be 201 message.qos = MQTT::QOS0;
JMF 0:73334e2a82be 202 message.retained = false;
JMF 0:73334e2a82be 203 message.dup = false;
JMF 0:73334e2a82be 204 message.payload = (void*)buf;
JMF 0:73334e2a82be 205 message.payloadlen = strlen(buf)+1;
JMF 0:73334e2a82be 206
JMF 0:73334e2a82be 207 while(true) {
JMF 0:73334e2a82be 208 osEvent switchEvent = messageQ.get(100);
JMF 0:73334e2a82be 209
JMF 0:73334e2a82be 210 if (switchEvent.value.v == 22 || switchEvent.value.v == 33) {
JMF 0:73334e2a82be 211 switch(switchEvent.value.v) {
JMF 0:73334e2a82be 212 case 22 :
JMF 0:73334e2a82be 213 sprintf(buf, "sw2");
JMF 0:73334e2a82be 214 break;
JMF 0:73334e2a82be 215 case 33 :
JMF 0:73334e2a82be 216 sprintf(buf, "sw3");
JMF 0:73334e2a82be 217 break;
JMF 0:73334e2a82be 218 }
JMF 0:73334e2a82be 219 printf("Publishing MQTT message: %s (%d)\r\n", (char*)message.payload,
JMF 0:73334e2a82be 220 message.payloadlen);
JMF 0:73334e2a82be 221 rc = client.publish(topic, message);
JMF 0:73334e2a82be 222 if( rc ) {
JMF 0:73334e2a82be 223 printf("Publish request failed! (%d)\r\n",rc);
JMF 0:73334e2a82be 224 }
JMF 0:73334e2a82be 225 else {
JMF 0:73334e2a82be 226 printf("Publish successful!\r\n");
JMF 0:73334e2a82be 227 client.yield(100);
JMF 0:73334e2a82be 228 }
JMF 0:73334e2a82be 229 }
JMF 0:73334e2a82be 230 else {
JMF 0:73334e2a82be 231 client.yield(100);
JMF 0:73334e2a82be 232 }
JMF 0:73334e2a82be 233 }
JMF 0:73334e2a82be 234 }
JMF 0:73334e2a82be 235