RTOs con capa MQTT

Dependencies:   ADS1015 EthernetInterface MQTT mbed-rtos mbed

Committer:
ramirobmar
Date:
Mon Aug 24 13:43:25 2015 +0000
Revision:
0:5c786957b6f8
Revision_01_01

Who changed what in which revision?

User | Revision | Line number | New contents of line
#include "mbed.h"
#include "rtos.h"
#include "Adafruit_ADS1015.h"
#include "EthernetInterface.h"
#include "MQTTPacket.h"

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
ramirobmar 0:5c786957b6f8 10
ramirobmar 0:5c786957b6f8 11
ramirobmar 0:5c786957b6f8 12 #define LEN_BUFFER 100
ramirobmar 0:5c786957b6f8 13 #define RES_ADC 1
ramirobmar 0:5c786957b6f8 14 #define ID_SN_DEVICE 0x01
ramirobmar 0:5c786957b6f8 15 #define READ_SIGNAL 1
ramirobmar 0:5c786957b6f8 16
ramirobmar 0:5c786957b6f8 17 uint32_t measurement;
ramirobmar 0:5c786957b6f8 18 uint32_t mean_value;
ramirobmar 0:5c786957b6f8 19 uint16_t gain;
ramirobmar 0:5c786957b6f8 20 uint32_t offset;
ramirobmar 0:5c786957b6f8 21 uint32_t * buffer;
ramirobmar 0:5c786957b6f8 22 uint8_t counter;
ramirobmar 0:5c786957b6f8 23 Thread * thread1;
ramirobmar 0:5c786957b6f8 24 Mutex * lock_buffer;
ramirobmar 0:5c786957b6f8 25 Semaphore * sem_adc;
ramirobmar 0:5c786957b6f8 26 Thread * thread2;
ramirobmar 0:5c786957b6f8 27 Adafruit_ADS1015 * m_adc;
ramirobmar 0:5c786957b6f8 28
ramirobmar 0:5c786957b6f8 29 EthernetInterface eth;
ramirobmar 0:5c786957b6f8 30 TCPSocketConnection mysock;
ramirobmar 0:5c786957b6f8 31 char msg_err[81];
ramirobmar 0:5c786957b6f8 32 char msg_control[81];
ramirobmar 0:5c786957b6f8 33 char msg_status[81];
ramirobmar 0:5c786957b6f8 34 int toStop = 0;
ramirobmar 0:5c786957b6f8 35
ramirobmar 0:5c786957b6f8 36 int publish();
ramirobmar 0:5c786957b6f8 37
ramirobmar 0:5c786957b6f8 38 void set_values(void const *argument)
ramirobmar 0:5c786957b6f8 39 {
ramirobmar 0:5c786957b6f8 40 uint8_t i;
ramirobmar 0:5c786957b6f8 41 uint64_t aux_value;
ramirobmar 0:5c786957b6f8 42
ramirobmar 0:5c786957b6f8 43 while (true) {
ramirobmar 0:5c786957b6f8 44
ramirobmar 0:5c786957b6f8 45 lock_buffer->lock(osWaitForever);
ramirobmar 0:5c786957b6f8 46
ramirobmar 0:5c786957b6f8 47 for(i=0;i<counter;i++)
ramirobmar 0:5c786957b6f8 48 {
ramirobmar 0:5c786957b6f8 49 aux_value = aux_value + buffer[i];
ramirobmar 0:5c786957b6f8 50 }
ramirobmar 0:5c786957b6f8 51 mean_value = (uint32_t) aux_value / counter;
ramirobmar 0:5c786957b6f8 52
ramirobmar 0:5c786957b6f8 53 lock_buffer->unlock();
ramirobmar 0:5c786957b6f8 54 publish();
ramirobmar 0:5c786957b6f8 55
ramirobmar 0:5c786957b6f8 56 //Thread::signal_wait(READ_SIGNAL);
ramirobmar 0:5c786957b6f8 57 Thread::wait(10);
ramirobmar 0:5c786957b6f8 58 }
ramirobmar 0:5c786957b6f8 59
ramirobmar 0:5c786957b6f8 60 }
ramirobmar 0:5c786957b6f8 61
ramirobmar 0:5c786957b6f8 62
ramirobmar 0:5c786957b6f8 63
ramirobmar 0:5c786957b6f8 64 void system_measurements_thread(void const *argument)
ramirobmar 0:5c786957b6f8 65 {
ramirobmar 0:5c786957b6f8 66 uint16_t measurement;
ramirobmar 0:5c786957b6f8 67 uint32_t data_in;
ramirobmar 0:5c786957b6f8 68
ramirobmar 0:5c786957b6f8 69 while (true) {
ramirobmar 0:5c786957b6f8 70
ramirobmar 0:5c786957b6f8 71
ramirobmar 0:5c786957b6f8 72 sem_adc->wait(osWaitForever);
ramirobmar 0:5c786957b6f8 73 measurement=m_adc->readADC_Differential_0_1();
ramirobmar 0:5c786957b6f8 74 sem_adc->release();
ramirobmar 0:5c786957b6f8 75
ramirobmar 0:5c786957b6f8 76 data_in = measurement * gain - offset;
ramirobmar 0:5c786957b6f8 77
ramirobmar 0:5c786957b6f8 78 lock_buffer->lock(osWaitForever);
ramirobmar 0:5c786957b6f8 79 buffer[counter++]=data_in;
ramirobmar 0:5c786957b6f8 80 if(counter == LEN_BUFFER) counter = 0;
ramirobmar 0:5c786957b6f8 81 lock_buffer->unlock();
ramirobmar 0:5c786957b6f8 82
ramirobmar 0:5c786957b6f8 83 Thread::wait(10);
ramirobmar 0:5c786957b6f8 84
ramirobmar 0:5c786957b6f8 85 }
ramirobmar 0:5c786957b6f8 86 }
ramirobmar 0:5c786957b6f8 87
ramirobmar 0:5c786957b6f8 88 void init_system_measurements()
ramirobmar 0:5c786957b6f8 89 {
ramirobmar 0:5c786957b6f8 90 m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor
ramirobmar 0:5c786957b6f8 91 m_adc->setGain(GAIN_ONE);
ramirobmar 0:5c786957b6f8 92 buffer = new uint32_t[LEN_BUFFER];
ramirobmar 0:5c786957b6f8 93 counter=0;
ramirobmar 0:5c786957b6f8 94 lock_buffer = new Mutex();
ramirobmar 0:5c786957b6f8 95 sem_adc = new Semaphore(RES_ADC);
ramirobmar 0:5c786957b6f8 96 }
ramirobmar 0:5c786957b6f8 97
ramirobmar 0:5c786957b6f8 98 int getdata(unsigned char* buf, int count)
ramirobmar 0:5c786957b6f8 99 {
ramirobmar 0:5c786957b6f8 100 return mysock.receive((char *)buf, (size_t)count);
ramirobmar 0:5c786957b6f8 101 }
ramirobmar 0:5c786957b6f8 102
ramirobmar 0:5c786957b6f8 103 void init_system_network()
ramirobmar 0:5c786957b6f8 104 {
ramirobmar 0:5c786957b6f8 105 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
ramirobmar 0:5c786957b6f8 106 MQTTString topicString = MQTTString_initializer;
ramirobmar 0:5c786957b6f8 107
ramirobmar 0:5c786957b6f8 108 unsigned char buf[200];
ramirobmar 0:5c786957b6f8 109 int buflen = sizeof(buf);
ramirobmar 0:5c786957b6f8 110
ramirobmar 0:5c786957b6f8 111 unsigned char payload[25]= "mypayload";
ramirobmar 0:5c786957b6f8 112 int payloadlen = strlen((char *)payload);
ramirobmar 0:5c786957b6f8 113 bool status_err = false;
ramirobmar 0:5c786957b6f8 114
ramirobmar 0:5c786957b6f8 115 int len = 0;
ramirobmar 0:5c786957b6f8 116 int connack_rc;
ramirobmar 0:5c786957b6f8 117 unsigned char sessionPresent;
ramirobmar 0:5c786957b6f8 118
ramirobmar 0:5c786957b6f8 119 unsigned char dup;
ramirobmar 0:5c786957b6f8 120 int qos;
ramirobmar 0:5c786957b6f8 121 unsigned char retained;
ramirobmar 0:5c786957b6f8 122 unsigned short msgid;
ramirobmar 0:5c786957b6f8 123 int payloadlen_in;
ramirobmar 0:5c786957b6f8 124 char* payload_in;
ramirobmar 0:5c786957b6f8 125 int rc;
ramirobmar 0:5c786957b6f8 126
ramirobmar 0:5c786957b6f8 127
ramirobmar 0:5c786957b6f8 128 eth.init(); //Use DHCP
ramirobmar 0:5c786957b6f8 129 eth.connect();
ramirobmar 0:5c786957b6f8 130
ramirobmar 0:5c786957b6f8 131
ramirobmar 0:5c786957b6f8 132 rc=mysock.connect("iot.eclipse.org", 1883);
ramirobmar 0:5c786957b6f8 133 if(rc==-1)
ramirobmar 0:5c786957b6f8 134 {
ramirobmar 0:5c786957b6f8 135 eth.disconnect();
ramirobmar 0:5c786957b6f8 136 return;
ramirobmar 0:5c786957b6f8 137 }
ramirobmar 0:5c786957b6f8 138
ramirobmar 0:5c786957b6f8 139 data.clientID.cstring = "SendReceive mbed MQTT ";
ramirobmar 0:5c786957b6f8 140 data.keepAliveInterval = 20;
ramirobmar 0:5c786957b6f8 141 data.cleansession = 1;
ramirobmar 0:5c786957b6f8 142
ramirobmar 0:5c786957b6f8 143 mysock.set_blocking(true, 1000); /* 1 second Timeout */
ramirobmar 0:5c786957b6f8 144
ramirobmar 0:5c786957b6f8 145 len = MQTTSerialize_connect(buf, buflen, &data);
ramirobmar 0:5c786957b6f8 146 rc = mysock.send((char *)buf, len);
ramirobmar 0:5c786957b6f8 147 if(rc==-1)
ramirobmar 0:5c786957b6f8 148 {
ramirobmar 0:5c786957b6f8 149 eth.disconnect();
ramirobmar 0:5c786957b6f8 150 return;
ramirobmar 0:5c786957b6f8 151
ramirobmar 0:5c786957b6f8 152 }
ramirobmar 0:5c786957b6f8 153
ramirobmar 0:5c786957b6f8 154 /* wait for connack */
ramirobmar 0:5c786957b6f8 155 if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
ramirobmar 0:5c786957b6f8 156 {
ramirobmar 0:5c786957b6f8 157
ramirobmar 0:5c786957b6f8 158 if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0)
ramirobmar 0:5c786957b6f8 159 {
ramirobmar 0:5c786957b6f8 160 sprintf(msg_err,"Unable to connect, return code %d",connack_rc);
ramirobmar 0:5c786957b6f8 161 status_err = true;
ramirobmar 0:5c786957b6f8 162 }
ramirobmar 0:5c786957b6f8 163 else
ramirobmar 0:5c786957b6f8 164 status_err = false;
ramirobmar 0:5c786957b6f8 165 }
ramirobmar 0:5c786957b6f8 166 else
ramirobmar 0:5c786957b6f8 167 status_err = true;
ramirobmar 0:5c786957b6f8 168
ramirobmar 0:5c786957b6f8 169 if (!status_err)
ramirobmar 0:5c786957b6f8 170 {
ramirobmar 0:5c786957b6f8 171 topicString.cstring = "pubtopic";
ramirobmar 0:5c786957b6f8 172
ramirobmar 0:5c786957b6f8 173 while (!toStop)
ramirobmar 0:5c786957b6f8 174 {
ramirobmar 0:5c786957b6f8 175 if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
ramirobmar 0:5c786957b6f8 176 {
ramirobmar 0:5c786957b6f8 177
ramirobmar 0:5c786957b6f8 178 MQTTString receivedTopic;
ramirobmar 0:5c786957b6f8 179 rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen);
ramirobmar 0:5c786957b6f8 180 sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in);
ramirobmar 0:5c786957b6f8 181 }
ramirobmar 0:5c786957b6f8 182
ramirobmar 0:5c786957b6f8 183 sprintf(msg_status,"publishing reading\n");
ramirobmar 0:5c786957b6f8 184 len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
ramirobmar 0:5c786957b6f8 185 rc = mysock.send((char *)buf, len);
ramirobmar 0:5c786957b6f8 186
ramirobmar 0:5c786957b6f8 187 }
ramirobmar 0:5c786957b6f8 188
ramirobmar 0:5c786957b6f8 189
ramirobmar 0:5c786957b6f8 190 }
ramirobmar 0:5c786957b6f8 191 else{
ramirobmar 0:5c786957b6f8 192 eth.disconnect();
ramirobmar 0:5c786957b6f8 193 }
ramirobmar 0:5c786957b6f8 194
ramirobmar 0:5c786957b6f8 195 return;
ramirobmar 0:5c786957b6f8 196
ramirobmar 0:5c786957b6f8 197
ramirobmar 0:5c786957b6f8 198
ramirobmar 0:5c786957b6f8 199
ramirobmar 0:5c786957b6f8 200 }
ramirobmar 0:5c786957b6f8 201
ramirobmar 0:5c786957b6f8 202 int publish()
ramirobmar 0:5c786957b6f8 203 {
ramirobmar 0:5c786957b6f8 204 MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
ramirobmar 0:5c786957b6f8 205 int rc = 0;
ramirobmar 0:5c786957b6f8 206 unsigned char buf[200];
ramirobmar 0:5c786957b6f8 207 int buflen = sizeof(buf);
ramirobmar 0:5c786957b6f8 208 TCPSocketConnection mysock;
ramirobmar 0:5c786957b6f8 209 MQTTString topicString = MQTTString_initializer;
ramirobmar 0:5c786957b6f8 210 unsigned char payload[25] = "I'm alive!";
ramirobmar 0:5c786957b6f8 211 int payloadlen = strlen((char *)payload);
ramirobmar 0:5c786957b6f8 212 int len = 0;
ramirobmar 0:5c786957b6f8 213
ramirobmar 0:5c786957b6f8 214 mysock.connect("m2m.eclipse.org", 1883);
ramirobmar 0:5c786957b6f8 215
ramirobmar 0:5c786957b6f8 216 sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE);
ramirobmar 0:5c786957b6f8 217 data.keepAliveInterval = 20;
ramirobmar 0:5c786957b6f8 218 data.cleansession = 1;
ramirobmar 0:5c786957b6f8 219 data.MQTTVersion = 3;
ramirobmar 0:5c786957b6f8 220
ramirobmar 0:5c786957b6f8 221 len = MQTTSerialize_connect(buf, buflen, &data);
ramirobmar 0:5c786957b6f8 222
ramirobmar 0:5c786957b6f8 223 sprintf(topicString.cstring,"MESUREMENT:%d",mean_value);
ramirobmar 0:5c786957b6f8 224
ramirobmar 0:5c786957b6f8 225 len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen);
ramirobmar 0:5c786957b6f8 226
ramirobmar 0:5c786957b6f8 227 len += MQTTSerialize_disconnect(buf + len, buflen - len);
ramirobmar 0:5c786957b6f8 228
ramirobmar 0:5c786957b6f8 229 rc = 0;
ramirobmar 0:5c786957b6f8 230 while (rc < len)
ramirobmar 0:5c786957b6f8 231 {
ramirobmar 0:5c786957b6f8 232 int rc1 = mysock.send((char *)buf, len);
ramirobmar 0:5c786957b6f8 233 if (rc1 == -1)
ramirobmar 0:5c786957b6f8 234 {
ramirobmar 0:5c786957b6f8 235 sprintf(msg_status,"Send failed");
ramirobmar 0:5c786957b6f8 236
ramirobmar 0:5c786957b6f8 237 break;
ramirobmar 0:5c786957b6f8 238 }
ramirobmar 0:5c786957b6f8 239 else
ramirobmar 0:5c786957b6f8 240 rc += rc1;
ramirobmar 0:5c786957b6f8 241 }
ramirobmar 0:5c786957b6f8 242 if (rc == len)
ramirobmar 0:5c786957b6f8 243 sprintf(msg_status,"Send succeeded");
ramirobmar 0:5c786957b6f8 244
ramirobmar 0:5c786957b6f8 245 wait(0.2);
ramirobmar 0:5c786957b6f8 246
ramirobmar 0:5c786957b6f8 247 return 0;
ramirobmar 0:5c786957b6f8 248 }
ramirobmar 0:5c786957b6f8 249
ramirobmar 0:5c786957b6f8 250
ramirobmar 0:5c786957b6f8 251
ramirobmar 0:5c786957b6f8 252
ramirobmar 0:5c786957b6f8 253 int main()
ramirobmar 0:5c786957b6f8 254 {
ramirobmar 0:5c786957b6f8 255
ramirobmar 0:5c786957b6f8 256
ramirobmar 0:5c786957b6f8 257 init_system_measurements(); // Initialization system's measurement
ramirobmar 0:5c786957b6f8 258 init_system_network();
ramirobmar 0:5c786957b6f8 259
ramirobmar 0:5c786957b6f8 260 thread1 = new Thread(system_measurements_thread);
ramirobmar 0:5c786957b6f8 261 thread2 = new Thread(set_values);
ramirobmar 0:5c786957b6f8 262
ramirobmar 0:5c786957b6f8 263
ramirobmar 0:5c786957b6f8 264 while (true) {
ramirobmar 0:5c786957b6f8 265
ramirobmar 0:5c786957b6f8 266 }
ramirobmar 0:5c786957b6f8 267
ramirobmar 0:5c786957b6f8 268 }