RTOs con capa MQTT
Dependencies: ADS1015 EthernetInterface MQTT mbed-rtos mbed
main.cpp@0:5c786957b6f8, 2015-08-24 (annotated)
- Committer:
- ramirobmar
- Date:
- Mon Aug 24 13:43:25 2015 +0000
- Revision:
- 0:5c786957b6f8
Revision_01_01
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
ramirobmar | 0:5c786957b6f8 | 1 | #include "mbed.h" |
ramirobmar | 0:5c786957b6f8 | 2 | #include "rtos.h" |
ramirobmar | 0:5c786957b6f8 | 3 | #include "Adafruit_ADS1015.h" |
ramirobmar | 0:5c786957b6f8 | 4 | #include "EthernetInterface.h" |
ramirobmar | 0:5c786957b6f8 | 5 | #include "MQTTPacket.h" |
ramirobmar | 0:5c786957b6f8 | 6 | |
ramirobmar | 0:5c786957b6f8 | 7 | #include <errno.h> |
ramirobmar | 0:5c786957b6f8 | 8 | #include <stdlib.h> |
ramirobmar | 0:5c786957b6f8 | 9 | #include <string.h> |
ramirobmar | 0:5c786957b6f8 | 10 | |
ramirobmar | 0:5c786957b6f8 | 11 | |
ramirobmar | 0:5c786957b6f8 | 12 | #define LEN_BUFFER 100 |
ramirobmar | 0:5c786957b6f8 | 13 | #define RES_ADC 1 |
ramirobmar | 0:5c786957b6f8 | 14 | #define ID_SN_DEVICE 0x01 |
ramirobmar | 0:5c786957b6f8 | 15 | #define READ_SIGNAL 1 |
ramirobmar | 0:5c786957b6f8 | 16 | |
ramirobmar | 0:5c786957b6f8 | 17 | uint32_t measurement; |
ramirobmar | 0:5c786957b6f8 | 18 | uint32_t mean_value; |
ramirobmar | 0:5c786957b6f8 | 19 | uint16_t gain; |
ramirobmar | 0:5c786957b6f8 | 20 | uint32_t offset; |
ramirobmar | 0:5c786957b6f8 | 21 | uint32_t * buffer; |
ramirobmar | 0:5c786957b6f8 | 22 | uint8_t counter; |
ramirobmar | 0:5c786957b6f8 | 23 | Thread * thread1; |
ramirobmar | 0:5c786957b6f8 | 24 | Mutex * lock_buffer; |
ramirobmar | 0:5c786957b6f8 | 25 | Semaphore * sem_adc; |
ramirobmar | 0:5c786957b6f8 | 26 | Thread * thread2; |
ramirobmar | 0:5c786957b6f8 | 27 | Adafruit_ADS1015 * m_adc; |
ramirobmar | 0:5c786957b6f8 | 28 | |
ramirobmar | 0:5c786957b6f8 | 29 | EthernetInterface eth; |
ramirobmar | 0:5c786957b6f8 | 30 | TCPSocketConnection mysock; |
ramirobmar | 0:5c786957b6f8 | 31 | char msg_err[81]; |
ramirobmar | 0:5c786957b6f8 | 32 | char msg_control[81]; |
ramirobmar | 0:5c786957b6f8 | 33 | char msg_status[81]; |
ramirobmar | 0:5c786957b6f8 | 34 | int toStop = 0; |
ramirobmar | 0:5c786957b6f8 | 35 | |
ramirobmar | 0:5c786957b6f8 | 36 | int publish(); |
ramirobmar | 0:5c786957b6f8 | 37 | |
ramirobmar | 0:5c786957b6f8 | 38 | void set_values(void const *argument) |
ramirobmar | 0:5c786957b6f8 | 39 | { |
ramirobmar | 0:5c786957b6f8 | 40 | uint8_t i; |
ramirobmar | 0:5c786957b6f8 | 41 | uint64_t aux_value; |
ramirobmar | 0:5c786957b6f8 | 42 | |
ramirobmar | 0:5c786957b6f8 | 43 | while (true) { |
ramirobmar | 0:5c786957b6f8 | 44 | |
ramirobmar | 0:5c786957b6f8 | 45 | lock_buffer->lock(osWaitForever); |
ramirobmar | 0:5c786957b6f8 | 46 | |
ramirobmar | 0:5c786957b6f8 | 47 | for(i=0;i<counter;i++) |
ramirobmar | 0:5c786957b6f8 | 48 | { |
ramirobmar | 0:5c786957b6f8 | 49 | aux_value = aux_value + buffer[i]; |
ramirobmar | 0:5c786957b6f8 | 50 | } |
ramirobmar | 0:5c786957b6f8 | 51 | mean_value = (uint32_t) aux_value / counter; |
ramirobmar | 0:5c786957b6f8 | 52 | |
ramirobmar | 0:5c786957b6f8 | 53 | lock_buffer->unlock(); |
ramirobmar | 0:5c786957b6f8 | 54 | publish(); |
ramirobmar | 0:5c786957b6f8 | 55 | |
ramirobmar | 0:5c786957b6f8 | 56 | //Thread::signal_wait(READ_SIGNAL); |
ramirobmar | 0:5c786957b6f8 | 57 | Thread::wait(10); |
ramirobmar | 0:5c786957b6f8 | 58 | } |
ramirobmar | 0:5c786957b6f8 | 59 | |
ramirobmar | 0:5c786957b6f8 | 60 | } |
ramirobmar | 0:5c786957b6f8 | 61 | |
ramirobmar | 0:5c786957b6f8 | 62 | |
ramirobmar | 0:5c786957b6f8 | 63 | |
ramirobmar | 0:5c786957b6f8 | 64 | void system_measurements_thread(void const *argument) |
ramirobmar | 0:5c786957b6f8 | 65 | { |
ramirobmar | 0:5c786957b6f8 | 66 | uint16_t measurement; |
ramirobmar | 0:5c786957b6f8 | 67 | uint32_t data_in; |
ramirobmar | 0:5c786957b6f8 | 68 | |
ramirobmar | 0:5c786957b6f8 | 69 | while (true) { |
ramirobmar | 0:5c786957b6f8 | 70 | |
ramirobmar | 0:5c786957b6f8 | 71 | |
ramirobmar | 0:5c786957b6f8 | 72 | sem_adc->wait(osWaitForever); |
ramirobmar | 0:5c786957b6f8 | 73 | measurement=m_adc->readADC_Differential_0_1(); |
ramirobmar | 0:5c786957b6f8 | 74 | sem_adc->release(); |
ramirobmar | 0:5c786957b6f8 | 75 | |
ramirobmar | 0:5c786957b6f8 | 76 | data_in = measurement * gain - offset; |
ramirobmar | 0:5c786957b6f8 | 77 | |
ramirobmar | 0:5c786957b6f8 | 78 | lock_buffer->lock(osWaitForever); |
ramirobmar | 0:5c786957b6f8 | 79 | buffer[counter++]=data_in; |
ramirobmar | 0:5c786957b6f8 | 80 | if(counter == LEN_BUFFER) counter = 0; |
ramirobmar | 0:5c786957b6f8 | 81 | lock_buffer->unlock(); |
ramirobmar | 0:5c786957b6f8 | 82 | |
ramirobmar | 0:5c786957b6f8 | 83 | Thread::wait(10); |
ramirobmar | 0:5c786957b6f8 | 84 | |
ramirobmar | 0:5c786957b6f8 | 85 | } |
ramirobmar | 0:5c786957b6f8 | 86 | } |
ramirobmar | 0:5c786957b6f8 | 87 | |
ramirobmar | 0:5c786957b6f8 | 88 | void init_system_measurements() |
ramirobmar | 0:5c786957b6f8 | 89 | { |
ramirobmar | 0:5c786957b6f8 | 90 | m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor |
ramirobmar | 0:5c786957b6f8 | 91 | m_adc->setGain(GAIN_ONE); |
ramirobmar | 0:5c786957b6f8 | 92 | buffer = new uint32_t[LEN_BUFFER]; |
ramirobmar | 0:5c786957b6f8 | 93 | counter=0; |
ramirobmar | 0:5c786957b6f8 | 94 | lock_buffer = new Mutex(); |
ramirobmar | 0:5c786957b6f8 | 95 | sem_adc = new Semaphore(RES_ADC); |
ramirobmar | 0:5c786957b6f8 | 96 | } |
ramirobmar | 0:5c786957b6f8 | 97 | |
ramirobmar | 0:5c786957b6f8 | 98 | int getdata(unsigned char* buf, int count) |
ramirobmar | 0:5c786957b6f8 | 99 | { |
ramirobmar | 0:5c786957b6f8 | 100 | return mysock.receive((char *)buf, (size_t)count); |
ramirobmar | 0:5c786957b6f8 | 101 | } |
ramirobmar | 0:5c786957b6f8 | 102 | |
ramirobmar | 0:5c786957b6f8 | 103 | void init_system_network() |
ramirobmar | 0:5c786957b6f8 | 104 | { |
ramirobmar | 0:5c786957b6f8 | 105 | MQTTPacket_connectData data = MQTTPacket_connectData_initializer; |
ramirobmar | 0:5c786957b6f8 | 106 | MQTTString topicString = MQTTString_initializer; |
ramirobmar | 0:5c786957b6f8 | 107 | |
ramirobmar | 0:5c786957b6f8 | 108 | unsigned char buf[200]; |
ramirobmar | 0:5c786957b6f8 | 109 | int buflen = sizeof(buf); |
ramirobmar | 0:5c786957b6f8 | 110 | |
ramirobmar | 0:5c786957b6f8 | 111 | unsigned char payload[25]= "mypayload"; |
ramirobmar | 0:5c786957b6f8 | 112 | int payloadlen = strlen((char *)payload); |
ramirobmar | 0:5c786957b6f8 | 113 | bool status_err = false; |
ramirobmar | 0:5c786957b6f8 | 114 | |
ramirobmar | 0:5c786957b6f8 | 115 | int len = 0; |
ramirobmar | 0:5c786957b6f8 | 116 | int connack_rc; |
ramirobmar | 0:5c786957b6f8 | 117 | unsigned char sessionPresent; |
ramirobmar | 0:5c786957b6f8 | 118 | |
ramirobmar | 0:5c786957b6f8 | 119 | unsigned char dup; |
ramirobmar | 0:5c786957b6f8 | 120 | int qos; |
ramirobmar | 0:5c786957b6f8 | 121 | unsigned char retained; |
ramirobmar | 0:5c786957b6f8 | 122 | unsigned short msgid; |
ramirobmar | 0:5c786957b6f8 | 123 | int payloadlen_in; |
ramirobmar | 0:5c786957b6f8 | 124 | char* payload_in; |
ramirobmar | 0:5c786957b6f8 | 125 | int rc; |
ramirobmar | 0:5c786957b6f8 | 126 | |
ramirobmar | 0:5c786957b6f8 | 127 | |
ramirobmar | 0:5c786957b6f8 | 128 | eth.init(); //Use DHCP |
ramirobmar | 0:5c786957b6f8 | 129 | eth.connect(); |
ramirobmar | 0:5c786957b6f8 | 130 | |
ramirobmar | 0:5c786957b6f8 | 131 | |
ramirobmar | 0:5c786957b6f8 | 132 | rc=mysock.connect("iot.eclipse.org", 1883); |
ramirobmar | 0:5c786957b6f8 | 133 | if(rc==-1) |
ramirobmar | 0:5c786957b6f8 | 134 | { |
ramirobmar | 0:5c786957b6f8 | 135 | eth.disconnect(); |
ramirobmar | 0:5c786957b6f8 | 136 | return; |
ramirobmar | 0:5c786957b6f8 | 137 | } |
ramirobmar | 0:5c786957b6f8 | 138 | |
ramirobmar | 0:5c786957b6f8 | 139 | data.clientID.cstring = "SendReceive mbed MQTT "; |
ramirobmar | 0:5c786957b6f8 | 140 | data.keepAliveInterval = 20; |
ramirobmar | 0:5c786957b6f8 | 141 | data.cleansession = 1; |
ramirobmar | 0:5c786957b6f8 | 142 | |
ramirobmar | 0:5c786957b6f8 | 143 | mysock.set_blocking(true, 1000); /* 1 second Timeout */ |
ramirobmar | 0:5c786957b6f8 | 144 | |
ramirobmar | 0:5c786957b6f8 | 145 | len = MQTTSerialize_connect(buf, buflen, &data); |
ramirobmar | 0:5c786957b6f8 | 146 | rc = mysock.send((char *)buf, len); |
ramirobmar | 0:5c786957b6f8 | 147 | if(rc==-1) |
ramirobmar | 0:5c786957b6f8 | 148 | { |
ramirobmar | 0:5c786957b6f8 | 149 | eth.disconnect(); |
ramirobmar | 0:5c786957b6f8 | 150 | return; |
ramirobmar | 0:5c786957b6f8 | 151 | |
ramirobmar | 0:5c786957b6f8 | 152 | } |
ramirobmar | 0:5c786957b6f8 | 153 | |
ramirobmar | 0:5c786957b6f8 | 154 | /* wait for connack */ |
ramirobmar | 0:5c786957b6f8 | 155 | if (MQTTPacket_read(buf, buflen, getdata) == CONNACK) |
ramirobmar | 0:5c786957b6f8 | 156 | { |
ramirobmar | 0:5c786957b6f8 | 157 | |
ramirobmar | 0:5c786957b6f8 | 158 | if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0) |
ramirobmar | 0:5c786957b6f8 | 159 | { |
ramirobmar | 0:5c786957b6f8 | 160 | sprintf(msg_err,"Unable to connect, return code %d",connack_rc); |
ramirobmar | 0:5c786957b6f8 | 161 | status_err = true; |
ramirobmar | 0:5c786957b6f8 | 162 | } |
ramirobmar | 0:5c786957b6f8 | 163 | else |
ramirobmar | 0:5c786957b6f8 | 164 | status_err = false; |
ramirobmar | 0:5c786957b6f8 | 165 | } |
ramirobmar | 0:5c786957b6f8 | 166 | else |
ramirobmar | 0:5c786957b6f8 | 167 | status_err = true; |
ramirobmar | 0:5c786957b6f8 | 168 | |
ramirobmar | 0:5c786957b6f8 | 169 | if (!status_err) |
ramirobmar | 0:5c786957b6f8 | 170 | { |
ramirobmar | 0:5c786957b6f8 | 171 | topicString.cstring = "pubtopic"; |
ramirobmar | 0:5c786957b6f8 | 172 | |
ramirobmar | 0:5c786957b6f8 | 173 | while (!toStop) |
ramirobmar | 0:5c786957b6f8 | 174 | { |
ramirobmar | 0:5c786957b6f8 | 175 | if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH) |
ramirobmar | 0:5c786957b6f8 | 176 | { |
ramirobmar | 0:5c786957b6f8 | 177 | |
ramirobmar | 0:5c786957b6f8 | 178 | MQTTString receivedTopic; |
ramirobmar | 0:5c786957b6f8 | 179 | rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen); |
ramirobmar | 0:5c786957b6f8 | 180 | sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in); |
ramirobmar | 0:5c786957b6f8 | 181 | } |
ramirobmar | 0:5c786957b6f8 | 182 | |
ramirobmar | 0:5c786957b6f8 | 183 | sprintf(msg_status,"publishing reading\n"); |
ramirobmar | 0:5c786957b6f8 | 184 | len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen); |
ramirobmar | 0:5c786957b6f8 | 185 | rc = mysock.send((char *)buf, len); |
ramirobmar | 0:5c786957b6f8 | 186 | |
ramirobmar | 0:5c786957b6f8 | 187 | } |
ramirobmar | 0:5c786957b6f8 | 188 | |
ramirobmar | 0:5c786957b6f8 | 189 | |
ramirobmar | 0:5c786957b6f8 | 190 | } |
ramirobmar | 0:5c786957b6f8 | 191 | else{ |
ramirobmar | 0:5c786957b6f8 | 192 | eth.disconnect(); |
ramirobmar | 0:5c786957b6f8 | 193 | } |
ramirobmar | 0:5c786957b6f8 | 194 | |
ramirobmar | 0:5c786957b6f8 | 195 | return; |
ramirobmar | 0:5c786957b6f8 | 196 | |
ramirobmar | 0:5c786957b6f8 | 197 | |
ramirobmar | 0:5c786957b6f8 | 198 | |
ramirobmar | 0:5c786957b6f8 | 199 | |
ramirobmar | 0:5c786957b6f8 | 200 | } |
ramirobmar | 0:5c786957b6f8 | 201 | |
ramirobmar | 0:5c786957b6f8 | 202 | int publish() |
ramirobmar | 0:5c786957b6f8 | 203 | { |
ramirobmar | 0:5c786957b6f8 | 204 | MQTTPacket_connectData data = MQTTPacket_connectData_initializer; |
ramirobmar | 0:5c786957b6f8 | 205 | int rc = 0; |
ramirobmar | 0:5c786957b6f8 | 206 | unsigned char buf[200]; |
ramirobmar | 0:5c786957b6f8 | 207 | int buflen = sizeof(buf); |
ramirobmar | 0:5c786957b6f8 | 208 | TCPSocketConnection mysock; |
ramirobmar | 0:5c786957b6f8 | 209 | MQTTString topicString = MQTTString_initializer; |
ramirobmar | 0:5c786957b6f8 | 210 | unsigned char payload[25] = "I'm alive!"; |
ramirobmar | 0:5c786957b6f8 | 211 | int payloadlen = strlen((char *)payload); |
ramirobmar | 0:5c786957b6f8 | 212 | int len = 0; |
ramirobmar | 0:5c786957b6f8 | 213 | |
ramirobmar | 0:5c786957b6f8 | 214 | mysock.connect("m2m.eclipse.org", 1883); |
ramirobmar | 0:5c786957b6f8 | 215 | |
ramirobmar | 0:5c786957b6f8 | 216 | sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE); |
ramirobmar | 0:5c786957b6f8 | 217 | data.keepAliveInterval = 20; |
ramirobmar | 0:5c786957b6f8 | 218 | data.cleansession = 1; |
ramirobmar | 0:5c786957b6f8 | 219 | data.MQTTVersion = 3; |
ramirobmar | 0:5c786957b6f8 | 220 | |
ramirobmar | 0:5c786957b6f8 | 221 | len = MQTTSerialize_connect(buf, buflen, &data); |
ramirobmar | 0:5c786957b6f8 | 222 | |
ramirobmar | 0:5c786957b6f8 | 223 | sprintf(topicString.cstring,"MESUREMENT:%d",mean_value); |
ramirobmar | 0:5c786957b6f8 | 224 | |
ramirobmar | 0:5c786957b6f8 | 225 | len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen); |
ramirobmar | 0:5c786957b6f8 | 226 | |
ramirobmar | 0:5c786957b6f8 | 227 | len += MQTTSerialize_disconnect(buf + len, buflen - len); |
ramirobmar | 0:5c786957b6f8 | 228 | |
ramirobmar | 0:5c786957b6f8 | 229 | rc = 0; |
ramirobmar | 0:5c786957b6f8 | 230 | while (rc < len) |
ramirobmar | 0:5c786957b6f8 | 231 | { |
ramirobmar | 0:5c786957b6f8 | 232 | int rc1 = mysock.send((char *)buf, len); |
ramirobmar | 0:5c786957b6f8 | 233 | if (rc1 == -1) |
ramirobmar | 0:5c786957b6f8 | 234 | { |
ramirobmar | 0:5c786957b6f8 | 235 | sprintf(msg_status,"Send failed"); |
ramirobmar | 0:5c786957b6f8 | 236 | |
ramirobmar | 0:5c786957b6f8 | 237 | break; |
ramirobmar | 0:5c786957b6f8 | 238 | } |
ramirobmar | 0:5c786957b6f8 | 239 | else |
ramirobmar | 0:5c786957b6f8 | 240 | rc += rc1; |
ramirobmar | 0:5c786957b6f8 | 241 | } |
ramirobmar | 0:5c786957b6f8 | 242 | if (rc == len) |
ramirobmar | 0:5c786957b6f8 | 243 | sprintf(msg_status,"Send succeeded"); |
ramirobmar | 0:5c786957b6f8 | 244 | |
ramirobmar | 0:5c786957b6f8 | 245 | wait(0.2); |
ramirobmar | 0:5c786957b6f8 | 246 | |
ramirobmar | 0:5c786957b6f8 | 247 | return 0; |
ramirobmar | 0:5c786957b6f8 | 248 | } |
ramirobmar | 0:5c786957b6f8 | 249 | |
ramirobmar | 0:5c786957b6f8 | 250 | |
ramirobmar | 0:5c786957b6f8 | 251 | |
ramirobmar | 0:5c786957b6f8 | 252 | |
ramirobmar | 0:5c786957b6f8 | 253 | int main() |
ramirobmar | 0:5c786957b6f8 | 254 | { |
ramirobmar | 0:5c786957b6f8 | 255 | |
ramirobmar | 0:5c786957b6f8 | 256 | |
ramirobmar | 0:5c786957b6f8 | 257 | init_system_measurements(); // Initialization system's measurement |
ramirobmar | 0:5c786957b6f8 | 258 | init_system_network(); |
ramirobmar | 0:5c786957b6f8 | 259 | |
ramirobmar | 0:5c786957b6f8 | 260 | thread1 = new Thread(system_measurements_thread); |
ramirobmar | 0:5c786957b6f8 | 261 | thread2 = new Thread(set_values); |
ramirobmar | 0:5c786957b6f8 | 262 | |
ramirobmar | 0:5c786957b6f8 | 263 | |
ramirobmar | 0:5c786957b6f8 | 264 | while (true) { |
ramirobmar | 0:5c786957b6f8 | 265 | |
ramirobmar | 0:5c786957b6f8 | 266 | } |
ramirobmar | 0:5c786957b6f8 | 267 | |
ramirobmar | 0:5c786957b6f8 | 268 | } |