This example program uses HiveMQ Broker (http://www.mqtt-dashboard.com/index.html) to both publish and subscribe to topics.
Dependencies: WNCInterface mbed-rtos mbed
See the README for details on this example program. NOTE: When started, the program can take up to 40 seconds before it will respond. This delay is the time required for the WNC Data Module to connect with the network.
main.cpp@4:4e31afefdf81, 2016-11-17 (annotated)
- Committer:
- root@developer-sjc-cyan-compiler.local.mbed.org
- Date:
- Thu Nov 17 18:46:00 2016 +0000
- Revision:
- 4:4e31afefdf81
- Parent:
- 3:9413314f8017
Added tag att_cellular_K64_wnc_14A2A_20161117 for changeset 9413314f8017
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
JMF | 0:73334e2a82be | 1 | //#define MQTT_DEBUG |
JMF | 0:73334e2a82be | 2 | |
JMF | 0:73334e2a82be | 3 | #include "mbed.h" |
JMF | 0:73334e2a82be | 4 | #include "MQTTClient.h" |
JMF | 0:73334e2a82be | 5 | #include "MQTTFormat.h" |
JMF | 0:73334e2a82be | 6 | |
JMF | 0:73334e2a82be | 7 | //#include "MQTTEthernet.h" |
JMF | 0:73334e2a82be | 8 | #include "MQTTWNCInterface.h" |
JMF | 0:73334e2a82be | 9 | #include "rtos.h" |
JMF | 0:73334e2a82be | 10 | #include "k64f.h" |
JMF | 0:73334e2a82be | 11 | |
JMF | 0:73334e2a82be | 12 | |
JMF | 0:73334e2a82be | 13 | // connect options for MQTT broker |
JMF | 0:73334e2a82be | 14 | #define BROKER "broker.hivemq.com" // MQTT broker URL |
JMF | 0:73334e2a82be | 15 | #define PORT 1883 // MQTT broker port number |
JMF | 0:73334e2a82be | 16 | #define CLIENTID "96430312d8f7" // use K64F MAC address without colons |
JMF | 0:73334e2a82be | 17 | #define USERNAME "" // not required for MQTT Dashboard public broker |
JMF | 0:73334e2a82be | 18 | #define PASSWORD "" // not required for MQTT Dashboard public broker |
JMF | 0:73334e2a82be | 19 | #define TOPIC "jmf/Test1" // MQTT topic |
JMF | 0:73334e2a82be | 20 | |
JMF | 0:73334e2a82be | 21 | Queue<uint32_t, 6> messageQ; |
JMF | 1:368874dc3385 | 22 | MODSERIAL pc(USBTX,USBRX,256,256); |
JMF | 1:368874dc3385 | 23 | |
// One entry of the status-code lookup table: a numeric code and the
// human-readable text that describes it.
struct rcvd_errs{
    int err;    // numeric status code
    char *er;   // description; points at a string literal, never write through it
};

// Known status codes and their descriptions, searched by response_str().
rcvd_errs response[] = {
    { 200, "200 OK - Request has succeeded." },
    { 201, "201 Created - Request has been fulfilled and a new resource created." },
    { 202, "202 Accepted - The request has been accepted for processing, but the processing will be completed asynchronously" },
    { 204, "204 No Content - The server has fulfilled the request but does not need to return an entity-body." },
    { 400, "400 Bad Request - Bad request (e.g. sending an array when the API is expecting a hash.)" },
    { 401, "401 Unauthorized - No valid API key provided." },
    { 403, "403 Forbidden - You tried to access a disabled device, or your API key is not allowed to access that resource, etc." },
    { 404, "404 Not Found - The requested item could not be found." },
    { 405, "405 Method Not Allowed - The HTTP method specified is not allowed." },
    { 415, "415 Unsupported Media Type - The requested media type is currently not supported." },
    { 422, "422 Unprocessable Entity - Can result from sending invalid fields." },
    { 429, "429 Too Many Requests - The user has sent too many requests in a given period of time." },
    { 500, "500 Server errors - Something went wrong in the M2X server" },
    { 502, "502 Server errors - Something went wrong in the M2X server" },
    { 503, "503 Server errors - Something went wrong in the M2X server" },
    { 504, "504 Server errors - Something went wrong in the M2X server" },
};

// Number of entries in response[]. Parenthesized so the macro stays a single
// value inside larger expressions (e.g. `x / RCMAX`).
#define RCMAX (sizeof(response)/sizeof(response[0]))

// Translate a numeric status code into its description.
//
// rc      status code to look up in response[]
// returns the matching description, or "Unknown error code..." when rc is
//         not in the table. The returned text must not be modified.
char * response_str(int rc) {
    static char *unknown = "Unknown error code...";

    // Check the index against the table size *before* touching the element,
    // so an unknown code never reads past the end of response[].
    for (size_t i = 0; i < RCMAX; ++i) {
        if (response[i].err == rc)
            return response[i].er;
    }
    return unknown;
}
JMF | 0:73334e2a82be | 56 | |
JMF | 0:73334e2a82be | 57 | // LED color control function |
JMF | 0:73334e2a82be | 58 | void controlLED(color_t led_color) { |
JMF | 0:73334e2a82be | 59 | switch(led_color) { |
JMF | 0:73334e2a82be | 60 | case red : |
JMF | 0:73334e2a82be | 61 | greenLED = blueLED = 1; |
JMF | 0:73334e2a82be | 62 | redLED = 0.7; |
JMF | 0:73334e2a82be | 63 | break; |
JMF | 0:73334e2a82be | 64 | case green : |
JMF | 0:73334e2a82be | 65 | redLED = blueLED = 1; |
JMF | 0:73334e2a82be | 66 | greenLED = 0.7; |
JMF | 0:73334e2a82be | 67 | break; |
JMF | 0:73334e2a82be | 68 | case blue : |
JMF | 0:73334e2a82be | 69 | redLED = greenLED = 1; |
JMF | 0:73334e2a82be | 70 | blueLED = 0.7; |
JMF | 0:73334e2a82be | 71 | break; |
JMF | 0:73334e2a82be | 72 | case off : |
JMF | 0:73334e2a82be | 73 | redLED = greenLED = blueLED = 1; |
JMF | 0:73334e2a82be | 74 | break; |
JMF | 0:73334e2a82be | 75 | } |
JMF | 0:73334e2a82be | 76 | } |
JMF | 0:73334e2a82be | 77 | |
JMF | 0:73334e2a82be | 78 | // Switch 2 interrupt handler |
JMF | 0:73334e2a82be | 79 | void sw2_ISR(void) { |
JMF | 0:73334e2a82be | 80 | messageQ.put((uint32_t*)22); |
JMF | 0:73334e2a82be | 81 | } |
JMF | 0:73334e2a82be | 82 | |
JMF | 0:73334e2a82be | 83 | // Switch3 interrupt handler |
JMF | 0:73334e2a82be | 84 | void sw3_ISR(void) { |
JMF | 0:73334e2a82be | 85 | messageQ.put((uint32_t*)33); |
JMF | 0:73334e2a82be | 86 | } |
JMF | 0:73334e2a82be | 87 | |
JMF | 0:73334e2a82be | 88 | // MQTT message arrived callback function |
JMF | 0:73334e2a82be | 89 | void messageArrived(MQTT::MessageData& md) { |
JMF | 0:73334e2a82be | 90 | MQTT::Message &message = md.message; |
JMF | 2:cedbc9de0521 | 91 | pc.printf("Receiving MQTT message: %.*s\r\n", message.payloadlen, (char*)message.payload); |
JMF | 0:73334e2a82be | 92 | |
JMF | 0:73334e2a82be | 93 | if (message.payloadlen == 3) { |
JMF | 0:73334e2a82be | 94 | if (strncmp((char*)message.payload, "red", 3) == 0) |
JMF | 0:73334e2a82be | 95 | controlLED(red); |
JMF | 0:73334e2a82be | 96 | |
JMF | 0:73334e2a82be | 97 | else if(strncmp((char*)message.payload, "grn", 3) == 0) |
JMF | 0:73334e2a82be | 98 | controlLED(green); |
JMF | 0:73334e2a82be | 99 | |
JMF | 0:73334e2a82be | 100 | else if(strncmp((char*)message.payload, "blu", 3) == 0) |
JMF | 0:73334e2a82be | 101 | controlLED(blue); |
JMF | 0:73334e2a82be | 102 | |
JMF | 0:73334e2a82be | 103 | else if(strncmp((char*)message.payload, "off", 3) == 0) |
JMF | 0:73334e2a82be | 104 | controlLED(off); |
JMF | 0:73334e2a82be | 105 | } |
JMF | 0:73334e2a82be | 106 | } |
JMF | 0:73334e2a82be | 107 | |
JMF | 0:73334e2a82be | 108 | int main() { |
JMF | 0:73334e2a82be | 109 | int rc, good = 0; |
JMF | 0:73334e2a82be | 110 | Timer tmr; |
JMF | 0:73334e2a82be | 111 | char* topic = TOPIC; |
JMF | 0:73334e2a82be | 112 | // turn off LED |
JMF | 0:73334e2a82be | 113 | controlLED(blue); |
JMF | 1:368874dc3385 | 114 | |
JMF | 1:368874dc3385 | 115 | pc.baud(115200); |
JMF | 3:9413314f8017 | 116 | pc.printf("\r\n\r\nWelcome to the K64F MQTT Demo!\r\n"); |
JMF | 1:368874dc3385 | 117 | |
JMF | 0:73334e2a82be | 118 | // set SW2 and SW3 to generate interrupt on falling edge |
JMF | 0:73334e2a82be | 119 | switch2.fall(&sw2_ISR); |
JMF | 0:73334e2a82be | 120 | switch3.fall(&sw3_ISR); |
JMF | 0:73334e2a82be | 121 | |
JMF | 0:73334e2a82be | 122 | // initialize ethernet interface |
JMF | 0:73334e2a82be | 123 | MQTTwnc ipstack = MQTTwnc(); |
JMF | 0:73334e2a82be | 124 | |
JMF | 0:73334e2a82be | 125 | // get and display client network info |
JMF | 0:73334e2a82be | 126 | WNCInterface& eth = ipstack.getEth(); |
JMF | 0:73334e2a82be | 127 | |
JMF | 0:73334e2a82be | 128 | // construct the MQTT client |
JMF | 0:73334e2a82be | 129 | MQTT::Client<MQTTwnc, Countdown> client = MQTT::Client<MQTTwnc, Countdown>(ipstack); |
JMF | 0:73334e2a82be | 130 | |
JMF | 0:73334e2a82be | 131 | char* hostname = BROKER; |
JMF | 0:73334e2a82be | 132 | int port = PORT; |
JMF | 0:73334e2a82be | 133 | |
JMF | 2:cedbc9de0521 | 134 | pc.printf("\r\nConnected to local network...\r\n"); |
JMF | 2:cedbc9de0521 | 135 | pc.printf("IP address is %s\r\n", eth.getIPAddress()); |
JMF | 2:cedbc9de0521 | 136 | pc.printf("MAC address is %s\r\n", eth.getMACAddress()); |
JMF | 2:cedbc9de0521 | 137 | pc.printf("Gateway address is %s\r\n", eth.getGateway()); |
JMF | 0:73334e2a82be | 138 | |
JMF | 0:73334e2a82be | 139 | MQTTPacket_connectData data = MQTTPacket_connectData_initializer; |
JMF | 0:73334e2a82be | 140 | int tries; |
JMF | 0:73334e2a82be | 141 | |
JMF | 0:73334e2a82be | 142 | while( !good ) { |
JMF | 0:73334e2a82be | 143 | tries=0; |
JMF | 0:73334e2a82be | 144 | // connect to TCP socket and check return code |
JMF | 0:73334e2a82be | 145 | tmr.start(); |
JMF | 0:73334e2a82be | 146 | rc = 1; |
JMF | 0:73334e2a82be | 147 | while( rc && tries < 3) { |
JMF | 2:cedbc9de0521 | 148 | pc.printf("\r\n\r\nAttempting TCP connect to %s:%d: ", hostname, port); |
JMF | 0:73334e2a82be | 149 | rc = ipstack.connect(hostname, port); |
JMF | 0:73334e2a82be | 150 | if( rc ) { |
JMF | 2:cedbc9de0521 | 151 | pc.printf("Failed!!\r\n"); |
JMF | 0:73334e2a82be | 152 | while( tmr.read_ms() < 5000 ) ; |
JMF | 0:73334e2a82be | 153 | tries++; |
JMF | 0:73334e2a82be | 154 | tmr.reset(); |
JMF | 0:73334e2a82be | 155 | } |
JMF | 0:73334e2a82be | 156 | else { |
JMF | 2:cedbc9de0521 | 157 | pc.printf("Success!\r\n"); |
JMF | 0:73334e2a82be | 158 | rc = 0; |
JMF | 0:73334e2a82be | 159 | } |
JMF | 0:73334e2a82be | 160 | } |
JMF | 0:73334e2a82be | 161 | |
JMF | 0:73334e2a82be | 162 | data.MQTTVersion = 3; |
JMF | 0:73334e2a82be | 163 | data.clientID.cstring = eth.getMACAddress(); |
JMF | 0:73334e2a82be | 164 | // data.username.cstring = USERNAME; |
JMF | 0:73334e2a82be | 165 | // data.password.cstring = PASSWORD; |
JMF | 0:73334e2a82be | 166 | |
JMF | 0:73334e2a82be | 167 | // send MQTT connect packet and check return code |
JMF | 0:73334e2a82be | 168 | rc = 1; |
JMF | 0:73334e2a82be | 169 | tmr.reset(); |
JMF | 0:73334e2a82be | 170 | while( !client.isConnected() && rc && tries < 3) { |
JMF | 2:cedbc9de0521 | 171 | pc.printf("Attempting (%d) MQTT connect to %s:%d: ", tries, hostname, port); |
JMF | 0:73334e2a82be | 172 | rc = client.connect(data); |
JMF | 0:73334e2a82be | 173 | if( rc ) { |
JMF | 2:cedbc9de0521 | 174 | pc.printf("Connection Failed!\r\n"); |
JMF | 0:73334e2a82be | 175 | while( tmr.read_ms() < 5000 ); |
JMF | 0:73334e2a82be | 176 | tmr.reset(); |
JMF | 0:73334e2a82be | 177 | tries++; |
JMF | 0:73334e2a82be | 178 | } |
JMF | 0:73334e2a82be | 179 | else |
JMF | 2:cedbc9de0521 | 180 | pc.printf("Connected!\r\n"); |
JMF | 0:73334e2a82be | 181 | } |
JMF | 0:73334e2a82be | 182 | |
JMF | 0:73334e2a82be | 183 | // subscribe to MQTT topic |
JMF | 0:73334e2a82be | 184 | tmr.reset(); |
JMF | 0:73334e2a82be | 185 | rc = 1; |
JMF | 0:73334e2a82be | 186 | while( rc && client.isConnected() && tries < 3) { |
JMF | 2:cedbc9de0521 | 187 | pc.printf("We are %s, Subscribing to MQTT topic %s (%d): ", client.isConnected()?"connected":"NOT CONNECTED",topic,tries); |
JMF | 0:73334e2a82be | 188 | rc = client.subscribe(topic, MQTT::QOS0, messageArrived); |
JMF | 0:73334e2a82be | 189 | if( rc ) { |
JMF | 2:cedbc9de0521 | 190 | pc.printf("Subscribe request failed!\r\n"); |
JMF | 0:73334e2a82be | 191 | while( tmr.read_ms() < 5000 ); |
JMF | 0:73334e2a82be | 192 | tries++; |
JMF | 0:73334e2a82be | 193 | tmr.reset(); |
JMF | 0:73334e2a82be | 194 | } |
JMF | 0:73334e2a82be | 195 | else { |
JMF | 0:73334e2a82be | 196 | good=1; |
JMF | 2:cedbc9de0521 | 197 | pc.printf("Subscribe successful!\r\n"); |
JMF | 0:73334e2a82be | 198 | } |
JMF | 0:73334e2a82be | 199 | } |
JMF | 0:73334e2a82be | 200 | } |
JMF | 0:73334e2a82be | 201 | |
JMF | 0:73334e2a82be | 202 | MQTT::Message message; |
JMF | 0:73334e2a82be | 203 | char buf[100]; |
JMF | 0:73334e2a82be | 204 | message.qos = MQTT::QOS0; |
JMF | 0:73334e2a82be | 205 | message.retained = false; |
JMF | 0:73334e2a82be | 206 | message.dup = false; |
JMF | 0:73334e2a82be | 207 | message.payload = (void*)buf; |
JMF | 0:73334e2a82be | 208 | message.payloadlen = strlen(buf)+1; |
JMF | 0:73334e2a82be | 209 | |
JMF | 0:73334e2a82be | 210 | while(true) { |
JMF | 0:73334e2a82be | 211 | osEvent switchEvent = messageQ.get(100); |
JMF | 0:73334e2a82be | 212 | |
JMF | 0:73334e2a82be | 213 | if (switchEvent.value.v == 22 || switchEvent.value.v == 33) { |
JMF | 0:73334e2a82be | 214 | switch(switchEvent.value.v) { |
JMF | 0:73334e2a82be | 215 | case 22 : |
JMF | 0:73334e2a82be | 216 | sprintf(buf, "sw2"); |
JMF | 0:73334e2a82be | 217 | break; |
JMF | 0:73334e2a82be | 218 | case 33 : |
JMF | 0:73334e2a82be | 219 | sprintf(buf, "sw3"); |
JMF | 0:73334e2a82be | 220 | break; |
JMF | 0:73334e2a82be | 221 | } |
JMF | 2:cedbc9de0521 | 222 | pc.printf("Publishing MQTT message: %s (%d)\r\n", (char*)message.payload, |
JMF | 0:73334e2a82be | 223 | message.payloadlen); |
JMF | 0:73334e2a82be | 224 | rc = client.publish(topic, message); |
JMF | 0:73334e2a82be | 225 | if( rc ) { |
JMF | 2:cedbc9de0521 | 226 | pc.printf("Publish request failed! (%d)\r\n",rc); |
JMF | 0:73334e2a82be | 227 | } |
JMF | 0:73334e2a82be | 228 | else { |
JMF | 2:cedbc9de0521 | 229 | pc.printf("Publish successful!\r\n"); |
JMF | 0:73334e2a82be | 230 | client.yield(100); |
JMF | 0:73334e2a82be | 231 | } |
JMF | 0:73334e2a82be | 232 | } |
JMF | 0:73334e2a82be | 233 | else { |
JMF | 0:73334e2a82be | 234 | client.yield(100); |
JMF | 0:73334e2a82be | 235 | } |
JMF | 0:73334e2a82be | 236 | } |
JMF | 0:73334e2a82be | 237 | } |
JMF | 0:73334e2a82be | 238 |