Takes in sensor readings from NRF24-connected nodes and publishes the data over MQTT

Dependencies:   EthernetInterface MQTT mbed-rtos mbed nRF24L01P

Committer:
Mephi
Date:
Wed Jun 03 18:46:47 2015 +0000
Revision:
0:cec410958705
v0.1 Initial Commit. Works fine for a little while...

Who changed what in which revision?

User | Revision | Line number | New contents of line
Mephi 0:cec410958705 1 #include "mbed.h"
Mephi 0:cec410958705 2 #include "MQTTEthernet.h"
Mephi 0:cec410958705 3 #include "MQTTClient.h"
Mephi 0:cec410958705 4 #include "nRF24L01P.h"
Mephi 0:cec410958705 5
// Size in bytes of every radio packet; used for the packet buffers below and
// passed to the radio with setTransferSize() in main().
#define TRANSFER_SIZE 8
// Identifier of this node: main() only handles packets whose byte 1 equals
// NodeID, and writes NodeID into byte 1 of every acknowledgement it sends.
const uint8_t NodeID = 1;
// Number of entries in maps[] and lastHeardArray[].
const uint8_t Nodes = 19;

Serial pc(USBTX, USBRX); // tx, rx
nRF24L01P my_nrf24l01p(p5, p6, p7, p8, p9, p10); // mosi, miso, sck, csn, ce, irq
DigitalOut myled1(LED1); // set while main() is handling a received radio packet
DigitalOut myled2(LED2); // not driven anywhere in the code shown here
DigitalOut myled3(LED3); // not driven anywhere in the code shown here
DigitalOut myled4(LED4); // set when main() finds the MQTT client disconnected

float brightness = 0.0f; // not referenced anywhere in the code shown here
// txData: acknowledgement being sent; rxData: packet most recently read from
// the radio; oldRxData: copy of the last packet that was addressed to this node.
char txData[TRANSFER_SIZE], rxData[TRANSFER_SIZE], oldRxData[TRANSFER_SIZE];
int arrivedcount = 0; // incremented by messageArrived() for each MQTT message received
time_t seconds;       // scratch timestamp, refreshed from time(NULL) before most log lines
Mephi 0:cec410958705 21
// One outgoing MQTT publication as passed to sendMessage().
struct MQTTmessage {
    char topic[50];   // topic string handed to client.publish()
    char value[50];   // payload text; its strlen() determines the payload length
    bool isNumeric;   // true: payload sent without its trailing NUL; false: NUL included
};
Mephi 0:cec410958705 27
// Records when a given radio node was last heard from.
struct lastHeard {
    uint8_t address;  // node address, copied from maps[i].address at start-up
    time_t seconds;   // value of the global `seconds` when a packet from the node was decoded
};


