Vergil Cola
/
MQTTGateway2
Fork of my original MQTTGateway
Embed:
(wiki syntax)
Show/hide line numbers
MQTTSManager.cpp
00001 #include "MQTTSManager.h" 00002 #include "XbeeMonitor.h" 00003 #include "Utils.h" 00004 #include "jsmn.h" 00005 #include <string> 00006 00007 using namespace MQTT; 00008 00009 #define MQTTS_PORT 8883 00010 00011 static const char * topic_update = "garden_update"; 00012 static const char * topic_listen = "garden_status"; 00013 static const char * hostname = "mqtt.mbedhacks.com"; 00014 static const char * clientID = "mbed-sample"; 00015 static const char * username = "mbedhacks"; 00016 static const char * password = "qwer123"; 00017 00018 static MQTTThreadedClient * pmqtt = NULL; 00019 Thread mqttThd(osPriorityNormal, DEFAULT_STACK_SIZE * 2); 00020 RadioControlData postdata; 00021 static char tempbuff[100]; 00022 00023 static int jsoneq(const char * json, jsmntok_t * tok, const char * s) 00024 { 00025 if (tok->type == JSMN_STRING && (int) strlen(s) == tok->end - tok->start && 00026 strncmp(json + tok->start, s, tok->end - tok->start) == 0) { 00027 return 0; 00028 } 00029 return -1; 00030 } 00031 00032 void messageArrived(MessageData& md) 00033 { 00034 int i, r; 00035 00036 jsmn_parser p; 00037 jsmntok_t t[100]; 00038 00039 Message &message = md.message; 00040 printf("Arrived Callback 1 : qos %d, retained %d, dup %d, packetid %d\r\n", message.qos, message.retained, message.dup, message.id); 00041 printf("Payload [%.*s]\r\n", message.payloadlen, (char*)message.payload); 00042 00043 // handle payload 00044 const char * jsonstring = std::string((const char *) message.payload, message.payloadlen).c_str(); 00045 00046 jsmn_init(&p); 00047 r = jsmn_parse(&p, jsonstring, strlen(jsonstring), t, sizeof(t)/sizeof(t[0])); 00048 00049 uint64_t radio_id = 0; 00050 int sprinkler_pin = 1; // 0 - turn on sprinkler, 1 - off 00051 00052 /* Top level element is an object */ 00053 if ((r > 0) && (t[0].type == JSMN_OBJECT)) 00054 { 00055 /* Loop over all tokens */ 00056 for (i = 1; i < r; i++) 00057 { 00058 if (jsoneq(jsonstring, &t[i], "radioid") == 0) 00059 { 00060 memset(tempbuff, 0, sizeof(tempbuff)); 00061 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start); 00062 radio_id = strtoull(&tempbuff[0], NULL, 0); 00063 i++; 00064 } 00065 else if (jsoneq(jsonstring, &t[i], "sprinkler") == 0) 00066 { 00067 memset(tempbuff, 0, sizeof(tempbuff)); 00068 strncpy(tempbuff, jsonstring + t[i+1].start, t[i+1].end - t[i+1].start); 00069 sprinkler_pin = strtoul(&tempbuff[0], NULL, 0); 00070 i++; 00071 } 00072 else 00073 { 00074 00075 } 00076 } 00077 } 00078 00079 // TODO: Send the values to the XBeeMonitor thread 00080 printf("Radio ID: %llu\r\n", radio_id); 00081 printf("Sprinkler Pin : %d\r\n", sprinkler_pin); 00082 postdata.radioID = radio_id; 00083 postdata.sprinkler_pin = sprinkler_pin; 00084 postRadioControl(postdata); 00085 } 00086 00087 int mqttsInit(NetworkInterface * net, const char * pem) 00088 { 00089 pmqtt = new MQTTThreadedClient(net, pem); 00090 if (pmqtt == NULL) 00091 return -1; 00092 00093 MQTTPacket_connectData logindata = MQTTPacket_connectData_initializer; 00094 logindata.MQTTVersion = 3; 00095 logindata.clientID.cstring = (char *) clientID; 00096 logindata.username.cstring = (char *) username; 00097 logindata.password.cstring = (char *) password; 00098 00099 pmqtt->setConnectionParameters(hostname, MQTTS_PORT, logindata); 00100 pmqtt->addTopicHandler(topic_listen, messageArrived); 00101 00102 return 0; 00103 } 00104 00105 void postMQTTUpdate(SensorData &msg) 00106 { 00107 // Serialize data to json string ... 00108 if (pmqtt) 00109 { 00110 PubMessage message; 00111 message.qos = QOS0; 00112 message.id = 123; 00113 00114 strcpy(&message.topic[0], topic_update); 00115 size_t numbytes = snprintf(&message.payload[0], MAX_MQTT_PAYLOAD_SIZE, 00116 "{\"radio\":%llu,\"status\":{\"sprinkler\":%d,\"humidity\":%d,\"temperature\":%.2f,\"luminance\":%d}}", 00117 msg.deviceaddr, 00118 msg.sprinkler, 00119 msg.humidity, 00120 msg.temperature, 00121 msg.luminance); 00122 printf("[%s]\r\n", &message.payload[0]); 00123 message.payloadlen = numbytes; 00124 pmqtt->publish(message); 00125 } 00126 } 00127 00128 int runMQTTS() 00129 { 00130 if ( pmqtt && (mqttThd.start(mbed::callback(pmqtt, &MQTTThreadedClient::startListener)) != osOK ) ) 00131 return -1; 00132 return 0; 00133 }
Generated on Tue Jul 12 2022 18:06:46 by 1.7.2