Fork of my original MQTTGateway

Dependencies:   mbed-http

Committer:
vpcola
Date:
Sat Apr 08 14:43:14 2017 +0000
Revision:
0:a1734fe1ec4b
Initial commit

Who changed what in which revision?

User | Revision | Line number | New contents of line
vpcola 0:a1734fe1ec4b 1 #include "MQTTSManager.h"
vpcola 0:a1734fe1ec4b 2 #include "XbeeMonitor.h"
vpcola 0:a1734fe1ec4b 3 #include "Utils.h"
vpcola 0:a1734fe1ec4b 4 #include "jsmn.h"
vpcola 0:a1734fe1ec4b 5 #include <string>
vpcola 0:a1734fe1ec4b 6
// File-scope configuration and shared state for the MQTT-over-TLS gateway.
vpcola 0:a1734fe1ec4b 7 using namespace MQTT;
vpcola 0:a1734fe1ec4b 8
// Port handed to setConnectionParameters() in mqttsInit().
vpcola 0:a1734fe1ec4b 9 #define MQTTS_PORT 8883
vpcola 0:a1734fe1ec4b 10
// Topic that postMQTTUpdate() publishes sensor readings to.
vpcola 0:a1734fe1ec4b 11 static const char * topic_update = "garden_update";
// Topic whose messages are routed to messageArrived() by mqttsInit().
vpcola 0:a1734fe1ec4b 12 static const char * topic_listen = "garden_status";
// Broker host name and login data used by mqttsInit().
// NOTE(review): the user name and password are hard-coded in source; consider
// moving them to build-time or provisioned configuration.
vpcola 0:a1734fe1ec4b 13 static const char * hostname = "mqtt.mbedhacks.com";
vpcola 0:a1734fe1ec4b 14 static const char * clientID = "mbed-sample";
vpcola 0:a1734fe1ec4b 15 static const char * username = "mbedhacks";
vpcola 0:a1734fe1ec4b 16 static const char * password = "qwer123";
vpcola 0:a1734fe1ec4b 17
// Client instance; NULL until mqttsInit() allocates it.
vpcola 0:a1734fe1ec4b 18 static MQTTThreadedClient * pmqtt = NULL;
// Thread that runMQTTS() starts on MQTTThreadedClient::startListener.
vpcola 0:a1734fe1ec4b 19 Thread mqttThd(osPriorityNormal, DEFAULT_STACK_SIZE * 2);
// Control record filled in by messageArrived() and passed to postRadioControl().
vpcola 0:a1734fe1ec4b 20 RadioControlData postdata;
// Scratch buffer messageArrived() uses to NUL-terminate JSON token text
// before numeric conversion.
vpcola 0:a1734fe1ec4b 21 static char tempbuff[100];
vpcola 0:a1734fe1ec4b 22
vpcola 0:a1734fe1ec4b 23 static int jsoneq(const char * json, jsmntok_t * tok, const char * s)
vpcola 0:a1734fe1ec4b 24 {
vpcola 0:a1734fe1ec4b 25 if (tok->type == JSMN_STRING && (int) strlen(s) == tok->end - tok->start &&
vpcola 0:a1734fe1ec4b 26 strncmp(json + tok->start, s, tok->end - tok->start) == 0) {
vpcola 0:a1734fe1ec4b 27 return 0;
vpcola 0:a1734fe1ec4b 28 }
vpcola 0:a1734fe1ec4b 29 return -1;
vpcola 0:a1734fe1ec4b 30 }
vpcola 0:a1734fe1ec4b 31
vpcola 0:a1734fe1ec4b 32 void messageArrived(MessageData& md)
vpcola 0:a1734fe1ec4b 33 {
vpcola 0:a1734fe1ec4b 34 int i, r;
vpcola 0:a1734fe1ec4b 35
vpcola 0:a1734fe1ec4b 36 jsmn_parser p;
vpcola 0:a1734fe1ec4b 37 jsmntok_t t[100];
vpcola 0:a1734fe1ec4b 38
vpcola 0:a1734fe1ec4b 39 Message &message = md.message;
vpcola 0:a1734fe1ec4b 40 printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id);
vpcola 0:a1734fe1ec4b 41 printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload);
vpcola 0:a1734fe1ec4b 42
vpcola 0:a1734fe1ec4b 43 // handle payload
vpcola 0:a1734fe1ec4b 44 const char * jsonstring = std::string((const char *) message.payload, message.payloadlen).c_str();
vpcola 0:a1734fe1ec4b 45
vpcola 0:a1734fe1ec4b 46 jsmn_init(&p);
vpcola 0:a1734fe1ec4b 47 r = jsmn_parse(&p, jsonstring, strlen(jsonstring), t, sizeof(t)/sizeof(t[0]));
vpcola 0:a1734fe1ec4b 48
vpcola 0:a1734fe1ec4b 49 uint64_t radio_id = 0;
vpcola 0:a1734fe1ec4b 50 int sprinkler_pin = 1; // 0 - turn on sprinkler, 1 - off
vpcola 0:a1734fe1ec4b 51
vpcola 0:a1734fe1ec4b 52 /* Top level element is an object */
vpcola 0:a1734fe1ec4b 53 if ((r > 0) && (t[0].type == JSMN_OBJECT))
vpcola 0:a1734fe1ec4b 54 {
vpcola 0:a1734fe1ec4b 55 /* Loop over all tokens */
vpcola 0:a1734fe1ec4b 56 for (i = 1; i < r; i++)
vpcola 0:a1734fe1ec4b 57 {
vpcola 0:a1734fe1ec4b 58 if (jsoneq(jsonstring, &t[i], "radioid") == 0)
vpcola 0:a1734fe1ec4b 59 {
vpcola 0:a1734fe1ec4b 60 memset(tempbuff, 0, sizeof(tempbuff));
vpcola 0:a1734fe1ec4b 61 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start);
vpcola 0:a1734fe1ec4b 62 radio_id = strtoull(&tempbuff[0], NULL, 0);
vpcola 0:a1734fe1ec4b 63 i++;
vpcola 0:a1734fe1ec4b 64 }
vpcola 0:a1734fe1ec4b 65 else if (jsoneq(jsonstring, &t[i], "sprinkler") == 0)
vpcola 0:a1734fe1ec4b 66 {
vpcola 0:a1734fe1ec4b 67 memset(tempbuff, 0, sizeof(tempbuff));
vpcola 0:a1734fe1ec4b 68 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start);
vpcola 0:a1734fe1ec4b 69 sprinkler_pin = strtoul(&tempbuff[0], NULL, 0);
vpcola 0:a1734fe1ec4b 70 i++;
vpcola 0:a1734fe1ec4b 71 }
vpcola 0:a1734fe1ec4b 72 else
vpcola 0:a1734fe1ec4b 73 {
vpcola 0:a1734fe1ec4b 74
vpcola 0:a1734fe1ec4b 75 }
vpcola 0:a1734fe1ec4b 76 }
vpcola 0:a1734fe1ec4b 77 }
vpcola 0:a1734fe1ec4b 78
vpcola 0:a1734fe1ec4b 79 // TODO: Send the values to the XBeeMonitor thread
vpcola 0:a1734fe1ec4b 80 printf("Radio ID: %llu\r\n", radio_id);
vpcola 0:a1734fe1ec4b 81 printf("Sprinkler Pin : %d\r\n", sprinkler_pin);
vpcola 0:a1734fe1ec4b 82 postdata.radioID = radio_id;
vpcola 0:a1734fe1ec4b 83 postdata.sprinkler_pin = sprinkler_pin;
vpcola 0:a1734fe1ec4b 84 postRadioControl(postdata);
vpcola 0:a1734fe1ec4b 85 }
vpcola 0:a1734fe1ec4b 86
vpcola 0:a1734fe1ec4b 87 int mqttsInit(NetworkInterface * net, const char * pem)
vpcola 0:a1734fe1ec4b 88 {
vpcola 0:a1734fe1ec4b 89 pmqtt = new MQTTThreadedClient(net, pem);
vpcola 0:a1734fe1ec4b 90 if (pmqtt == NULL)
vpcola 0:a1734fe1ec4b 91 return -1;
vpcola 0:a1734fe1ec4b 92
vpcola 0:a1734fe1ec4b 93 MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer;
vpcola 0:a1734fe1ec4b 94 logindata.MQTTVersion = 3;
vpcola 0:a1734fe1ec4b 95 logindata.clientID.cstring = (char *) clientID;
vpcola 0:a1734fe1ec4b 96 logindata.username.cstring = (char *) username;
vpcola 0:a1734fe1ec4b 97 logindata.password.cstring = (char *) password;
vpcola 0:a1734fe1ec4b 98
vpcola 0:a1734fe1ec4b 99 pmqtt->setConnectionParameters(hostname, MQTTS_PORT, logindata);
vpcola 0:a1734fe1ec4b 100 pmqtt->addTopicHandler(topic_listen, messageArrived);
vpcola 0:a1734fe1ec4b 101
vpcola 0:a1734fe1ec4b 102 return 0;
vpcola 0:a1734fe1ec4b 103 }
vpcola 0:a1734fe1ec4b 104
vpcola 0:a1734fe1ec4b 105 void postMQTTUpdate(SensorData &msg)
vpcola 0:a1734fe1ec4b 106 {
vpcola 0:a1734fe1ec4b 107 // Serialize data to json string ...
vpcola 0:a1734fe1ec4b 108 if (pmqtt)
vpcola 0:a1734fe1ec4b 109 {
vpcola 0:a1734fe1ec4b 110 PubMessage message;
vpcola 0:a1734fe1ec4b 111 message.qos = QOS0;
vpcola 0:a1734fe1ec4b 112 message.id = 123;
vpcola 0:a1734fe1ec4b 113
vpcola 0:a1734fe1ec4b 114 strcpy(&message.topic[0], topic_update);
vpcola 0:a1734fe1ec4b 115 size_t numbytes = snprintf(&message.payload[0], MAX_MQTT_PAYLOAD_SIZE,
vpcola 0:a1734fe1ec4b 116 "{\"radio\":%llu,\"status\":{\"sprinkler\":%d,\"humidity\":%d,\"temperature\":%.2f,\"luminance\":%d}}",
vpcola 0:a1734fe1ec4b 117 msg.deviceaddr,
vpcola 0:a1734fe1ec4b 118 msg.sprinkler,
vpcola 0:a1734fe1ec4b 119 msg.humidity,
vpcola 0:a1734fe1ec4b 120 msg.temperature,
vpcola 0:a1734fe1ec4b 121 msg.luminance);
vpcola 0:a1734fe1ec4b 122 printf("[%s]\r\n", &message.payload[0]);
vpcola 0:a1734fe1ec4b 123 message.payloadlen = numbytes;
vpcola 0:a1734fe1ec4b 124 pmqtt->publish(message);
vpcola 0:a1734fe1ec4b 125 }
vpcola 0:a1734fe1ec4b 126 }
vpcola 0:a1734fe1ec4b 127
vpcola 0:a1734fe1ec4b 128 int runMQTTS()
vpcola 0:a1734fe1ec4b 129 {
vpcola 0:a1734fe1ec4b 130 if ( pmqtt && (mqttThd.start(mbed::callback(pmqtt, &MQTTThreadedClient::startListener)) != osOK ) )
vpcola 0:a1734fe1ec4b 131 return -1;
vpcola 0:a1734fe1ec4b 132 return 0;
vpcola 0:a1734fe1ec4b 133 }