Fork of my original MQTTGateway

Dependencies:   mbed-http

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers MQTTSManager.cpp Source File

MQTTSManager.cpp

00001 #include "MQTTSManager.h"
00002 #include "XbeeMonitor.h"
00003 #include "Utils.h"
00004 #include "jsmn.h"
00005 #include <string>
00006 
00007 using namespace MQTT;
00008 
// TCP port handed to setConnectionParameters() in mqttsInit()
// (8883 is the port conventionally used for MQTT over TLS).
00009 #define MQTTS_PORT  8883
00010 
// Topic that postMQTTUpdate() publishes sensor readings to.
00011 static const char * topic_update = "garden_update";
// Topic whose messages are routed to messageArrived() (registered in mqttsInit()).
00012 static const char * topic_listen = "garden_status";
// Broker host name passed to setConnectionParameters() in mqttsInit().
00013 static const char * hostname = "mqtt.mbedhacks.com";
// Login data copied into MQTTPacket_connectData by mqttsInit().
// NOTE(review): credentials are hard-coded in the source file - consider
// moving them to configuration that is not checked in.
00014 static const char * clientID = "mbed-sample";
00015 static const char * username = "mbedhacks";
00016 static const char * password = "qwer123";
00017 
// Client instance; NULL until mqttsInit() has created it.
00018 static MQTTThreadedClient * pmqtt = NULL;
// Thread that runMQTTS() starts on MQTTThreadedClient::startListener.
00019 Thread mqttThd(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
// Filled in by messageArrived() and handed to postRadioControl().
00020 RadioControlData postdata;
// Scratch buffer messageArrived() uses to hold the text of one JSON value.
00021 static char tempbuff[100];
00022 
00023 static int jsoneq(const char * json, jsmntok_t * tok, const char * s)
00024 {
00025     if (tok->type == JSMN_STRING && (int) strlen(s) == tok->end - tok->start &&
00026         strncmp(json + tok->start, s, tok->end - tok->start) == 0) {
00027             return 0;
00028     }
00029     return -1;    
00030 }
00031 
00032 void messageArrived(MessageData& md)
00033 {
00034     int i, r;
00035     
00036     jsmn_parser p;
00037     jsmntok_t t[100];
00038         
00039     Message &message = md.message;
00040     printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
00041     printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
00042     
00043     // handle payload
00044     const char * jsonstring = std::string((const char *) message.payload, message.payloadlen).c_str();
00045     
00046     jsmn_init(&p);
00047     r = jsmn_parse(&p, jsonstring, strlen(jsonstring), t, sizeof(t)/sizeof(t[0]));
00048     
00049     uint64_t radio_id = 0;
00050     int sprinkler_pin = 1; // 0 - turn on sprinkler, 1 - off
00051     
00052     /* Top level element is an object */
00053     if ((r > 0) && (t[0].type == JSMN_OBJECT)) 
00054     { 
00055         /* Loop over all tokens */
00056         for (i = 1; i < r; i++)
00057         {
00058             if (jsoneq(jsonstring, &t[i], "radioid") == 0)
00059             {
00060                 memset(tempbuff, 0, sizeof(tempbuff));                
00061                 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start);
00062                 radio_id = strtoull(&tempbuff[0], NULL, 0);
00063                 i++;                
00064             }
00065             else if (jsoneq(jsonstring, &t[i], "sprinkler") == 0)
00066             {
00067                 memset(tempbuff, 0, sizeof(tempbuff));
00068                 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start);
00069                 sprinkler_pin = strtoul(&tempbuff[0], NULL, 0);
00070                 i++;
00071             }
00072             else
00073             {
00074                 
00075             }
00076         }    
00077     }
00078     
00079     // TODO: Send the values to the XBeeMonitor thread
00080     printf("Radio ID: %llu\r\n", radio_id);
00081     printf("Sprinkler Pin : %d\r\n", sprinkler_pin);
00082     postdata.radioID = radio_id;
00083     postdata.sprinkler_pin = sprinkler_pin;
00084     postRadioControl(postdata);
00085 }
00086 
00087 int mqttsInit(NetworkInterface * net, const char * pem)
00088 {
00089     pmqtt = new MQTTThreadedClient(net, pem);
00090     if (pmqtt == NULL) 
00091         return -1;
00092     
00093     MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer;
00094     logindata.MQTTVersion = 3;
00095     logindata.clientID.cstring = (char *) clientID;
00096     logindata.username.cstring = (char *) username;
00097     logindata.password.cstring = (char *) password;
00098     
00099     pmqtt->setConnectionParameters(hostname, MQTTS_PORT, logindata);
00100     pmqtt->addTopicHandler(topic_listen, messageArrived);
00101     
00102     return 0;
00103 }
00104 
00105 void postMQTTUpdate(SensorData &msg)
00106 {
00107     // Serialize data to json string ...
00108     if (pmqtt)
00109     {
00110         PubMessage message;
00111         message.qos = QOS0;
00112         message.id = 123;
00113         
00114         strcpy(&message.topic[0], topic_update);
00115         size_t numbytes = snprintf(&message.payload[0], MAX_MQTT_PAYLOAD_SIZE,
00116                 "{\"radio\":%llu,\"status\":{\"sprinkler\":%d,\"humidity\":%d,\"temperature\":%.2f,\"luminance\":%d}}", 
00117                 msg.deviceaddr,
00118                 msg.sprinkler,
00119                 msg.humidity,
00120                 msg.temperature,
00121                 msg.luminance);
00122         printf("[%s]\r\n", &message.payload[0]);
00123         message.payloadlen = numbytes;
00124         pmqtt->publish(message);
00125     }
00126 }
00127 
00128 int runMQTTS()
00129 {
00130     if ( pmqtt && (mqttThd.start(mbed::callback(pmqtt, &MQTTThreadedClient::startListener)) != osOK ) )
00131         return -1;
00132     return 0;
00133 }