// One slot per entry in maps[], indexed identically (slot i belongs to maps[i]).
struct lastHeard lastHeardArray[Nodes];
Mephi 0:cec410958705 35
// Lookup table from a radio node address (byte 0 of a received packet) to the
// MQTT topic its readings are published on. useRadioData() chooses how to
// decode the reading from the first four characters of the topic
// ("temp", "door", or anything else).
const struct MQTTtoNRF24map {
    uint8_t address;  // source address carried in byte 0 of the packet
    char topic[50];   // MQTT topic for that node
} maps[Nodes] = {
    {100, "temperature/study"},
    {101, "temperature/entrance"},
    {102, "temperature/hall"},
    {103, "temperature/lounge"},
    {104, "temperature/utility"},
    {105, "temperature/kitchen"},
    {106, "temperature/toilet"},
    {107, "temperature/landing"},
    {108, "temperature/mainBedroom"},
    {109, "temperature/samBedroom"},
    {110, "temperature/meganBedroom"},
    {111, "temperature/spareBedroom"},
    {112, "temperature/bathroom"},
    {113, "temperature/outside"},
    {114, "temperature/garage"},
    {120, "door/garage"},
    {121, "door/garageBig"},
    {122, "door/study"},
    {150, "power/house"},
};
Mephi 0:cec410958705 60
Mephi 0:cec410958705 61 struct MQTTmessage makeMessage(char topic[50], char value[50], bool isNumeric) {
Mephi 0:cec410958705 62 struct MQTTmessage returnMessage;
Mephi 0:cec410958705 63 strncpy(returnMessage.topic, topic, 50);
Mephi 0:cec410958705 64 strncpy(returnMessage.value, value, 50);
Mephi 0:cec410958705 65 returnMessage.isNumeric = isNumeric;
Mephi 0:cec410958705 66 return returnMessage;
Mephi 0:cec410958705 67 }
Mephi 0:cec410958705 68
Mephi 0:cec410958705 69 int checkDuplicates(char inRx[TRANSFER_SIZE], char inOldRx[TRANSFER_SIZE]) {
Mephi 0:cec410958705 70 int i;
Mephi 0:cec410958705 71 for (i = 0; i < TRANSFER_SIZE; i++) {
Mephi 0:cec410958705 72 if (inRx[i] != inOldRx[i]) {return 1;} // And if it's not the same as the last one recieved
Mephi 0:cec410958705 73 }
Mephi 0:cec410958705 74 return 0;
Mephi 0:cec410958705 75 }
Mephi 0:cec410958705 76
Mephi 0:cec410958705 77 void sendMessage(MQTT::Client<MQTTEthernet, Countdown> client, struct MQTTmessage inMessage) {
Mephi 0:cec410958705 78 MQTT::Message message;
Mephi 0:cec410958705 79 message.qos = MQTT::QOS0;
Mephi 0:cec410958705 80 message.retained = false;
Mephi 0:cec410958705 81 message.dup = false;
Mephi 0:cec410958705 82 message.payload = (void*)inMessage.value;
Mephi 0:cec410958705 83 if (inMessage.isNumeric) {
Mephi 0:cec410958705 84 message.payloadlen = strlen(inMessage.value);
Mephi 0:cec410958705 85 } else {
Mephi 0:cec410958705 86 message.payloadlen = strlen(inMessage.value) + 1;
Mephi 0:cec410958705 87 }client.publish(inMessage.topic, message);
Mephi 0:cec410958705 88 while (arrivedcount < 1)
Mephi 0:cec410958705 89 client.yield(100);
Mephi 0:cec410958705 90 seconds = time(NULL);
Mephi 0:cec410958705 91 pc.printf("%d : Sent message to %s : %s\r\n", seconds, inMessage.topic, inMessage.value);
Mephi 0:cec410958705 92 }
Mephi 0:cec410958705 93
/**
 * Convert a raw sensor byte into text using (inTemp - 20) / 2, formatted with
 * two decimal places (e.g. 60 -> "20.00", 0 -> "-10.00").
 *
 * @param inTemp  Raw reading byte from the radio packet.
 * @return        Pointer to a NUL-terminated string held in a function-local
 *                static buffer. The buffer is overwritten by the next call,
 *                so copy the text out before calling again.
 */
