This example program uses the HiveMQ broker (http://www.mqtt-dashboard.com/index.html) to both publish and subscribe to topics.

Dependencies:   WNCInterface mbed-rtos mbed

See the README for details on this example program. NOTE: When started, the program can take up to 40 seconds before it will respond. This delay is the time required for the WNC Data Module to connect with the network.

Committer:
root@developer-sjc-cyan-compiler.local.mbed.org
Date:
Thu Nov 17 18:46:00 2016 +0000
Revision:
4:4e31afefdf81
Parent:
3:9413314f8017
Added tag att_cellular_K64_wnc_14A2A_20161117 for changeset 9413314f8017

Who changed what in which revision?

User | Revision | Line number | New contents of line
JMF 0:73334e2a82be 1 //#define MQTT_DEBUG
JMF 0:73334e2a82be 2
JMF 0:73334e2a82be 3 #include "mbed.h"
JMF 0:73334e2a82be 4 #include "MQTTClient.h"
JMF 0:73334e2a82be 5 #include "MQTTFormat.h"
JMF 0:73334e2a82be 6
JMF 0:73334e2a82be 7 //#include "MQTTEthernet.h"
JMF 0:73334e2a82be 8 #include "MQTTWNCInterface.h"
JMF 0:73334e2a82be 9 #include "rtos.h"
JMF 0:73334e2a82be 10 #include "k64f.h"
JMF 0:73334e2a82be 11
JMF 0:73334e2a82be 12
JMF 0:73334e2a82be 13 // connect options for MQTT broker
JMF 0:73334e2a82be 14 #define BROKER "broker.hivemq.com" // MQTT broker URL
JMF 0:73334e2a82be 15 #define PORT 1883 // MQTT broker port number
JMF 0:73334e2a82be 16 #define CLIENTID "96430312d8f7" // use K64F MAC address without colons
JMF 0:73334e2a82be 17 #define USERNAME "" // not required for MQTT Dashboard public broker
JMF 0:73334e2a82be 18 #define PASSWORD "" // not required for MQTT Dashboard public broker
JMF 0:73334e2a82be 19 #define TOPIC "jmf/Test1" // MQTT topic
JMF 0:73334e2a82be 20
// Queue shared between the switch interrupt handlers and main(): sw2_ISR and
// sw3_ISR put the marker values 22 and 33 (cast to pointers) into it, and
// main() polls it with get(100). The template argument 6 is presumably the
// queue depth -- confirm against the RTOS Queue documentation.
JMF 0:73334e2a82be 21 Queue<uint32_t, 6> messageQ;
// Serial console used for all pc.printf() status output; main() sets it to
// 115200 baud. NOTE(review): the two 256 arguments look like TX/RX buffer
// sizes -- confirm against the MODSERIAL constructor.
JMF 1:368874dc3385 22 MODSERIAL pc(USBTX,USBRX,256,256);
JMF 1:368874dc3385 23
// One entry of the status-code lookup table: a numeric code and the
// human-readable text that describes it.
struct rcvd_errs {
    int err;            // numeric status code
    const char *er;     // descriptive text (points at a string literal)
};

// Table of known status codes and their descriptions, searched by
// response_str().
rcvd_errs response[] = {
    { 200, "200 OK - Request has succeeded." },
    { 201, "201 Created - Request has been fulfilled and a new resource created." },
    { 202, "202 Accepted - The request has been accepted for processing, but the processing will be completed asynchronously" },
    { 204, "204 No Content - The server has fulfilled the request but does not need to return an entity-body." },
    { 400, "400 Bad Request - Bad request (e.g. sending an array when the API is expecting a hash.)" },
    { 401, "401 Unauthorized - No valid API key provided." },
    { 403, "403 Forbidden - You tried to access a disabled device, or your API key is not allowed to access that resource, etc." },
    { 404, "404 Not Found - The requested item could not be found." },
    { 405, "405 Method Not Allowed - The HTTP method specified is not allowed." },
    { 415, "415 Unsupported Media Type - The requested media type is currently not supported." },
    { 422, "422 Unprocessable Entity - Can result from sending invalid fields." },
    { 429, "429 Too Many Requests - The user has sent too many requests in a given period of time." },
    { 500, "500 Server errors - Something went wrong in the M2X server" },
    { 502, "502 Server errors - Something went wrong in the M2X server" },
    { 503, "503 Server errors - Something went wrong in the M2X server" },
    { 504, "504 Server errors - Something went wrong in the M2X server" },
};

// Number of entries in response[]. Parenthesised so the macro can be used
// safely inside larger expressions.
#define RCMAX (sizeof(response)/sizeof(response[0]))

/**
 * Look up the descriptive text for a status code.
 *
 * @param rc  numeric status code to look up in response[].
 * @return    the matching description, or "Unknown error code..." when the
 *            code is not in the table. The returned text is a string literal
 *            and must not be modified; the non-const return type is kept only
 *            so existing callers continue to compile.
 */
char * response_str(int rc) {
    static const char *unknown = "Unknown error code...";

    // The index is bounds-checked before the table is read, so an unknown
    // code never touches memory past the last entry.
    for (unsigned int i = 0; i < RCMAX; ++i) {
        if (response[i].err == rc)
            return const_cast<char *>(response[i].er);
    }
    return const_cast<char *>(unknown);
}
JMF 0:73334e2a82be 56
JMF 0:73334e2a82be 57 // LED color control function
JMF 0:73334e2a82be 58 void controlLED(color_t led_color) {
JMF 0:73334e2a82be 59 switch(led_color) {
JMF 0:73334e2a82be 60 case red :
JMF 0:73334e2a82be 61 greenLED = blueLED = 1;
JMF 0:73334e2a82be 62 redLED = 0.7;
JMF 0:73334e2a82be 63 break;
JMF 0:73334e2a82be 64 case green :
JMF 0:73334e2a82be 65 redLED = blueLED = 1;
JMF 0:73334e2a82be 66 greenLED = 0.7;
JMF 0:73334e2a82be 67 break;
JMF 0:73334e2a82be 68 case blue :
JMF 0:73334e2a82be 69 redLED = greenLED = 1;
JMF 0:73334e2a82be 70 blueLED = 0.7;
JMF 0:73334e2a82be 71 break;
JMF 0:73334e2a82be 72 case off :
JMF 0:73334e2a82be 73 redLED = greenLED = blueLED = 1;
JMF 0:73334e2a82be 74 break;
JMF 0:73334e2a82be 75 }
JMF 0:73334e2a82be 76 }
JMF 0:73334e2a82be 77
JMF 0:73334e2a82be 78 // Switch 2 interrupt handler
JMF 0:73334e2a82be 79 void sw2_ISR(void) {
JMF 0:73334e2a82be 80 messageQ.put((uint32_t*)22);
JMF 0:73334e2a82be 81 }
JMF 0:73334e2a82be 82
JMF 0:73334e2a82be 83 // Switch3 interrupt handler
JMF 0:73334e2a82be 84 void sw3_ISR(void) {
JMF 0:73334e2a82be 85 messageQ.put((uint32_t*)33);
JMF 0:73334e2a82be 86 }
JMF 0:73334e2a82be 87
JMF 0:73334e2a82be 88 // MQTT message arrived callback function
JMF 0:73334e2a82be 89 void messageArrived(MQTT::MessageData& md) {
JMF 0:73334e2a82be 90 MQTT::Message &message = md.message;
JMF 2:cedbc9de0521 91 pc.printf("Receiving MQTT message: %.*s\r\n", message.payloadlen, (char*)message.payload);
JMF 0:73334e2a82be 92
JMF 0:73334e2a82be 93 if (message.payloadlen == 3) {
JMF 0:73334e2a82be 94 if (strncmp((char*)message.payload, "red", 3) == 0)
JMF 0:73334e2a82be 95 controlLED(red);
JMF 0:73334e2a82be 96
JMF 0:73334e2a82be 97 else if(strncmp((char*)message.payload, "grn", 3) == 0)
JMF 0:73334e2a82be 98 controlLED(green);
JMF 0:73334e2a82be 99
JMF 0:73334e2a82be 100 else if(strncmp((char*)message.payload, "blu", 3) == 0)
JMF 0:73334e2a82be 101 controlLED(blue);
JMF 0:73334e2a82be 102
JMF 0:73334e2a82be 103 else if(strncmp((char*)message.payload, "off", 3) == 0)
JMF 0:73334e2a82be 104 controlLED(off);
JMF 0:73334e2a82be 105 }
JMF 0:73334e2a82be 106 }
JMF 0:73334e2a82be 107
JMF 0:73334e2a82be 108 int main() {
JMF 0:73334e2a82be 109 int rc, good = 0;
JMF 0:73334e2a82be 110 Timer tmr;
JMF 0:73334e2a82be 111 char* topic = TOPIC;
JMF 0:73334e2a82be 112 // turn off LED
JMF 0:73334e2a82be 113 controlLED(blue);
JMF 1:368874dc3385 114
JMF 1:368874dc3385 115 pc.baud(115200);
JMF 3:9413314f8017 116 pc.printf("\r\n\r\nWelcome to the K64F MQTT Demo!\r\n");
JMF 1:368874dc3385 117
JMF 0:73334e2a82be 118 // set SW2 and SW3 to generate interrupt on falling edge
JMF 0:73334e2a82be 119 switch2.fall(&sw2_ISR);
JMF 0:73334e2a82be 120 switch3.fall(&sw3_ISR);
JMF 0:73334e2a82be 121
JMF 0:73334e2a82be 122 // initialize ethernet interface
JMF 0:73334e2a82be 123 MQTTwnc ipstack = MQTTwnc();
JMF 0:73334e2a82be 124
JMF 0:73334e2a82be 125 // get and display client network info
JMF 0:73334e2a82be 126 WNCInterface& eth = ipstack.getEth();
JMF 0:73334e2a82be 127
JMF 0:73334e2a82be 128 // construct the MQTT client
JMF 0:73334e2a82be 129 MQTT::Client<MQTTwnc, Countdown> client = MQTT::Client<MQTTwnc, Countdown>(ipstack);
JMF 0:73334e2a82be 130
JMF 0:73334e2a82be 131 char* hostname = BROKER;
JMF 0:73334e2a82be 132 int port = PORT;
JMF 0:73334e2a82be 133
JMF 2:cedbc9de0521 134 pc.printf("\r\nConnected to local network...\r\n");
JMF 2:cedbc9de0521 135 pc.printf("IP address is %s\r\n", eth.getIPAddress());
JMF 2:cedbc9de0521 136 pc.printf("MAC address is %s\r\n", eth.getMACAddress());
JMF 2:cedbc9de0521 137 pc.printf("Gateway address is %s\r\n", eth.getGateway());
JMF 0:73334e2a82be 138
JMF 0:73334e2a82be 139 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
JMF 0:73334e2a82be 140 int tries;
JMF 0:73334e2a82be 141
JMF 0:73334e2a82be 142 while( !good ) {
JMF 0:73334e2a82be 143 tries=0;
JMF 0:73334e2a82be 144 // connect to TCP socket and check return code
JMF 0:73334e2a82be 145 tmr.start();
JMF 0:73334e2a82be 146 rc = 1;
JMF 0:73334e2a82be 147 while( rc && tries < 3) {
JMF 2:cedbc9de0521 148 pc.printf("\r\n\r\nAttempting TCP connect to %s:%d: ", hostname, port);
JMF 0:73334e2a82be 149 rc = ipstack.connect(hostname, port);
JMF 0:73334e2a82be 150 if( rc ) {
JMF 2:cedbc9de0521 151 pc.printf("Failed!!\r\n");
JMF 0:73334e2a82be 152 while( tmr.read_ms() < 5000 ) ;
JMF 0:73334e2a82be 153 tries++;
JMF 0:73334e2a82be 154 tmr.reset();
JMF 0:73334e2a82be 155 }
JMF 0:73334e2a82be 156 else {
JMF 2:cedbc9de0521 157 pc.printf("Success!\r\n");
JMF 0:73334e2a82be 158 rc = 0;
JMF 0:73334e2a82be 159 }
JMF 0:73334e2a82be 160 }
JMF 0:73334e2a82be 161
JMF 0:73334e2a82be 162 data.MQTTVersion = 3;
JMF 0:73334e2a82be 163 data.clientID.cstring = eth.getMACAddress();
JMF 0:73334e2a82be 164 // data.username.cstring = USERNAME;
JMF 0:73334e2a82be 165 // data.password.cstring = PASSWORD;
JMF 0:73334e2a82be 166
JMF 0:73334e2a82be 167 // send MQTT connect packet and check return code
JMF 0:73334e2a82be 168 rc = 1;
JMF 0:73334e2a82be 169 tmr.reset();
JMF 0:73334e2a82be 170 while( !client.isConnected() && rc && tries < 3) {
JMF 2:cedbc9de0521 171 pc.printf("Attempting (%d) MQTT connect to %s:%d: ", tries, hostname, port);
JMF 0:73334e2a82be 172 rc = client.connect(data);
JMF 0:73334e2a82be 173 if( rc ) {
JMF 2:cedbc9de0521 174 pc.printf("Connection Failed!\r\n");
JMF 0:73334e2a82be 175 while( tmr.read_ms() < 5000 );
JMF 0:73334e2a82be 176 tmr.reset();
JMF 0:73334e2a82be 177 tries++;
JMF 0:73334e2a82be 178 }
JMF 0:73334e2a82be 179 else
JMF 2:cedbc9de0521 180 pc.printf("Connected!\r\n");
JMF 0:73334e2a82be 181 }
JMF 0:73334e2a82be 182
JMF 0:73334e2a82be 183 // subscribe to MQTT topic
JMF 0:73334e2a82be 184 tmr.reset();
JMF 0:73334e2a82be 185 rc = 1;
JMF 0:73334e2a82be 186 while( rc && client.isConnected() && tries < 3) {
JMF 2:cedbc9de0521 187 pc.printf("We are %s, Subscribing to MQTT topic %s (%d): ", client.isConnected()?"connected":"NOT CONNECTED",topic,tries);
JMF 0:73334e2a82be 188 rc = client.subscribe(topic, MQTT::QOS0, messageArrived);
JMF 0:73334e2a82be 189 if( rc ) {
JMF 2:cedbc9de0521 190 pc.printf("Subscribe request failed!\r\n");
JMF 0:73334e2a82be 191 while( tmr.read_ms() < 5000 );
JMF 0:73334e2a82be 192 tries++;
JMF 0:73334e2a82be 193 tmr.reset();
JMF 0:73334e2a82be 194 }
JMF 0:73334e2a82be 195 else {
JMF 0:73334e2a82be 196 good=1;
JMF 2:cedbc9de0521 197 pc.printf("Subscribe successful!\r\n");
JMF 0:73334e2a82be 198 }
JMF 0:73334e2a82be 199 }
JMF 0:73334e2a82be 200 }
JMF 0:73334e2a82be 201
JMF 0:73334e2a82be 202 MQTT::Message message;
JMF 0:73334e2a82be 203 char buf[100];
JMF 0:73334e2a82be 204 message.qos = MQTT::QOS0;
JMF 0:73334e2a82be 205 message.retained = false;
JMF 0:73334e2a82be 206 message.dup = false;
JMF 0:73334e2a82be 207 message.payload = (void*)buf;
JMF 0:73334e2a82be 208 message.payloadlen = strlen(buf)+1;
JMF 0:73334e2a82be 209
JMF 0:73334e2a82be 210 while(true) {
JMF 0:73334e2a82be 211 osEvent switchEvent = messageQ.get(100);
JMF 0:73334e2a82be 212
JMF 0:73334e2a82be 213 if (switchEvent.value.v == 22 || switchEvent.value.v == 33) {
JMF 0:73334e2a82be 214 switch(switchEvent.value.v) {
JMF 0:73334e2a82be 215 case 22 :
JMF 0:73334e2a82be 216 sprintf(buf, "sw2");
JMF 0:73334e2a82be 217 break;
JMF 0:73334e2a82be 218 case 33 :
JMF 0:73334e2a82be 219 sprintf(buf, "sw3");
JMF 0:73334e2a82be 220 break;
JMF 0:73334e2a82be 221 }
JMF 2:cedbc9de0521 222 pc.printf("Publishing MQTT message: %s (%d)\r\n", (char*)message.payload,
JMF 0:73334e2a82be 223 message.payloadlen);
JMF 0:73334e2a82be 224 rc = client.publish(topic, message);
JMF 0:73334e2a82be 225 if( rc ) {
JMF 2:cedbc9de0521 226 pc.printf("Publish request failed! (%d)\r\n",rc);
JMF 0:73334e2a82be 227 }
JMF 0:73334e2a82be 228 else {
JMF 2:cedbc9de0521 229 pc.printf("Publish successful!\r\n");
JMF 0:73334e2a82be 230 client.yield(100);
JMF 0:73334e2a82be 231 }
JMF 0:73334e2a82be 232 }
JMF 0:73334e2a82be 233 else {
JMF 0:73334e2a82be 234 client.yield(100);
JMF 0:73334e2a82be 235 }
JMF 0:73334e2a82be 236 }
JMF 0:73334e2a82be 237 }
JMF 0:73334e2a82be 238