Vergil Cola
/
MQTTGatewayK64
Fork of my MQTTGateway
Diff: MQTTSManager/MQTTSManager.cpp
- Revision:
- 0:f1d3878b8dd9
diff -r 000000000000 -r f1d3878b8dd9 MQTTSManager/MQTTSManager.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/MQTTSManager/MQTTSManager.cpp Sat Apr 08 14:45:51 2017 +0000 @@ -0,0 +1,133 @@ +#include "MQTTSManager.h" +#include "XbeeMonitor.h" +#include "Utils.h" +#include "jsmn.h" +#include <string> + +using namespace MQTT; + +#define MQTTS_PORT 8883 + +static const char * topic_update = "garden_update"; +static const char * topic_listen = "garden_status"; +static const char * hostname = "mqtt.mbedhacks.com"; +static const char * clientID = "mbed-sample"; +static const char * username = "mbedhacks"; +static const char * password = "qwer123"; + +static MQTTThreadedClient * pmqtt = NULL; +Thread mqttThd(osPriorityNormal, DEFAULT_STACK_SIZE * 2); +RadioControlData postdata; +static char tempbuff[100]; + +static int jsoneq(const char * json, jsmntok_t * tok, const char * s) +{ + if (tok->type == JSMN_STRING && (int) strlen(s) == tok->end - tok->start && + strncmp(json + tok->start, s, tok->end - tok->start) == 0) { + return 0; + } + return -1; +} + +void messageArrived(MessageData& md) +{ + int i, r; + + jsmn_parser p; + jsmntok_t t[100]; + + Message &message = md.message; + printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); + printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload); + + // handle payload + const char * jsonstring = std::string((const char *) message.payload, message.payloadlen).c_str(); + + jsmn_init(&p); + r = jsmn_parse(&p, jsonstring, strlen(jsonstring), t, sizeof(t)/sizeof(t[0])); + + uint64_t radio_id = 0; + int sprinkler_pin = 1; // 0 - turn on sprinkler, 1 - off + + /* Top level element is an object */ + if ((r > 0) && (t[0].type == JSMN_OBJECT)) + { + /* Loop over all tokens */ + for (i = 1; i < r; i++) + { + if (jsoneq(jsonstring, &t[i], "radioid") == 0) + { + memset(tempbuff, 0, sizeof(tempbuff)); + strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start); + radio_id = strtoull(&tempbuff[0], NULL, 0); + i++; + } + else if (jsoneq(jsonstring, &t[i], "sprinkler") == 0) + { + memset(tempbuff, 0, sizeof(tempbuff)); + strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start); + sprinkler_pin = strtoul(&tempbuff[0], NULL, 0); + i++; + } + else + { + + } + } + } + + // TODO: Send the values to the XBeeMonitor thread + printf("Radio ID: %llu\r\n", radio_id); + printf("Sprinkler Pin : %d\r\n", sprinkler_pin); + postdata.radioID = radio_id; + postdata.sprinkler_pin = sprinkler_pin; + postRadioControl(postdata); +} + +int mqttsInit(NetworkInterface * net, const char * pem) +{ + pmqtt = new MQTTThreadedClient(net, pem); + if (pmqtt == NULL) + return -1; + + MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer; + logindata.MQTTVersion = 3; + logindata.clientID.cstring = (char *) clientID; + logindata.username.cstring = (char *) username; + logindata.password.cstring = (char *) password; + + pmqtt->setConnectionParameters(hostname, MQTTS_PORT, logindata); + pmqtt->addTopicHandler(topic_listen, messageArrived); + + return 0; +} + +void postMQTTUpdate(SensorData &msg) +{ + // Serialize data to json string ... + if (pmqtt) + { + PubMessage message; + message.qos = QOS0; + message.id = 123; + + strcpy(&message.topic[0], topic_update); + size_t numbytes = snprintf(&message.payload[0], MAX_MQTT_PAYLOAD_SIZE, + "{\"radio\":%llu,\"status\":{\"sprinkler\":%d,\"humidity\":%d,\"temperature\":%.2f,\"luminance\":%d}}", + msg.deviceaddr, + msg.sprinkler, + msg.humidity, + msg.temperature, + msg.luminance); + printf("[%s]\r\n", &message.payload[0]); + message.payloadlen = numbytes; + pmqtt->publish(message); + } +} + +int runMQTTS() +{ + if ( pmqtt && (mqttThd.start(mbed::callback(pmqtt, &MQTTThreadedClient::startListener)) != osOK ) ) + return -1; + return 0; +} \ No newline at end of file