char* decodeTemperatures(uint8_t inTemp) {
    // Static storage: the buffer must outlive this call because a pointer to
    // it is returned. An automatic array would be destroyed on return.
    static char tempStr[50];
    snprintf(tempStr, sizeof(tempStr), "%4.2f", ((float)inTemp - 20) / 2);
    return tempStr;
}
Mephi 0:cec410958705 99
Mephi 0:cec410958705 100 struct MQTTmessage useRadioData(){
Mephi 0:cec410958705 101 struct MQTTmessage returnMessage;
Mephi 0:cec410958705 102 bool foundMap = false;
Mephi 0:cec410958705 103 for (int i=0; i<Nodes; i++) {
Mephi 0:cec410958705 104 if (maps[i].address == rxData[0]) {
Mephi 0:cec410958705 105 lastHeardArray[i].seconds = seconds;
Mephi 0:cec410958705 106 if (strncmp(maps[i].topic, "temp", 4) == 0) {
Mephi 0:cec410958705 107 strncpy(returnMessage.topic, maps[i].topic, 50);
Mephi 0:cec410958705 108 strncpy(returnMessage.value, decodeTemperatures(rxData[7]), 50);
Mephi 0:cec410958705 109 returnMessage.isNumeric = true;
Mephi 0:cec410958705 110 } else if (strncmp(maps[i].topic, "door", 4) == 0) {
Mephi 0:cec410958705 111 strncpy(returnMessage.topic, maps[i].topic, 50);
Mephi 0:cec410958705 112 if (rxData[7] == 255) {
Mephi 0:cec410958705 113 strncpy(returnMessage.value, "ON", 8);
Mephi 0:cec410958705 114 } else if (rxData[7] == 0) {
Mephi 0:cec410958705 115 strncpy(returnMessage.value, "OFF", 8);
Mephi 0:cec410958705 116 } else {
Mephi 0:cec410958705 117 strncpy(returnMessage.value, "Unknown", 8);
Mephi 0:cec410958705 118 }
Mephi 0:cec410958705 119 returnMessage.isNumeric = false;
Mephi 0:cec410958705 120 } else {
Mephi 0:cec410958705 121 strncpy(returnMessage.topic, maps[i].topic, 50);
Mephi 0:cec410958705 122 strncpy(returnMessage.value, decodeTemperatures(rxData[7]), 50);
Mephi 0:cec410958705 123 returnMessage.isNumeric = false;
Mephi 0:cec410958705 124 }
Mephi 0:cec410958705 125 foundMap = true;
Mephi 0:cec410958705 126 break;
Mephi 0:cec410958705 127 }
Mephi 0:cec410958705 128 }
Mephi 0:cec410958705 129 if (!foundMap){
Mephi 0:cec410958705 130 strncpy(returnMessage.topic, "mBed/UnknownMessage", 50);
Mephi 0:cec410958705 131 strncpy(returnMessage.value, rxData, 8);
Mephi 0:cec410958705 132 }
Mephi 0:cec410958705 133 return returnMessage;
Mephi 0:cec410958705 134 }
Mephi 0:cec410958705 135
Mephi 0:cec410958705 136 void messageArrived(MQTT::MessageData& md) {
Mephi 0:cec410958705 137 MQTT::Message &message = md.message;
Mephi 0:cec410958705 138 pc.printf("Message arrived: qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
Mephi 0:cec410958705 139 pc.printf("Payload %.*s\r\n", message.payloadlen, (char*)message.payload);
Mephi 0:cec410958705 140 ++arrivedcount;
Mephi 0:cec410958705 141 pc.puts((char*)message.payload);
Mephi 0:cec410958705 142 }
Mephi 0:cec410958705 143
// Entry point. Connects to the MQTT broker, configures the nRF24L01+ radio as
// a receiver, then loops forever: every packet addressed to this node is
// acknowledged over the radio and, unless it repeats the previous packet,
// published over MQTT via useRadioData()/sendMessage().
int main() {
    // Start the clock at 0 so the timestamps in the log count up from start-up.
    set_time(0);
    seconds = time(NULL);
    pc.printf("%d : mBed started\r\n", seconds);

    // Give each lastHeardArray slot the address of the matching maps[] entry.
    for (int i=0; i<Nodes; i++) {
        lastHeardArray[i].address = maps[i].address;
    }

    //MQTT init
    MQTTEthernet ipstack = MQTTEthernet();
    MQTT::Client<MQTTEthernet, Countdown> client = MQTT::Client<MQTTEthernet, Countdown>(ipstack);

    char* hostname = "172.16.0.1";
    int port = 1883;
    char* topic = "mBed";
    seconds = time(NULL);
    pc.printf("%d : Connecting to %s:%d\r\n", seconds, hostname, port);
    // Failures below are only logged; execution continues regardless.
    int rc = ipstack.connect(hostname, port);
    if (rc != 0) {
        seconds = time(NULL);
        pc.printf("%d : rc from TCP connect is %d\r\n", seconds, rc);
    }
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = 3;
    data.clientID.cstring = "mbed-sample";
    data.username.cstring = "testuser";
    data.password.cstring = "testpassword";
    if ((rc = client.connect(data)) != 0){
        seconds = time(NULL);
        pc.printf("%d : rc from MQTT connect is %d\r\n", seconds, rc);
    }
    // Subscribe to the same topic the power-up notice is published on, so that
    // notice is delivered back to messageArrived().
    if ((rc = client.subscribe(topic, MQTT::QOS1, messageArrived)) != 0) {
        seconds = time(NULL);
        pc.printf("%d : rc from MQTT subscribe is %d\r\n", seconds, rc);
    }
    sendMessage(client, makeMessage("mBed", "mBed powered up", false));
    //End MQTT init


    //Start NRF24 Init
    //int txDataCnt = 0;
    //int rxDataCnt = 0;

    // Radio link settings; units follow the labels printed below
    // (MHz, kbps, bits, bytes).
    my_nrf24l01p.powerUp();
    my_nrf24l01p.setRfFrequency(2525);
    my_nrf24l01p.setTransferSize( TRANSFER_SIZE );
    my_nrf24l01p.setCrcWidth(16);
    my_nrf24l01p.setAirDataRate(250);
    my_nrf24l01p.disableAutoAcknowledge();
    my_nrf24l01p.setRxAddress(0xe7e7e7e7e8);
    my_nrf24l01p.setTxAddress(0xe7e7e7e7e7);
    my_nrf24l01p.disableAutoRetransmit();

    // Read the settings back from the radio and log them.
    pc.printf( "nRF24L01+ Frequency : %d MHz\r\n", my_nrf24l01p.getRfFrequency() );
    pc.printf( "nRF24L01+ Output power : %d dBm\r\n", my_nrf24l01p.getRfOutputPower() );
    pc.printf( "nRF24L01+ Data Rate : %d kbps\r\n", my_nrf24l01p.getAirDataRate() );
    pc.printf( "nRF24L01+ TX Address : 0x%010llX\r\n", my_nrf24l01p.getTxAddress() );
    pc.printf( "nRF24L01+ RX Address : 0x%010llX\r\n", my_nrf24l01p.getRxAddress() );
    pc.printf( "nRF24L01+ CRC Width : %d bits\r\n", my_nrf24l01p.getCrcWidth() );
    pc.printf( "nRF24L01+ TransferSize : %d bytes\r\n", my_nrf24l01p.getTransferSize() );

    my_nrf24l01p.setReceiveMode();
    my_nrf24l01p.enable();
    //End NRF24 init

    while (1) {
        if ( my_nrf24l01p.readable() ) {
            //rxDataCnt =
            my_nrf24l01p.read( NRF24L01P_PIPE_P0, rxData, TRANSFER_SIZE );
            myled1 = 1;
            // 1 when this packet differs from the last one addressed to us,
            // 0 when it is a repeat. Evaluated before oldRxData is overwritten.
            int notDuplicate = checkDuplicates(rxData, oldRxData);
            if ( rxData[1]==NodeID ) { // Addressed to this node (duplicates are still acknowledged; they are filtered before publishing)
                seconds = time(NULL);
                pc.printf("%d : New NRF Data: ", seconds);

                //int i;
                // Remember the packet for the next duplicate check and log its bytes.
                for (int i = 0; i < TRANSFER_SIZE; i++) {
                    oldRxData[i] = rxData[i];
                    if (i > 0) printf(":");
                    printf("%d", rxData[i]);
                }
                printf(" - ");
                txData[0] = rxData[1]; // Send response back to source
                txData[1] = NodeID; // From this host
                txData[2] = 2; // as an Ack...
                txData[3] = rxData[3]; // ...to the packet ID that was received
                txData[4] = 0; // with an empty payload
                txData[5] = 0;
                txData[6] = 0;
                txData[7] = 0;

                // Auto-acknowledge and auto-retransmit are disabled above, so the
                // acknowledgement is sent by hand, eight times, 500 us apart.
                my_nrf24l01p.setTransmitMode();
                for (int i = 0; i < 8; i++) {
                    //txDataCnt =
                    my_nrf24l01p.write( NRF24L01P_PIPE_P1, txData, TRANSFER_SIZE );
                    wait_us(500);
                }
                my_nrf24l01p.setReceiveMode();

                // Log the acknowledgement bytes that were just sent.
                for (int i = 0; i < TRANSFER_SIZE; i++) {
                    if (i > 0) pc.printf(":");
                    pc.printf("%d", txData[i]);
                }
                printf("\r\n");

                seconds = time(NULL);
                pc.printf("%d : Sending NRF data \r\n", seconds);
                // Only packets that differ from the previous one are published.
                if (notDuplicate) {
                    // Re-establish the TCP and MQTT session first if the client
                    // reports that it is no longer connected.
                    if(!client.isConnected()) {
                        myled4 = 1;
                        pc.printf("%d : Connecting to %s:%d\r\n", seconds, hostname, port);
                        rc = ipstack.connect(hostname, port);
                        if (rc != 0) {
                            seconds = time(NULL);
                            pc.printf("%d : rc from TCP connect is %d\r\n", seconds, rc);
                        }
                        /*
                        data = MQTTPacket_connectData_initializer;
                        data.MQTTVersion = 3;
                        data.clientID.cstring = "mbed-sample";
                        data.username.cstring = "testuser";
                        data.password.cstring = "testpassword";
                        */
                        // Reuses the connect options prepared during start-up.
                        if ((rc = client.connect(data)) != 0){
                            seconds = time(NULL);
                            pc.printf("%d : rc from MQTT connect is %d\r\n", seconds, rc);
                        }
                        if ((rc = client.subscribe(topic, MQTT::QOS1, messageArrived)) != 0) {
                            seconds = time(NULL);
                            pc.printf("%d : rc from MQTT subscribe is %d\r\n", seconds, rc);
                        }
                        sendMessage(client, makeMessage("mBed", "session died, restarting", false));
                    }
                    sendMessage(client, useRadioData());
                }
            }
            myled1 = 0;
        }
        // insert mqtt poller here...
        // NOTE(review): this loop itself never calls client.yield(); confirm
        // whether the MQTT session needs periodic servicing between packets.
    }
}