iot_water_monitor_v2
Dependencies: easy-connect-v16 Watchdog FP MQTTPacket RecordType-v-16 watersenor_and_temp_code
Simple-MQTT/SimpleMQTT.h
- Committer:
- DuyLionTran
- Date:
- 2018-02-27
- Revision:
- 40:4356c209c58d
- Parent:
- 39:a5ee98bd0050
- Child:
- 43:dcde0e66874a
File content as of revision 40:4356c209c58d:
#ifndef __SIMPLEMQTT_H__ #define __SIMPLEMQTT_H__ /*************************************************************** * Includes ***************************************************************/ #include "easy-connect.h" #include "MQTTClient.h" #include "NDefLib/NDefNfcTag.h" #include "NDefLib/RecordType/RecordURI.h" #include "MQTTNetwork.h" #include "MQTTmbed.h" #include "Json.h" #include "CommandExecution.h" #include "flash_programming.h" /*************************************************************** * Definitions ***************************************************************/ // Configuration values needed to connect to IBM IoT Cloud #define ORG MQTT_ORG_ID // connect to ORG.internetofthings.ibmcloud.com/ For a registered connection, replace with your org #define ID MQTT_DEVICE_ID // For a registered connection is your device id #define AUTH_TOKEN MQTT_DEVICE_PASSWORD // For a registered connection is a device auth-token #define DEFAULT_TYPE_NAME MQTT_DEVICE_TYPE // For a registered connection is device type #define AUTH_METHOD MQTT_USERNAME #define TYPE DEFAULT_TYPE_NAME // For a registered connection, replace with your type #define IBM_IOT_PORT MQTT_PORT #define MQTT_MAX_PACKET_SIZE 400 #define MQTT_MAX_PAYLOAD_SIZE 300 /*************************************************************** * Variables ***************************************************************/ typedef enum { ADC_VALUE = 0, SENSOR_VALUE, RELAY_STATE, CONFIG_VALUE } UploadType; typedef enum { CONTROL_CMD = 0, READ_CMD, SETUP_CMD } CommandType; struct UploadValue { float ADC_PHVal; float ADC_DOVal; float SENSOR_PHVal; float SENSOR_DOVal; uint8_t RELAY_State_1; uint8_t RELAY_State_2; uint8_t RELAY_State_3; uint32_t CONFIG_Time; uint8_t CONFIG_Mode; uint8_t CONFIG_MinOxi; uint8_t CONFIG_MaxOxi; uint16_t CONFIG_UploadInterval; } UploadValue; char *projectName = "WaterMonitor"; static char id[30] = ID; // mac without colons static char org[12] = ORG; static char type[30] = TYPE; static char auth_token[30] = AUTH_TOKEN; // Auth_token is only used in non-quickstart mode static int connack_rc = 0; // MQTT connack return code static bool wifiConnected = true; static bool netConnecting = false; static bool mqttConnecting = false; static bool netConnected = false; static bool connected = false; static int retryAttempt = 0; static int connectTimeout = 1000; uint16_t commandID = 0; static char subscription_url[MQTT_MAX_PAYLOAD_SIZE]; extern struct UploadValue DataStruct; /*************************************************************** * Unity function definitions ***************************************************************/ void MQTT_MessageHandles(uint8_t ControlSignal); /** brief Callback function when MQTT message arrives * param[in] msgMQTT * retral None */ void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT); /** brief Subscribe to a MQTT topic and set the MQTT callback function * param[in] subscribeTopic Topic to be subscribed * param[in] client MQTT client * retral returnCode from MQTTClient.h */ int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct); /** brief Connect to the internet then the MQTT network * param[in] client MQTT client * param[in] mqttNetwork MQTT network * param[in] network The internet network interface (ethernet, wifi...) * retral Internet connect result and returnCode from MQTTClient.h */ int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct); /** brief Setup the number of attempt to re-connect to the internet * param[in] attemptNumber The number of attemp */ int MQTT_GetConnTimeout(int attemptNumber); /** brief Try to reconnect to the internet and MQTT network * retral None */ void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct); /** brief Publish ADC values to the server * param[in] client MQTT client * param[in] inputTime The time when the data is attempt to be sent * param[in] adcVal_0 The ADC value to be sent * retral returnCode from MQTTClient.h */ int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0); /** brief Publish Sensor values to the server * param[in] client MQTT client * param[in] inputTime The time when the data is attempt to be sent * param[in] pHVal The pHVal value to be sent * retral returnCode from MQTTClient.h */ int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float DOVal); /** brief Publish relay states to the server * param[in] client MQTT client * param[in] inputTime The time when the data is attempt to be sent * param[in] relay1 Relay 1 state * param[in] relay2 Relay 2 state * retral returnCode from MQTTClient.h */ int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3); /** brief Publish relay states to the server * param[in] client MQTT client * param[in] inputTime The time when the data is attempt to be sent * param[in] mode current mode: automatic (0) or manual (1) * param[in] maxOxi Maximum Oxygen value * param[in] minOxi Minimum Oxygen value // * param[in] uploadInterval Interval between upload turns * retral returnCode from MQTTClient.h */ int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi); /** brief Upload all the data to the MQTT server * param[in] client MQTT client * param[in] inputTime The time when the data is attempt to be sent * param[in] uploadInterval The period between each upload moment * retral returnCode from MQTTClient.h */ int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct); /********************************************************************************************************************************************************************************************/ /*************************************************************** * Unity function declarations ***************************************************************/ void MQTT_SubscribeCallback(MQTT::MessageData &msgMQTT) { // Message Handles char msg[MQTT_MAX_PAYLOAD_SIZE]; msg[0]='\0'; strncat (msg, (char*)msgMQTT.message.payload, msgMQTT.message.payloadlen); printf ("--->>> MQTT_SubscribeCallback msg: %s\n\r", msg); /* {"type":"3","deviceId":"string"} */ Json json(msg, msgMQTT.message.payloadlen); if (!json.isValidJson()) { printf("Invalid JSON: %s", msg); } else { if (json.type(0) != JSMN_OBJECT ) { printf("Invalid JSON. ROOT element is not Object: %s", msg); } else { int CommandType; int KeyIndex = json.findKeyIndexIn("type", 0); int KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); int ret = json.tokenIntegerValue(KeyValueIndex, CommandType); int receiveCmdID; printf("Command Type: %d, error %d\r\n", CommandType, ret); switch (CommandType) { case CONTROL_CMD: CE_Calibrate(); break; case 3: int relayState1, relayState2, relayState3; KeyIndex = json.findKeyIndexIn("cmdID", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); KeyIndex = json.findKeyIndexIn("relayState1", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, relayState1); KeyIndex = json.findKeyIndexIn("relayState2", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, relayState2); KeyIndex = json.findKeyIndexIn("relayState3", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, relayState3); DataStruct.RELAY_State_1 = relayState1; DataStruct.RELAY_State_2 = relayState2; DataStruct.RELAY_State_3 = relayState3; FP_WriteRelayStates(DataStruct.RELAY_State_1, DataStruct.RELAY_State_2, DataStruct.RELAY_State_3); CE_HandleRelays(relayState1, relayState2, relayState3); break; case 4: int mode, minOxiVal, maxOxiVal, uploadInterval, setRTCTime; KeyIndex = json.findKeyIndexIn("cmdID", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, receiveCmdID); KeyIndex = json.findKeyIndexIn("mode", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, mode); KeyIndex = json.findKeyIndexIn("minOxygenVal", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, minOxiVal); KeyIndex = json.findKeyIndexIn("maxOxygenVal", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, maxOxiVal); KeyIndex = json.findKeyIndexIn("uploadInterval", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, uploadInterval); KeyIndex = json.findKeyIndexIn("setRTCTime", 0); KeyValueIndex = json.findChildIndexOf(KeyIndex, 0); ret = json.tokenIntegerValue(KeyValueIndex, setRTCTime); DataStruct.CONFIG_Mode = mode; DataStruct.CONFIG_MinOxi = minOxiVal; DataStruct.CONFIG_MaxOxi = maxOxiVal; DataStruct.CONFIG_UploadInterval = uploadInterval; FP_WriteConfigValues(DataStruct.CONFIG_Mode, DataStruct.CONFIG_MinOxi, DataStruct.CONFIG_MaxOxi, DataStruct.CONFIG_UploadInterval); CE_SetRTCTime(setRTCTime); break; default: break; } } } } int MQTT_Subscribe(char *subscribeTopic, MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, struct UploadValue uploadStruct) { return client->subscribe(subscribeTopic, MQTT::QOS1, MQTT_SubscribeCallback); } int MQTT_Connect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) { const char* iot_ibm = MQTT_BROKER_URL; char hostname[strlen(org) + strlen(iot_ibm) + 1]; sprintf(hostname, "%s%s", org, iot_ibm); // Construct clientId - d:org:type:id char clientId[strlen(org) + strlen(type) + strlen(id) + 5]; sprintf(clientId, "d:%s:%s:%s", org, type, id); sprintf(subscription_url, "%s.%s/#/device/%s/%s/", org, "internetofthings.ibmcloud.com", id, DEFAULT_TYPE_NAME); // Network debug statements LOG("=====================================\n\r"); LOG("Nucleo IP ADDRESS: %s\n\r", network->get_ip_address()); LOG("Nucleo MAC ADDRESS: %s\n\r", network->get_mac_address()); LOG("Server Hostname: %s port: %d\n\r", hostname, IBM_IOT_PORT); LOG("Client ID: %s\n\r", clientId); LOG("Topic: %s\n\r",MQTT_EVENT_TOPIC); LOG("Subscription URL: %s\n\r", subscription_url); LOG("=====================================\n\r"); netConnecting = true; int rc = mqttNetwork->connect(hostname, IBM_IOT_PORT); if (rc != 0) { printf("rc from TCP connect is %d\r\n", rc); return rc; } printf ("--->TCP Connected\n\r"); netConnected = true; netConnecting = false; // MQTT Connect mqttConnecting = true; MQTTPacket_connectData data = MQTTPacket_connectData_initializer; data.MQTTVersion = 4; data.struct_version = 0; data.clientID.cstring = clientId; data.keepAliveInterval = MQTT_KEEPALIVE; // in Sec data.username.cstring = AUTH_METHOD; data.password.cstring = auth_token; printf ("AutToken: %s\n\r", auth_token); if ((rc = client->connect(data)) != 0) { printf("rc from MQTT connect is %d\r\n", rc); connack_rc = rc; return rc; } connected = true; printf ("--->MQTT Connected\n\r"); if ((rc = MQTT_Subscribe(MQTT_COMMAND_TOPIC, client, uploadStruct)) == 0) { LOG ("--->>>MQTT subscribed to: %s\n\r", MQTT_COMMAND_TOPIC); } else { LOG ("--->>>ERROR MQTT subscribe : %s\n\r", MQTT_COMMAND_TOPIC); } mqttConnecting = false; connack_rc = rc; return rc; } int MQTT_GetConnTimeout(int attemptNumber) { // First 10 attempts try within 3 seconds, next 10 attempts retry after every 1 minute // after 20 attempts, retry every 10 minutes return (attemptNumber < 10) ? 3 : (attemptNumber < 20) ? 60 : 600; } void MQTT_AttemptConnect(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, MQTTNetwork *mqttNetwork, NetworkInterface* network, struct UploadValue uploadStruct) { connected = false; while (MQTT_Connect(client, mqttNetwork, network, uploadStruct) != MQTT_CONNECTION_ACCEPTED) { if (connack_rc == MQTT_NOT_AUTHORIZED || connack_rc == MQTT_BAD_USERNAME_OR_PASSWORD) { printf ("File: %s, Line: %d Error: %d\n\r",__FILE__,__LINE__, connack_rc); return; // don't reattempt to connect if credentials are wrong } int timeout = MQTT_GetConnTimeout(++retryAttempt); WARN("Retry attempt number %d waiting %d\n", retryAttempt, timeout); // if ipstack and client were on the heap we could deconstruct and goto a label where they are constructed // or maybe just add the proper members to do this disconnect and call MQTT_AttemptConnect(...) // this works - reset the system when the retry count gets to a threshold if (retryAttempt == 5) NVIC_SystemReset(); else wait(timeout); } } int MQTT_PublishADC(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float adcVal_0) { MQTT::Message message; const char* pubTopic = MQTT_EVENT_TOPIC; char buf[MQTT_MAX_PAYLOAD_SIZE]; char timeBuf[50]; if (!client->isConnected()) { printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; } strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); sprintf(buf, "{\"type\":1,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"adc0\":%.1f}", timeBuf, commandID, adcVal_0); message.qos = MQTT::QOS0; message.retained = false; message.dup = false; message.payload = (void*)buf; message.payloadlen = strlen(buf); if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) printf("message too long!\r\n"); LOG("Publishing %s\n\r", buf); return client->publish(pubTopic, message); } int MQTT_PublishSensorVal(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, float DOVal) { MQTT::Message message; const char* pubTopic = MQTT_EVENT_TOPIC; char buf[MQTT_MAX_PAYLOAD_SIZE]; char timeBuf[50]; if (!client->isConnected()) { printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; } strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); sprintf(buf, "{\"type\":2,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"DO(ppm)\":%.2f}", timeBuf, commandID, DOVal); message.qos = MQTT::QOS0; message.retained = false; message.dup = false; message.payload = (void*)buf; message.payloadlen = strlen(buf); if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) printf("message too long!\r\n"); LOG("Publishing %s\n\r", buf); return client->publish(pubTopic, message); } int MQTT_PublishRelayState(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, int relay1, int relay2, int relay3) { MQTT::Message message; const char* pubTopic = MQTT_EVENT_TOPIC; char buf[MQTT_MAX_PAYLOAD_SIZE]; char timeBuf[50]; if (!client->isConnected()) { printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; } strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); sprintf(buf, "{\"type\":3,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"relay1\":%d,\"relay2\":%d,\"relay3\":%d}", timeBuf, commandID, relay1, relay2, relay3); message.qos = MQTT::QOS0; message.retained = false; message.dup = false; message.payload = (void*)buf; message.payloadlen = strlen(buf); if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) printf("message too long!\r\n"); LOG("Publishing %s\n\r", buf); return client->publish(pubTopic, message); } int MQTT_PublishConfigValue(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t mode, uint8_t minOxi, uint8_t maxOxi) { MQTT::Message message; const char* pubTopic = MQTT_EVENT_TOPIC; char buf[MQTT_MAX_PAYLOAD_SIZE]; char timeBuf[50]; if (!client->isConnected()) { printf ("---> MQTT DISCONNECTED\n\r"); return MQTT::FAILURE; } strftime(timeBuf, 50, "%Y/%m/%d %H:%M:%S", localtime(&inputTime)); sprintf(buf, "{\"type\":4,\"deviceId\":\"PROEVN\",\"time\":\"%s\",\"cmdId\":%d,\"mode\":%d,\"minOxygenVal\":%d,\"maxOxygenVal\":%d}", timeBuf, commandID, mode, minOxi, maxOxi); message.qos = MQTT::QOS0; message.retained = false; message.dup = false; message.payload = (void*)buf; message.payloadlen = strlen(buf); if((message.payloadlen + strlen(pubTopic)+1) >= MQTT_MAX_PACKET_SIZE) printf("message too long!\r\n"); LOG("Publishing %s\n\r", buf); return client->publish(pubTopic, message); } int MQTT_PublishAll(MQTT::Client<MQTTNetwork, Countdown, MQTT_MAX_PACKET_SIZE>* client, time_t inputTime, uint8_t uploadType, struct UploadValue uploadStruct) { int retVal; switch (uploadType) { case (ADC_VALUE): retVal = MQTT_PublishADC(client, inputTime, uploadStruct.ADC_DOVal); break; case (SENSOR_VALUE): retVal = MQTT_PublishSensorVal(client, inputTime, uploadStruct.SENSOR_DOVal); break; case (RELAY_STATE): retVal = MQTT_PublishRelayState(client, inputTime, uploadStruct.RELAY_State_1, uploadStruct.RELAY_State_2, uploadStruct.RELAY_State_3); break; case (CONFIG_VALUE): retVal = MQTT_PublishConfigValue(client, inputTime, uploadStruct.CONFIG_Mode, uploadStruct.CONFIG_MinOxi, uploadStruct.CONFIG_MaxOxi); break; default: break; } return retVal; } #endif /* __SIMPLEMQTT_H__ */