RTOs con capa MQTT

Dependencies:   ADS1015 EthernetInterface MQTT mbed-rtos mbed

Revision:
0:5c786957b6f8
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/main.cpp	Mon Aug 24 13:43:25 2015 +0000
@@ -0,0 +1,268 @@
+#include "mbed.h"
+#include "rtos.h"
+#include "Adafruit_ADS1015.h"
+#include "EthernetInterface.h"
+#include "MQTTPacket.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+
+
+#define LEN_BUFFER 100
+#define RES_ADC 1
+#define ID_SN_DEVICE 0x01
+#define READ_SIGNAL 1
+
+uint32_t measurement;
+uint32_t mean_value;
+uint16_t gain;
+uint32_t offset;
+uint32_t * buffer;
+uint8_t counter;
+Thread * thread1;
+Mutex * lock_buffer;
+Semaphore * sem_adc;
+Thread * thread2;
+Adafruit_ADS1015 * m_adc;
+
+EthernetInterface eth;
+TCPSocketConnection mysock; 
+char msg_err[81];
+char msg_control[81];
+char msg_status[81];
+int toStop = 0;
+
+int publish();
+    
+void set_values(void const *argument)
+{
+    uint8_t i;
+    uint64_t aux_value;
+    
+    while (true) {
+        
+        lock_buffer->lock(osWaitForever);
+        
+        for(i=0;i<counter;i++)
+        {
+            aux_value = aux_value +  buffer[i];  
+        }
+        mean_value = (uint32_t) aux_value / counter;
+        
+        lock_buffer->unlock();
+        publish();
+        
+        //Thread::signal_wait(READ_SIGNAL);
+        Thread::wait(10);
+    }
+    
+}
+
+
+
+void system_measurements_thread(void const *argument)
+{
+    uint16_t measurement;
+    uint32_t data_in;
+    
+    while (true) {
+        
+        
+        sem_adc->wait(osWaitForever);
+        measurement=m_adc->readADC_Differential_0_1(); 
+        sem_adc->release();
+        
+        data_in = measurement * gain - offset;
+        
+        lock_buffer->lock(osWaitForever);
+        buffer[counter++]=data_in;
+        if(counter == LEN_BUFFER) counter = 0;
+        lock_buffer->unlock();
+               
+        Thread::wait(10);
+          
+    }   
+}
+
+void init_system_measurements()
+{
+    m_adc=new Adafruit_ADS1015(0, ADS1015_ADDRESS); // set i2c adress = 0 to allow ADS1115 to use this as default constructor
+    m_adc->setGain(GAIN_ONE);
+    buffer = new uint32_t[LEN_BUFFER];
+    counter=0;
+    lock_buffer = new Mutex();
+    sem_adc = new Semaphore(RES_ADC);
+}
+
+int getdata(unsigned char* buf, int count)
+{
+    return mysock.receive((char *)buf, (size_t)count);
+}
+
+void init_system_network()
+{
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    MQTTString topicString = MQTTString_initializer;
+   
+    unsigned char buf[200];
+    int buflen = sizeof(buf);
+       
+    unsigned char payload[25]= "mypayload";
+    int payloadlen = strlen((char *)payload);
+    bool status_err = false;
+    
+    int len = 0;
+    int connack_rc;
+    unsigned char sessionPresent;
+    
+    unsigned char dup;
+    int qos;
+    unsigned char retained;
+    unsigned short msgid;
+    int payloadlen_in;
+    char* payload_in;
+    int rc;       
+    
+    
+    eth.init(); //Use DHCP
+    eth.connect();  
+    
+    
+    rc=mysock.connect("iot.eclipse.org", 1883); 
+    if(rc==-1)
+    {
+        eth.disconnect();
+        return;
+    }
+    
+    data.clientID.cstring = "SendReceive mbed MQTT ";
+    data.keepAliveInterval = 20;
+    data.cleansession = 1;
+    
+    mysock.set_blocking(true, 1000);  /* 1 second Timeout */
+    
+    len = MQTTSerialize_connect(buf, buflen, &data);
+    rc = mysock.send((char *)buf, len);
+    if(rc==-1)
+    {
+        eth.disconnect();
+        return;
+    
+    }
+
+    /* wait for connack */
+    if (MQTTPacket_read(buf, buflen, getdata) == CONNACK)
+    {
+        
+       if (MQTTDeserialize_connack((unsigned char*)&sessionPresent,(unsigned char*)&connack_rc, (unsigned char*)buf, (int)buflen) != 1 || connack_rc != 0)
+        {
+            sprintf(msg_err,"Unable to connect, return code %d",connack_rc);
+            status_err = true;
+        }
+        else
+           status_err = false; 
+    }
+    else
+        status_err = true;
+        
+    if (!status_err)
+    {
+        topicString.cstring = "pubtopic";    
+        
+        while (!toStop)
+        {
+            if (MQTTPacket_read(buf, buflen, getdata) == PUBLISH)
+            {
+                
+                MQTTString receivedTopic;
+                rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid,&receivedTopic,(unsigned char **)&payload_in,&payloadlen_in, buf, buflen);
+                sprintf(msg_control,"message arrived %.*s\n", payloadlen_in, payload_in);
+            }
+
+            sprintf(msg_status,"publishing reading\n");
+            len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, payload, payloadlen);
+            rc = mysock.send((char *)buf, len);
+            
+        }
+
+    
+    }
+    else{
+        eth.disconnect();
+    }
+    
+    return;
+        
+        
+    
+
+}
+
+int publish()
+{
+    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
+    int rc = 0;
+    unsigned char buf[200];
+    int buflen = sizeof(buf);
+    TCPSocketConnection mysock; 
+    MQTTString topicString = MQTTString_initializer;
+    unsigned char payload[25] = "I'm alive!";
+    int payloadlen = strlen((char *)payload);
+    int len = 0;
+    
+    mysock.connect("m2m.eclipse.org", 1883);
+    
+    sprintf(data.clientID.cstring,"ID:%d",ID_SN_DEVICE);
+    data.keepAliveInterval = 20;
+    data.cleansession = 1;
+    data.MQTTVersion = 3;
+
+    len = MQTTSerialize_connect(buf, buflen, &data);
+
+    sprintf(topicString.cstring,"MESUREMENT:%d",mean_value);
+    
+    len += MQTTSerialize_publish(buf + len, buflen - len, 0, 0, 0, 0, topicString, payload, payloadlen);
+
+    len += MQTTSerialize_disconnect(buf + len, buflen - len);
+
+    rc = 0;
+    while (rc < len)
+    {
+        int rc1 = mysock.send((char *)buf, len);
+        if (rc1 == -1)
+        {
+            sprintf(msg_status,"Send failed");
+            
+            break;
+        }
+        else
+            rc += rc1;
+    }
+    if (rc == len)
+        sprintf(msg_status,"Send succeeded");
+        
+    wait(0.2);
+
+    return 0;
+}
+
+
+
+
+int main()
+{
+    
+    
+    init_system_measurements(); // Initialization system's measurement  
+    init_system_network();
+    
+    thread1 = new Thread(system_measurements_thread);
+    thread2 = new Thread(set_values);
+    
+    
+    while (true) {
+        
+    }
+
+}